Source code for snipar.lmm

import numpy as np
from scipy.optimize import fmin_l_bfgs_b

class model(object):
    """Define a linear model with within-class correlations.

    Args:
        y : :class:`~numpy:numpy.array`
            1D array of phenotype observations
        X : :class:`~numpy:numpy.array`
            design matrix for the fixed mean effects
        labels : :class:`~numpy:numpy.array`
            1D array of sample labels

    Returns:
        model : :class:`snipar.model`

    """
    def __init__(self, y, X, labels, add_intercept=False):
        if not (y.shape[0] == X.shape[0] and X.shape[0] == labels.shape[0]):
            raise ValueError('inconsistent sample sizes of response, covariates, and labels')
        # Get sample size
        self.n = X.shape[0]
        if X.ndim == 1:
            X = X.reshape((self.n, 1))
        if add_intercept:
            X = np.hstack((np.ones((self.n, 1), dtype=X.dtype), X))
        self.X = X
        # Label mapping: count observations per label and record their indices
        self.label_counts = dict()
        self.label_indices = dict()
        for l in range(0, labels.shape[0]):
            if labels[l] not in self.label_counts:
                self.label_counts[labels[l]] = 1
                self.label_indices[labels[l]] = [l]
            else:
                self.label_counts[labels[l]] += 1
                self.label_indices[labels[l]].append(l)
        self.y_lab = dict()
        self.X_lab = dict()
        for label in self.label_indices.keys():
            self.y_lab[label] = y[self.label_indices[label]]
            self.X_lab[label] = X[self.label_indices[label], :]
        self.n_labels = len(self.y_lab.keys())
        # response
        self.y = y
        self.labels = labels
    def alpha_mle(self, tau, sigma2, compute_cov=False, xtx_out=False):
        """
        Compute the MLE of alpha given variance parameters

        Args:
            sigma2 : :class:`float`
                variance of model residuals
            tau : :class:`float`
                ratio of variance of model residuals to variance explained by mean differences between classes

        Returns:
            alpha : :class:`~numpy:numpy.array`
                MLE of alpha

        """
        X_T_X = np.zeros((self.X.shape[1], self.X.shape[1]), dtype=np.float64)
        X_T_y = np.zeros((self.X.shape[1]), dtype=np.float64)
        for label in self.y_lab.keys():
            sigma_u = sigma2 / tau
            Sigma_lab = sigma_u * np.ones((self.label_counts[label], self.label_counts[label]))
            np.fill_diagonal(Sigma_lab, sigma_u + sigma2)
            Sigma_lab_inv = np.linalg.inv(Sigma_lab)
            X_T_X = X_T_X + np.dot(self.X_lab[label].T, Sigma_lab_inv.dot(self.X_lab[label]))
            X_T_y = X_T_y + np.dot(self.X_lab[label].T, Sigma_lab_inv.dot(self.y_lab[label]))
        if xtx_out:
            return [X_T_X, X_T_y.reshape((self.X.shape[1]))]
        else:
            alpha = np.linalg.solve(X_T_X, X_T_y)
            alpha = alpha.reshape((alpha.shape[0],))
            if compute_cov:
                alpha_cov = np.linalg.inv(X_T_X)
                return [alpha, alpha_cov]
            else:
                return alpha
    # Compute likelihood of data given beta, alpha
    def likelihood_and_gradient(self, sigma2, tau):
        """
        Compute the loss function, which is -2 times the log-likelihood, along with its gradient

        Args:
            sigma2 : :class:`float`
                variance of model residuals
            tau : :class:`float`
                ratio of variance of model residuals to variance explained by mean differences between classes

        Returns:
            L, grad : :class:`float`
                loss function and gradient, divided by sample size

        """
        ## Likelihood
        alpha = self.alpha_mle(tau, sigma2)
        resid = self.y - self.X.dot(alpha)
        RSS = np.sum(np.square(resid))
        L = self.n * np.log(sigma2) + RSS / sigma2
        ## Gradient with respect to sigma2
        grad_sigma2 = self.n / sigma2 - RSS / np.square(sigma2)
        ## Gradient with respect to tau
        grad_tau = 0
        # By the Woodbury identity, Sigma_label^{-1} = (I - J/(tau + n_label))/sigma2,
        # where J is the all-ones matrix, giving the per-label corrections below
        for label in self.y_lab.keys():
            resid_label = resid[self.label_indices[label]]
            resid_sum = np.sum(resid_label)
            resid_square_sum = np.square(resid_sum)
            # Add to likelihood
            L = L - resid_square_sum / (sigma2 * (tau + self.label_counts[label])) + np.log(
                1 + self.label_counts[label] / tau)
            # Add to grad sigma2
            grad_sigma2 += resid_square_sum / (np.square(sigma2) * (tau + self.label_counts[label]))
            # Add to grad tau
            grad_tau += (resid_square_sum / sigma2 - self.label_counts[label] * (
                1 + self.label_counts[label] / tau)) / np.square(tau + self.label_counts[label])
        # Overall gradient vector
        grad = np.hstack((grad_sigma2, grad_tau))
        return L / self.n, grad / self.n
    def optimize_model(self, init_params):
        """
        Find the parameters that minimise the loss function for a given regularisation parameter

        Args:
            init_params : :class:`array`
                initial values for residual variance (sigma^2_epsilon) followed by ratio of residual variance to within-class variance (tau)

        Returns:
            optim : :class:`dict`
                dictionary with keys: 'success', whether optimisation was successful (bool); 'warnflag', output of L-BFGS-B algorithm giving warnings; 'sigma2', MLE of residual variance; 'tau', MLE of ratio of residual variance to within-class variance; 'likelihood', maximum of log-likelihood.

        """
        # Parameter boundaries
        parbounds = [(0.00001, None), (0.00001, None)]
        # Optimize
        optimized = fmin_l_bfgs_b(func=lik_and_grad, x0=init_params,
                                  args=(self.y, self.X, self.labels), bounds=parbounds)
        # Get MLE
        optim = {}
        optim['success'] = True
        optim['warnflag'] = optimized[2]['warnflag']
        if optim['warnflag'] != 0:
            print('Optimization unsuccessful.')
            optim['success'] = False
        optim['sigma2'] = optimized[0][0]
        optim['tau'] = optimized[0][1]
        # Convert the minimised loss (per-observation, up to a constant) back to the log-likelihood
        optim['likelihood'] = -0.5 * np.float64(self.n) * (optimized[1] + np.log(2 * np.pi))
        return optim
    def sigma_inv_root(self, tau, sigma2):
        # Compute the inverse square root of the within-class covariance matrix
        # for each observed class size
        sigma_u = sigma2 / tau
        sigma2_nsqrt = dict()
        famsizes = np.unique(list(self.label_counts.values()))
        sigma2_nsqrt[1] = np.power(sigma_u + sigma2, -0.5)
        famsizes = famsizes[famsizes > 1]
        for famsize in famsizes:
            Sigma_lab = sigma_u * np.ones((famsize, famsize))
            np.fill_diagonal(Sigma_lab, sigma_u + sigma2)
            vals, vectors = np.linalg.eigh(Sigma_lab)
            # Sigma^{-1/2} = V diag(d^{-1/2}) V^T = (V d^{-1/4})(V d^{-1/4})^T
            vals = np.power(vals, 0.25)
            vectors = vectors / vals
            sigma2_nsqrt[famsize] = vectors.dot(vectors.T)
        return sigma2_nsqrt
    def predict(self, X):
        """
        Predict new observations based on model regression coefficients

        Args:
            X : :class:`array`
                matrix of covariates to predict from

        Returns:
            y : :class:`array`
                predicted values

        """
        if hasattr(self, 'alpha'):
            return X.dot(self.alpha)
        else:
            raise AttributeError('Model does not have known regression coefficients. Try optimizing model first')
    def set_alpha(self, alpha):
        self.alpha = alpha
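
# Example usage of the model class (a minimal sketch for illustration;
# `_example_model_usage` is not part of the snipar API). It fits the variance
# parameters by L-BFGS-B, then computes the fixed-effect MLEs at those
# parameter values and stores them for prediction.
def _example_model_usage():
    rng = np.random.default_rng(0)
    X = rng.standard_normal((100, 2))
    labels = np.repeat(np.arange(20), 5)  # 20 classes of 5 observations each
    y = X.dot(np.array([0.5, -0.2])) + rng.standard_normal(100)
    m = model(y, X, labels)
    # init_params = [sigma2, tau]; start from the phenotypic variance and tau = 1
    optim = m.optimize_model(np.array([np.var(y), 1.0]))
    alpha, alpha_cov = m.alpha_mle(optim['tau'], optim['sigma2'], compute_cov=True)
    m.set_alpha(alpha)
    return m.predict(X), optim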
def lik_and_grad(pars, *args):
    # Wrapper for function to pass to L-BFGS-B
    y, X, labels = args
    mod = model(y, X, labels)
    return mod.likelihood_and_gradient(pars[0], pars[1])
def simulate(n, alpha, sigma2, tau):
    """Simulate from a linear model with correlated observations within-class. The mean for each class is drawn from a normal distribution.

    Args:
        n : :class:`int`
            sample size
        alpha : :class:`~numpy:numpy.array`
            value of regression coefficients
        sigma2 : :class:`float`
            variance of residuals
        tau : :class:`float`
            ratio of variance of residuals to variance of distribution of between individual means

    Returns:
        model : :class:`snipar.model`
            linear model with repeated observations

    """
    c = alpha.shape[0]
    # X = np.random.randn((n * c)).reshape((n, c))
    X_cov = np.ones((c, c))
    np.fill_diagonal(X_cov, 1.2)
    X = np.random.multivariate_normal(np.zeros((c)), X_cov, n).reshape((n, c))
    labels = np.random.choice(n // 10, n)
    random_effects = np.sqrt(sigma2 / tau) * np.random.randn(n)
    y = X.dot(alpha) + random_effects[labels - 1] + np.random.randn(n) * np.sqrt(sigma2)
    return model(y, X, labels)
def fit_model(y, X, fam_labels, add_intercept=False, tau_init=1, return_model=True, return_vcomps=True, return_fixed=True):
    """Compute the MLE for the fixed effects in a family-based linear mixed model.

    Args:
        y : :class:`~numpy:numpy.array`
            vector of phenotype values
        X : :class:`~numpy:numpy.array`
            regression design matrix for fixed effects
        fam_labels : :class:`~numpy:numpy.array`
            vector of family labels: residual correlations in y are modelled between family members (that share a fam_label)
        add_intercept : :class:`bool`
            whether to add an intercept to the fixed effect design matrix

    Returns:
        model : :class:`snipar.model`
            the snipar model object, if return_model=True
        vcomps : :class:`float`
            the MLEs for the variance parameters: sigma2 (residual variance) and tau (ratio between sigma2 and family variance), if return_vcomps=True
        alpha : :class:`~numpy:numpy.array`
            MLE of fixed effects, if return_fixed=True
        alpha_cov : :class:`~numpy:numpy.array`
            sampling variance-covariance matrix for MLE of fixed effects, if return_fixed=True

    """
    # Optimize model
    sigma_2_init = np.var(y) * tau_init / (1 + tau_init)
    null_model = model(y, X, fam_labels, add_intercept=add_intercept)
    null_optim = null_model.optimize_model(np.array([sigma_2_init, tau_init]))
    # Create return list
    return_list = []
    if return_model:
        return_list.append(null_model)
    if return_vcomps:
        return_list += [null_optim['sigma2'], null_optim['tau']]
    if return_fixed:
        return_list += null_model.alpha_mle(null_optim['tau'], null_optim['sigma2'], compute_cov=True)
    return return_list
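
# End-to-end example (a minimal sketch for illustration, not part of the
# module itself): simulate family-structured data, then recover the variance
# components and fixed effects with fit_model.
if __name__ == '__main__':
    true_alpha = np.array([0.5, -0.2])
    sim = simulate(1000, true_alpha, sigma2=1.0, tau=1.0)
    null_model, sigma2_hat, tau_hat, alpha_hat, alpha_cov = fit_model(
        sim.y, sim.X, sim.labels)
    print('sigma2 MLE:', sigma2_hat, 'tau MLE:', tau_hat)
    print('alpha MLE:', alpha_hat)
    print('alpha standard errors:', np.sqrt(np.diag(alpha_cov)))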