import gzip
# import logging
from functools import lru_cache
from operator import attrgetter
from typing import Tuple, NamedTuple, Sequence
from typing_extensions import Literal
from itertools import product, combinations_with_replacement
from collections import defaultdict
import numpy as np
import pandas as pd
from scipy.optimize import minimize, OptimizeResult
from numpy.linalg import slogdet, solve, inv
from scipy.linalg import cho_factor, cho_solve
from scipy.sparse import csc_matrix, tril
from scipy.sparse.linalg import splu, SuperLU, spsolve, cg
from snipar.types import FamLabels, SparseGRMRepr, Ids, IdDict
from snipar.gtarray import gtarray
# logger = logging.getLogger(__name__)
[docs]
def build_sib_arr(fam_labels: FamLabels) -> SparseGRMRepr:
"""Build lower-triangular nonzero entries of sibship matrix.
Args:
fam_labels (FamLabels): ndarray of family id strings corresponding to each individual.
Returns:
SparseGRMRepr: sparse GRM representation.
"""
# logger.info('Building sibship arr...')
data = []
row_ind = []
col_ind = []
label_indices = defaultdict(list)
for l, f in enumerate(fam_labels):
label_indices[f].append(l)
f = lambda lst: len(lst) * (len(lst) + 1) / 2
n = sum(map(f, label_indices.values()))
# logger.info('Done creating label_indices. ' + str(len(label_indices)) + ' ' + str(int(n)))
for f, indices_lst in label_indices.items():
for pair in combinations_with_replacement(indices_lst, 2):
ind1, ind2 = max(pair[0], pair[1]), min(pair[0], pair[1])
data.append(1)
row_ind.append(ind1)
col_ind.append(ind2)
# logger.info(f'Done building sibship arr. nnz={len(data)}')
return np.array(data), np.array(row_ind, dtype='uint32'), np.array(col_ind, dtype='uint32')
[docs]
def build_ibdrel_arr(ibdrel_path: str, id_dict: IdDict,
keep: Ids,
thres: float = 0.05) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Build ibd relatedness array (lower triangular entries) and corresponding indices from KING ibdseg output.
Args:
ibdrel_path (str): Path to ibdseg output.
id_dict (IdDict): dictionary of id-index pairs.
keep (Ids): list of ids to keep.
ignore_sib (bool): whether to set sibling entries to 0.
thres (float, optional): sparsity threshold. Defaults to 0.0205.
Returns:
SparseGRMRepr: sparse GRM representation.
"""
# Determine delimiter and read header
with open(ibdrel_path, 'r') as file:
first_line = file.readline()
delimiter = '\t' if '\t' in first_line else ' '
header = first_line.split(delimiter)
header[-1] = header[-1].strip() # Remove newline character
if not 'ID1' in header and 'ID2' in header:
raise ValueError('GRM should have ID1 and ID2 columns.')
if 'PropIBD' in header:
relcol = 'PropIBD'
elif 'relatedness' in header:
relcol = 'relatedness'
else:
raise ValueError('GRM should have PropIBD or relatedness column.')
king = pd.read_csv(ibdrel_path,
sep=delimiter)[['ID1', 'ID2',relcol]]
king['ID1'] = king['ID1'].astype(str)
king['ID2'] = king['ID2'].astype(str)
# filter out IDs that are not in keep
# logger.info('Filtering...')
king = king.query('ID1 in @keep & ID2 in @keep & PropIBD > @thres')
king = king.reset_index(drop=True)
n = king.shape[0]
# the additional len(keep) entries are for diagonal terms
data = np.zeros(n + len(keep))
row_ind = np.zeros(n + len(keep), dtype=int)
col_ind = np.zeros(n + len(keep), dtype=int)
for row in king.itertuples():
id1 = row.ID1
id2 = row.ID2
if row.PropIBD > 0.8:
raise ValueError(f'Impossible ibd relatedness {row.PropIBD} for pair ({id1}, {id2}). MZ pairs should be removed.')
ind1, ind2 = id_dict[id1], id_dict[id2]
ind1, ind2 = max(ind1, ind2), min(ind1, ind2)
# data[row.Index] = 0. if ignore_sib and row.InfType == 'FS' else row.PropIBD # if row.PropIBD > thres else 0.
data[row.Index] = row.PropIBD # if row.PropIBD > thres else 0.
if ind1 == ind2: raise RuntimeError('ind1 and ind2 cannot equal')
# data[row.Index] = min(0.5, row.PropIBD) # if row.PropIBD > thres else 0.
row_ind[row.Index] = ind1
col_ind[row.Index] = ind2
for i in range(len(keep)):
data[n + i] = 1.
row_ind[n + i] = i
col_ind[n + i] = i
# logger.info('Done building ibd arr.')
return np.array(data), np.array(row_ind, dtype='uint32'), np.array(col_ind, dtype='uint32')
[docs]
def build_grm_arr(grm_path: str, id_dict: IdDict,
thres: float) -> SparseGRMRepr:
"""Build GRM data array and corresponding indices.
"""
gcta_id = dict()
row_n = 1 # index starts from 1 in gcta grm output
with open(f'{grm_path}.grm.id', 'r') as f:
for line in f:
id = line.strip().split('\t')[1]
gcta_id[row_n] = id
row_n += 1
n = len(id_dict)
# grm_arr = np.full(int(n * (n + 1) / 2), np.nan)
data = []
row_ind = []
col_ind = []
# logger.info('Building grm...')
with gzip.open(f'{grm_path}.grm.gz', 'rt') as f:
for line in f:
id1, id2, _, gr = line.strip().split('\t')
gr_: float = float(gr)
if gr_ < thres:
# ignore relatedness coeffs less than thres
continue
id1, id2 = gcta_id[int(id1)], gcta_id[int(id2)]
if id1 not in id_dict or id2 not in id_dict:
# ignore individuals that are not in id_dict
continue
ind1, ind2 = id_dict[id1], id_dict[id2]
ind1, ind2 = max(ind1, ind2), min(ind1, ind2)
data.append(gr_)
row_ind.append(ind1)
col_ind.append(ind2)
# logger.info(f'Done building grm. nnz={len(data)}')
return np.array(data), np.array(row_ind, dtype='uint32'), np.array(col_ind, dtype='uint32')
[docs]
def match_grm_ids(ids: Ids,
fam_labels: FamLabels,
grm_path: str,
grm_source: Literal['gcta', 'ibdrel'],
) -> Tuple[np.ndarray, np.ndarray]:
"""Match ids with GRM individual ids.
"""
orig_ids = pd.DataFrame({'ids': ids, 'fam_labels': fam_labels})
if grm_source == 'gcta':
grm_ids = pd.read_csv(f'{grm_path}.grm.id', sep='\s+', names=[
'_', 'ids_'], dtype={'_': str, 'ids_': str})[['ids_']]
elif grm_source == 'ibdrel':
grm_ids = pd.read_csv(grm_path,
sep='\t')[['ID1', 'ID2']]
id1 = grm_ids['ID1'].to_numpy(dtype=str)
id2 = grm_ids['ID2'].to_numpy(dtype=str)
grm_ids = pd.DataFrame({'ids_': np.union1d(id1, id2)})
else:
raise ValueError('Incorrect source.')
orig_ids = orig_ids.merge(grm_ids, how='inner',
left_on='ids', right_on='ids_')
return orig_ids['ids'].to_numpy(), orig_ids['fam_labels'].to_numpy()
[docs]
class GradHessComponents(NamedTuple):
P_y: np.ndarray
P_varcomp_mats: Tuple[np.ndarray, ...]
# cache class attribute calculation dependent on another attribute
# https://stackoverflow.com/questions/48262273/python-bookkeeping-dependencies-in-cached-attributes-that-might-change#answer-48264126
[docs]
def cached_property_depends_on(*args):
attrs = attrgetter(*args)
def decorator(func):
_cache = lru_cache(maxsize=None)(lambda self, _: func(self))
def _with_tracked(self):
return _cache(self, attrs(self))
return property(_with_tracked, doc=func.__doc__)
return decorator
[docs]
class LinearMixedModel:
"""
Wrapper of data and functions that compute estimates of variance components or SNP effects.
Core functions: fit_snp_eff, robust_est, sib_diff_est
"""
# logger = logger.getChild(__qualname__)
def __init__(self, y: np.ndarray,
varcomp_arr_lst: Sequence[Tuple[np.ndarray, np.ndarray, np.ndarray]],
varcomps: Sequence[float] = None,
covar_X: np.ndarray = None,
add_intercept: bool = False,
add_jitter: bool = False) -> None:
"""Initialize a LinearMixedModel instance.
Args:
y (np.ndarray): 1-d array phenotpye
varcomp_arr_lst (Sequence[np.ndarray, ...]): a Sequence of arbitrary length, holding variance component matrices, excluding the residual variance matrix.
varcomps (Sequence[float, ...]): a sequence of variance component parameters.
covar_X (np.ndarray, optional): 2-d array holding fixed covariates. Defaults to None.
add_intercept: (bool): a boolean indicating whether to add intercept or not.
add_jitter: (bool): a boolean indicating whether to add jitter to the diagonal of the covariance matrix or not.
Raises:
ValueError: y should be a 1-d array.
ValueError: covar_X should be a 2-d array.
ValueError: Dimensions of y and covar_X do not match.
"""
if y.ndim != 1:
raise ValueError('y should be a 1-d array.')
self.y = y
self.n = len(y)
self.has_covar = False
if covar_X is not None:
# logger.info('has covar')
if covar_X.ndim == 1:
covar_X = covar_X.reshape((self.n,1))
if covar_X.ndim > 2:
raise ValueError('covar_X should be a 1-d or 2-d array.')
if covar_X.shape[0] != y.shape[0]:
raise ValueError(
f'Dimensions of y and covar_X do not match ({y.shape} and {covar_X.shape}).')
self.Z = np.hstack((np.ones((self.n, 1),dtype=covar_X.dtype), covar_X)) if add_intercept else covar_X
self.has_covar = True
else:
pass
# self.logger.info('no covar')
# self.logger.info(f'#individuals: {self.n}.')
self._varcomp_mats: Tuple[csc_matrix, ...] = \
tuple(self._build_sym_mat(data, row_ind, col_ind) for data, row_ind, col_ind in varcomp_arr_lst) \
+ (
csc_matrix((np.ones(self.n), (np.arange(0, self.n), np.arange(0, self.n))), shape=(self.n, self.n)),
)
self.n_varcomps = len(varcomp_arr_lst) + 1
self.y_var = self.y.var()
# self.logger.info(f'Phenotypic variance: {self.y_var}.')
self.jitter = self.y_var / 1000.0 if add_jitter else 0.0
if varcomps is not None:
if len(varcomps) != self.n_varcomps:
raise ValueError('varcomps and varcomps_arr_lst must have the same length.')
# _varcomps should be tuple for cached_dependent_property to work
self._varcomps = tuple(varcomps)
self.optimized = True
else:
self._varcomps = tuple(
# self.y.var() if i == self.n_varcomps - 1 else 0. for i in range(self.n_varcomps))
self.y_var / self.n_varcomps for i in range(self.n_varcomps))
# self.logger.info(f'init varcomps: {self._varcomps}.')
self.optimized = False
@staticmethod
def _empty_solver_result(sps_mat: csc_matrix, dense_mat: np.ndarray) -> np.ndarray:
if dense_mat.ndim != 3:
raise ValueError('dense_mat must be a 3-d array.')
n1, n2 = sps_mat.shape
m, n, c = dense_mat.shape
if n != n2:
raise ValueError(f'Input dims do not match: {n2} and {n}.')
out = np.empty((m, n1, c))
return out
[docs]
@staticmethod
def sp_solve_dense3d(sps_mat: csc_matrix,
dense_mat: np.ndarray) -> np.ndarray:
"""Compute product of the inverse of given sparse matrix and given 3-d dense array; used for repeated OLS.
Args:
sps_mat (csc_matrix): 2-d sparse matrix.
dense_mat (np.ndarray): 3-d dense array.
Raises:
ValueError: dense_mat should be 3-d.
ValueError: 2nd dimensions of both inputs should match.
Returns:
np.ndarray: 3-d dense array.
"""
out = __class__._empty_solver_result(sps_mat, dense_mat)
for i in range(dense_mat.shape[0]):
out[i, :, :] = spsolve(sps_mat, dense_mat[i, :, :])
return out
[docs]
@staticmethod
def sp_solve_dense3d_lu(sps_mat: csc_matrix,
dense_mat: np.ndarray) -> np.ndarray:
"""Compute product of the inverse of given sparse matrix and given 3-d dense array uisng LU; used for repeated OLS.
Args:
sps_mat (csc_matrix): 2-d sparse matrix.
dense_mat (np.ndarray): 3-d dense array.
Raises:
ValueError: dense_mat should be 3-d.
ValueError: 2nd dimensions of both inputs should match.
Returns:
np.ndarray: 3-d dense array.
"""
out = __class__._empty_solver_result(sps_mat, dense_mat)
lu = splu(sps_mat)
for i in range(dense_mat.shape[0]):
for j in range(dense_mat.shape[-1]):
out[i, :, j] = lu.solve(dense_mat[i, :, j])
return out
[docs]
@staticmethod
def sp_mul_dense3d(sps_mat: csc_matrix,
dense_mat: np.ndarray) -> np.ndarray:
"""Compute product of given sparse matrix and given 3-d dense array; used for repeated OLS.
Args:
sps_mat (csc_matrix): 2-d sparse matrix.
dense_mat (np.ndarray): 3-d dense array.
Raises:
ValueError: dense_mat should be 3-d.
ValueError: 2nd dimensions of both inputs should match.
Returns:
np.ndarray: 3-d dense array.
"""
out = __class__._empty_solver_result(sps_mat, dense_mat)
for i in range(dense_mat.shape[0]):
out[i, :, :] = sps_mat.dot(dense_mat[i, :, :])
return out
def _build_sym_mat(self, data: np.ndarray, row_ind: np.ndarray, col_ind: np.ndarray) -> csc_matrix:
"""Build sparse matrix using given lower triangular entries.
Args:
data (np.ndarray): data array.
row_ind (np.ndarray): row indices.
col_ind (np.ndarray): column indices.
Returns:
csc_matrix: symmetric sparse matrix in csc format.
"""
tril_mat = csc_matrix((data, (row_ind, col_ind)), shape=(self.n, self.n))
triu_mat = tril(tril_mat, k=-1, format='csc')
return tril_mat + triu_mat.T
@property
def varcomps(self) -> Tuple[float, ...]:
"""Return optimized variance components.
Raises:
ValueError: self.optimized should be True
Returns:
Tuple[float, ...]: a tuple of variance components
"""
if not self.optimized:
raise ValueError('Variance components are not optimized.')
return self._varcomps
@cached_property_depends_on('_varcomps')
def V(self) -> csc_matrix:
"""Compute V.
Returns:
csc_matrix: V in sparse csc format
"""
# self.logger.debug('Calculating V...')
varcomps = np.array(self._varcomps)
varcomps[-1] += self.jitter
V: csc_matrix = sum(
varcomps[i] * self._varcomp_mats[i] for i in range(self.n_varcomps)
)
return V
@cached_property_depends_on('_varcomps')
def V_lu(self) -> SuperLU:
"""Compute sparse LU factorization of V.
Returns:
SuperLU: wrapper object holding LU factorization of V
"""
# self.logger.debug('Calculating V_lu')
lu = splu(self.V)
return lu
@cached_property_depends_on('_varcomps')
def V_logdet(self) -> float:
"""Compute log determinant of V using LU.
Returns:
float: log determinant of V
"""
diag_l = self.V_lu.L.diagonal()
diag_u = self.V_lu.U.diagonal()
V_logdet = np.log(diag_l).sum() + np.log(diag_u).sum()
return V_logdet
@cached_property_depends_on('_varcomps')
def Vinv_y(self) -> np.ndarray:
"""Compute matrix-vector product of inverse of V and y
Returns:
np.ndarray: Vinv_e
"""
# self.logger.debug('Calculating Vinv_y')
return self.V_lu.solve(self.y)
[docs]
def Vinv_mat(self, dense_mat: np.ndarray) -> np.ndarray:
"""Calculate matrix-matrix product of inverse of V and a dense matrix
Args:
dense_mat (np.ndarray): 2-d array in the dense format
Raises:
ValueError: dense_mat must be a 2-d array
ValueError: dimensions of V and dense_mat must match
Returns:
np.ndarray: matrix product in the dense format
"""
if dense_mat.ndim != 2:
raise ValueError('dense_mat must be a 2-d array.')
n, c = dense_mat.shape
if n != self.n:
raise ValueError(f'Input dims do not match: {self.n} and {n}.')
out = np.empty((self.n, c))
for i in range(c):
out[:, i] = self.V_lu.solve(dense_mat[:, i])
return out
@cached_property_depends_on('_varcomps')
def Vinv_Z(self):
return self.Vinv_mat(self.Z)
@cached_property_depends_on('_varcomps')
def Vinv_e(self) -> np.ndarray:
"""Compute matrix-vector product of inverse of V and one-vector
Returns:
np.ndarray: Vinv_e
"""
# self.logger.debug('Calculating V_inv_e')
e = np.ones(self.n)
return self.V_lu.solve(e)
@cached_property_depends_on('_varcomps')
def Vinv_varcomp_mats(self) -> Tuple[csc_matrix, ...]:
"""Compute matrix multiplications of inverse of V and all variance component matrices.
Returns:
Tuple[csc_matrix, ...]: a tuple holding all Vinv_varcomp_mat
"""
# self.logger.debug('Calculating V_inv_varcomp_mats...')
return tuple(self.Vinv_mat(self._varcomp_mats[i]) for i in range(self.n_varcomps))
@cached_property_depends_on('_varcomps')
def P_attrs(self) -> GradHessComponents:
"""Compute ingredients for gradient and hessian (Vinv_y, Vinv_varcomp_mats).
Returns:
GradHessComponents: a NameTuple holding the computation results
"""
# self.logger.debug('Calculating P_mats...')
if self.has_covar:
P_comp = self.Vinv_Z @ solve(self.Z.T @ self.Vinv_Z, self.Vinv_Z.T)
else:
P_comp = np.outer(
self.Vinv_e, self.Vinv_e) / (np.ones(self.n) @ self.Vinv_e)
P_y = self.Vinv_y - P_comp @ self.y
P_varcomp_mats = tuple(
(self.Vinv_varcomp_mats[i] - P_comp @ self._varcomp_mats[i]).A for i in range(self.n_varcomps))
return GradHessComponents(P_y=P_y, P_varcomp_mats=P_varcomp_mats)
@cached_property_depends_on('_varcomps')
def grad(self) -> np.ndarray:
"""Compute gradient.
Returns:
np.ndarray: 1-d array of gradient
"""
# self.logger.debug('Calculating grad...')
P_y = self.P_attrs.P_y
grad = -0.5 * np.array(
[
P_mat.diagonal().sum() - self.y @ P_mat @ P_y for P_mat in self.P_attrs.P_varcomp_mats
]
)
return grad
@cached_property_depends_on('_varcomps')
def hessian(self) -> np.ndarray:
"""Compute hessian.
Returns:
np.ndarray: 2-d n_varcomps-by-n_varcomps array
"""
# self.logger.debug('Calculating hessian...')
hessian = np.empty((self.n_varcomps, self.n_varcomps))
P_y = self.P_attrs.P_y
for i in range(self.n_varcomps):
for j in range(self.n_varcomps):
hessian[i, j] = self.y @ self.P_attrs.P_varcomp_mats[i] @ self.P_attrs.P_varcomp_mats[j] @ P_y
return 0.5 * hessian
@cached_property_depends_on('_varcomps')
def reml_loglik(self) -> float:
"""Compute REML log likelihood
Returns:
float: REML log likelihood
"""
if self.has_covar:
ZT_Vinv_Z = self.Z.T @ self.Vinv_Z
xx_ = solve(ZT_Vinv_Z, self.Vinv_Z.T.dot(self.y)) # @ self.y
# P_comp: np.ndarray = self.Vinv_Z @ xx_
# P_y: np.ndarray = self.Vinv_y - P_comp @ self.y
P_y = self.Vinv_y - self.Vinv_Z @ xx_
_, logdet_ZT_Vinv_Z = slogdet(ZT_Vinv_Z)
return -0.5 * (self.V_logdet + logdet_ZT_Vinv_Z + self.y @ P_y)
else:
e = np.ones(self.n)
yT_Vinv_y = self.y @ self.Vinv_y
eT_Vinv_e = e @ self.Vinv_e
eT_Vinv_y = e @ self.Vinv_y
logdet_eT_Vinv_e = np.log(np.ones(self.n) @ self.Vinv_e)
return -0.5 * (self.V_logdet + logdet_eT_Vinv_e + yT_Vinv_y - eT_Vinv_y ** 2 / eT_Vinv_e) / self.n
# TODO: add support for covariates; current version doesn't allow controlling for covariates
# TODO: check sparsity of V and apply dense version if not sparse enough
[docs]
def dense_reml_loglik(self, method: Literal['chol', 'cg'] = 'cg') -> float:
"""Dense version of reml_loglik
Args:
method (Literal['chol', 'cg'], optional): a string specifying method for computing V inverse. Defaults to 'cg'.
Raises:
RuntimeError: Conjugate gradient did not converge
RuntimeError: Illigal input for conjugate gradient
Returns:
float: REML log likelihood
"""
varcomps = np.array(self._varcomps)
varcomps[-1] += self.jitter
V: csc_matrix = sum(
varcomps[i] * self._varcomp_mats[i] for i in range(self.n_varcomps)
)
V_ = V.toarray()
if method == 'chol':
del V
_, logdet_V = slogdet(V_)
e = np.ones(self.n)
if method == 'chol':
c, low = cho_factor(V_)
Vinv_y = cho_solve((c, low), self.y)
Vinv_e = cho_solve((c, low), e)
elif method == 'cg':
Vinv_y, info1 = cg(V, self.y, atol='legacy')
if info1 == 0:
pass
elif info1 > 0:
self.logger.warning('Conjugate gradient did not converge.')
elif info1 < 1:
raise RuntimeError(
'Illegal input or breakdown for conjugate gradient.')
Vinv_e, info2 = cg(V, e, atol='legacy')
if info2 == 0:
pass
elif info2 > 0:
self.logger.warning('Conjugate gradient did not converge.')
elif info2 < 1:
raise RuntimeError(
'Illegal input or breakdown for conjugate gradient.')
y_T_V_inv_y = self.y @ Vinv_y
e_T_V_inv_e = e @ Vinv_e
e_T_V_inv_y = e @ Vinv_y
logdet_e_T_V_inv_e = np.log(e_T_V_inv_e)
return -0.5 * (logdet_V + logdet_e_T_V_inv_e + y_T_V_inv_y - e_T_V_inv_y ** 2 / e_T_V_inv_e)
[docs]
def grid_search_reml(self) -> None:
"""Perform grid search on variance component parameters.
"""
if self.optimized:
self.logger.warning('Variance components are already optimized.')
print('WARNING: variance components are already optimized.')
# reserve jitter for sibma_sib
upper = np.var(self.y) - self.jitter
step = upper / 100.
max_loglik = float('-inf')
max_sigma = (0.,)
# logging.info('Starting grid search...')
i = 1
possible_values = np.arange(0.0, upper + step, step)
for sigma in product(possible_values, repeat=self.n_varcomps - 1):
if sum(sigma) > upper:
continue
self._varcomps = sigma + (upper - sum(sigma),)
# logging.info(f'{i}: {self.reml_loglik}')
i += 1
if self.reml_loglik > max_loglik:
max_loglik = self.reml_loglik
max_sigma = self._varcomps
# logging.info('Finished grid search.')
self._varcomps = max_sigma
self.optimized = True
[docs]
def ai_reml(self) -> None:
"""Perform AI-REML algorithm to obtain maximum likelihood estimates of variance components.
"""
if self.optimized:
# self.logger.warning('Variance components are already optimized.')
print('WARNING: variance components are already optimized.')
ll_ = float('inf')
max_ll = float('-inf')
iter = 1
# self.logger.info('Starting AI-REML...')
while True:
# self.logger.info(f'Iter: {iter}\tloglik: {self.reml_loglik}')
if abs(self.reml_loglik - ll_) < 1e-4:
break
if iter > 50:
self._varcomps = max_sigma
ll_ = self.reml_loglik
sigma = np.array(
self._varcomps) + solve(self.hessian, self.grad)
sigma[sigma <= 0.] = self.y.var() * 1e-5
self._varcomps = tuple(sigma)
iter += 1
if ll_ > max_ll:
max_ll = ll_
max_sigma = self._varcomps
self.optimized = True
# logger.info('Finished AI-REML.')
[docs]
def scipy_optimize(self) -> None:
"""Perform LBFGS-B to optimize variance components.
"""
if self.optimized:
# self.logger.warning('Variance components are already optimized.')
print('WARNING: variance components are already optimized.')
# self.logger.info('Starting LBFGS-B...')
yvar = self.y.var()
def nll(x):
self._varcomps = tuple(x)
return -1 * self.reml_loglik
# def c(x):
# self.logger.info(inspect.currentframe().f_back.f_locals)
def callbackF(Xi):
print(Xi, nll(Xi))
res: OptimizeResult = minimize(nll, x0=np.array(self._varcomps), # options={'gtol': 1e-5, 'eps': 1e-5},
method='L-BFGS-B', bounds=[(0.00001 * yvar, yvar) for i in range(self.n_varcomps)])
# callback=callbackF)
__varcomp_mats = self._varcomp_mats
if not res.success:
if self.n_varcomps == 2:
raise ValueError(f'Scipy minimize failed: {res.message}')
else:
# self.logger.warning(f'Scipy minimize failed: {res.message}. Trying a reduced model...')
print(f'WARNING: Scipy minimize failed: {res.message}. Trying a reduced model...')
self.n_varcomps -= 1
if (0.00001 * self.y_var >= res.x[0] or res.x[0] >= self.y_var) and \
(0.00001 * self.y_var >= res.x[1] or res.x[1] >= self.y_var):
raise ValueError(f'Scipy minimize failed: {res.message}).')
elif 0.00001 * self.y_var >= res.x[0] or res.x[0] >= self.y_var:
# self.logger.warning(f'Dropping variance component 0: {res.x[0]}, and re-estimate...')
print(f'WARNING: dropping variance component 0: {res.x[0]}, and re-estimate...')
self._varcomp_mats = __varcomp_mats[1:]
self._varcomps = tuple(
self.y_var / self.n_varcomps for _ in range(self.n_varcomps))
res: OptimizeResult = minimize(nll, x0=np.array(self._varcomps), # options={'gtol': 1e-5, 'eps': 1e-5},
method='L-BFGS-B', bounds=[(0.00001 * yvar, yvar) for i in range(self.n_varcomps)],
callback=callbackF)
if not res.success:
raise ValueError(f'Scipy minimize failed: {res.message}')
elif 0.00001 * self.y_var >= res.x[1] or res.x[1] >= self.y_var:
# self.logger.warning(f'Dropping variance component 1: {res.x[1]}, and re-estimate...')
print(f'WARNING: dropping variance component 1: {res.x[1]}, and re-estimate...')
self._varcomp_mats = __varcomp_mats[::2]
self._varcomps = tuple(
self.y_var / self.n_varcomps for _ in range(self.n_varcomps))
res: OptimizeResult = minimize(nll, x0=np.array(self._varcomps), # options={'gtol': 1e-5, 'eps': 1e-5},
method='L-BFGS-B', bounds=[(0.00001 * yvar, yvar) for i in range(self.n_varcomps)],
callback=callbackF)
if not res.success:
raise ValueError(f'Scipy minimize failed: {res.message}')
else: raise RuntimeError()
self.optimized = True
self._varcomps = tuple(res.x)
# self.logger.info(f'Finished LBFGS-B. # of Likelihood evaluations: {res.nfev}')
[docs]
def fit_snps_eff(self, gts: np.ndarray, standard_gwas: bool = False) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Perform repeated OLS to estimate SNP effects and sampling variance-covariance.
Args:
gts (np.ndarray): 3-d array of genetic data.
Returns:
Tuple[np.ndarray, np.ndarray, np.ndarray]: 3 arrays of SNP effects, covarinaces and standard errors.
"""
if standard_gwas:
gts = gts[:, 0:1, :]
# np.testing.assert_array_almost_equal(gts[:, 1, :], gts[:, 2, :])
n, k, l = gts.shape
if n != self.n:
raise ValueError(f'Size of genotype matrix does not match pheno: {n},{self.n}.')
if self.has_covar:
gts_ = gts.reshape((gts.shape[0], int(k * l)))
M_X = gts_ - self.Z.dot(solve(self.Z.T @ self.Z, self.Z.T.dot(gts_)))
X_ = M_X.reshape((gts_.shape[0], k, l)).transpose(2, 0, 1)
y = self.y - self.Z @ solve(self.Z.T @ self.Z, self.Z.T.dot(self.y))
Vinv_X = self.sp_solve_dense3d_lu(self.V, X_)
XT_Vinv_X = np.einsum('...ij,...ik', X_, Vinv_X)
XT_Vinv_y = np.einsum('...ij,i', Vinv_X, y)
alpha = solve(XT_Vinv_X, XT_Vinv_y)
alpha_cov = np.linalg.inv(XT_Vinv_X)
else:
gts = gts.transpose(2, 0, 1)
Vinv_X = self.sp_solve_dense3d_lu(self.V, gts)
XT_Vinv_X = np.einsum('...ij,...ik', gts, Vinv_X)
XT_Vinv_y = np.einsum('...ij,i', gts, self.Vinv_y)
alpha = solve(XT_Vinv_X, XT_Vinv_y)
alpha_cov = np.linalg.inv(XT_Vinv_X)
alpha_ses: np.ndarray = np.sqrt(
np.diagonal(alpha_cov, axis1=1, axis2=2))
return alpha, alpha_cov, alpha_ses
[docs]
def robust_est(self, G: gtarray, G_sib: gtarray):
num_obs_par_al = G.num_obs_par_al
par_status = G.par_status
ped = G.ped
X = G.gts.data
X_sib = G_sib.gts.data
n, k, l = G.shape
assert n == self.n
assert l == G_sib.shape[2]
alpha = np.full((l,1), fill_value=np.nan)
alpha_ses = np.full((l,1), fill_value=np.nan)
alpha_cov = np.full((l,1,1), fill_value=np.nan)
ct = 0
for s in range(l):
if np.isnan(num_obs_par_al[:, s]).all():
ct += 1
continue
notnan = np.isfinite(num_obs_par_al[:, s])
both = (num_obs_par_al[:, s] == 4) * notnan
pat = (num_obs_par_al[:, s] == 3) * (par_status[:, 0] == 0) * notnan
mat = (num_obs_par_al[:, s] == 3) * (par_status[:, 1] == 0) * notnan
one = (num_obs_par_al[:, s] == 3) * ((par_status[:, 0] == 1) & (par_status[:, 1] == 1)) * notnan
if both.sum() == 0 or one.sum() == 0 or pat.sum() == 0 or mat.sum() == 0:
ct += 1
# TODO: handle the case where one or more groups are missing
continue
X_both = X[both, :, s]
X_pat = X[pat, :, s]
X_pat[:, 2] = X_pat[:, 0] - X_pat[:, 2] // 1 # see np.divmod or np.modf
X_mat = X[mat, :, s]
X_mat[:, 1] = X_mat[:, 0] - X_mat[:, 1] // 1
# TODO: check G_sib.ids in G.ids
idx_one = [G_sib.id_dict[i] for i in G.ids[one]]
X_one = X_sib[idx_one, :, s]
y_both = self.y[both]
y_pat = self.y[pat]
y_mat = self.y[mat]
y_one = self.y[one]
V_both = self.V[both, :][:, both]
V_pat = self.V[pat, :][:, pat]
V_mat = self.V[mat, :][:, mat]
V_one = self.V[one, :][:, one]
V_both_one = self.V[both, :][:, one]
V_both_pat = self.V[both, :][:, pat]
V_both_mat = self.V[both, :][:, mat]
V_one_pat = self.V[one, :][:, pat]
V_one_mat = self.V[one, :][:, mat]
V_pat_mat = self.V[pat, :][:, mat]
if self.has_covar:
X_both -= self.Z[both] @ solve(self.Z[both].T @ self.Z[both], self.Z[both].T.dot(X_both))
y_both -= self.Z[both] @ solve(self.Z[both].T @ self.Z[both], self.Z[both].T.dot(y_both))
X_pat -= self.Z[pat] @ solve(self.Z[pat].T @ self.Z[pat], self.Z[pat].T.dot(X_pat))
y_pat -= self.Z[pat] @ solve(self.Z[pat].T @ self.Z[pat], self.Z[pat].T.dot(y_pat))
X_mat -= self.Z[mat] @ solve(self.Z[mat].T @ self.Z[mat], self.Z[mat].T.dot(X_mat))
y_mat -= self.Z[mat] @ solve(self.Z[mat].T @ self.Z[mat], self.Z[mat].T.dot(y_mat))
X_one -= self.Z[one] @ solve(self.Z[one].T @ self.Z[one], self.Z[one].T.dot(X_one))
y_one -= self.Z[one] @ solve(self.Z[one].T @ self.Z[one], self.Z[one].T.dot(y_one))
X_both -= np.mean(X_both, axis=0)
X_pat -= np.mean(X_pat, axis=0)
X_mat -= np.mean(X_mat, axis=0)
X_one -= np.mean(X_one, axis=0)
y_both -= y_both.mean()
y_pat -= y_pat.mean()
y_mat -= y_mat.mean()
y_one -= y_one.mean()
try:
Vinv_X_both = spsolve(V_both, X_both)
XT_Vinv_X_both = X_both.T @ Vinv_X_both
XT_Vinv_y_both = Vinv_X_both.T @ y_both
alpha_both = solve(XT_Vinv_X_both, XT_Vinv_y_both)[0]
alpha_cov_both = np.linalg.inv(XT_Vinv_X_both)[0,0]
Vinv_X_one = spsolve(V_one, X_one)
XT_Vinv_X_one = X_one.T @ Vinv_X_one
XT_Vinv_y_one = Vinv_X_one.T @ y_one
alpha_one = solve(XT_Vinv_X_one, XT_Vinv_y_one)[0]
alpha_cov_one = np.linalg.inv(XT_Vinv_X_one)[0,0]
Vinv_X_mat = spsolve(V_mat, X_mat)
XT_Vinv_X_mat = X_mat.T @ Vinv_X_mat
XT_Vinv_y_mat = Vinv_X_mat.T @ y_mat
alpha_mat = solve(XT_Vinv_X_mat, XT_Vinv_y_mat)[0]
alpha_cov_mat = np.linalg.inv(XT_Vinv_X_mat)[0,0]
Vinv_X_pat = spsolve(V_pat, X_pat)
XT_Vinv_X_pat = X_pat.T @ Vinv_X_pat
XT_Vinv_y_pat = Vinv_X_pat.T @ y_pat
alpha_pat = solve(XT_Vinv_X_pat, XT_Vinv_y_pat)[0]
alpha_cov_pat = np.linalg.inv(XT_Vinv_X_pat)[0,0]
except np.linalg.LinAlgError:
continue
cov_both_one = alpha_cov_both * Vinv_X_both[:, 0].T @ V_both_one @ Vinv_X_one[:, 0] * alpha_cov_one
cov_both_pat = alpha_cov_both * Vinv_X_both[:, 0].T @ V_both_pat @ Vinv_X_pat[:, 0] * alpha_cov_pat
cov_both_mat = alpha_cov_both * Vinv_X_both[:, 0].T @ V_both_mat @ Vinv_X_mat[:, 0] * alpha_cov_mat
cov_one_mat = alpha_cov_one * Vinv_X_one[:, 0].T @ V_one_mat @ Vinv_X_mat[:, 0] * alpha_cov_mat
cov_one_pat = alpha_cov_one * Vinv_X_one[:, 0].T @ V_one_pat @ Vinv_X_pat[:, 0] * alpha_cov_pat
cov_pat_mat = alpha_cov_pat * Vinv_X_pat[:, 0].T @ V_pat_mat @ Vinv_X_mat[:, 0] * alpha_cov_mat
S = np.block(
[[alpha_cov_both, cov_both_one, cov_both_pat, cov_both_mat],
[cov_both_one.T, alpha_cov_one, cov_one_pat, cov_one_mat],
[cov_both_pat.T, cov_one_pat.T, alpha_cov_pat, cov_pat_mat],
[cov_both_mat.T, cov_one_mat.T, cov_pat_mat.T, alpha_cov_mat]]
)
A = np.ones(4)
robust_var = np.power(A @ solve(S, A), -1)
alpha[s,0] = robust_var * (A @ solve(S, np.array([alpha_both, alpha_one, alpha_pat, alpha_mat])))
alpha_cov[s,0,0] = robust_var
alpha_ses[s,0] = robust_var ** 0.5
return alpha, alpha_cov, alpha_ses
[docs]
def sib_diff_est(self, gts: np.ndarray):
"""
Perform the sib-difference estimator
"""
n, k, l = gts.shape
if k != 2:
raise ValueError('Second dimension of the genotype matrix for the sib-difference estimator should be of size 2.')
if n != self.n:
raise ValueError(f'Size of genotype matrix does not match pheno: {n},{self.n}.')
alpha = np.full((l,1), fill_value=np.nan)
alpha_ses = np.full((l,1), fill_value=np.nan)
alpha_cov = np.full((l,1,1), fill_value=np.nan)
if self.has_covar:
gts_ = gts.reshape((gts.shape[0], int(k * l)))
M_X = gts_ - self.Z.dot(solve(self.Z.T @ self.Z, self.Z.T.dot(gts_)))
X_ = M_X.reshape((gts_.shape[0], k, l)).transpose(2, 0, 1)
y = self.y - self.Z @ solve(self.Z.T @ self.Z, self.Z.T.dot(self.y))
Vinv_X = self.sp_solve_dense3d_lu(self.V, X_)
XT_Vinv_X = np.einsum('...ij,...ik', X_, Vinv_X)
XT_Vinv_y = np.einsum('...ij,i', Vinv_X, y)
alpha[:, 0] = solve(XT_Vinv_X, XT_Vinv_y)[:, 0]
alpha_cov[:, 0, 0] = np.linalg.inv(XT_Vinv_X)[:, 0, 0]
else:
gts_ = gts.transpose(2, 0, 1)
Vinv_X = self.sp_solve_dense3d_lu(self.V, gts_)
XT_Vinv_X = np.einsum('...ij,...ik', gts_, Vinv_X)
XT_Vinv_y = np.einsum('...ij,i', gts_, self.Vinv_y)
alpha[:, 0] = solve(XT_Vinv_X, XT_Vinv_y)[:,0]
alpha_cov[:, 0, 0] = np.linalg.inv(XT_Vinv_X)[:,0,0]
alpha_ses[:, 0] = np.sqrt(
alpha_cov)[:, 0, 0]
return alpha, alpha_cov, alpha_ses
[docs]
def trios_sibs_est(self, gts: np.ndarray,
complete_trios_inds: np.ndarray,
sibs_inds: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Perform default regression on trios and sib-pairs.
Args:
gts (np.ndarray): 3-d array of genetic data.
Returns:
Tuple[np.ndarray, np.ndarray, np.ndarray]: 3 arrays of SNP effects, covarinaces and standard errors.
"""
n, k, l = gts.shape
assert n == self.n
if self.has_covar:
gts_ = gts.reshape((gts.shape[0], int(k * l)))
M_X = gts_ - self.Z.dot(solve(self.Z.T @ self.Z, self.Z.T.dot(gts_)))
X = M_X.reshape((gts_.shape[0], k, l)).transpose(2, 0, 1)
y = self.y - self.Z @ solve(self.Z.T @ self.Z, self.Z.T.dot(self.y))
else:
X = gts.transpose(2, 0, 1)
# y = self.y - self.y.mean()
y = self.y
alpha = np.full((l,1), fill_value=np.nan)
alpha_ses = np.full((l,1), fill_value=np.nan)
alpha_cov = np.full((l,1,1), fill_value=np.nan)
ct = 0
X_trios = X[:, complete_trios_inds, :]
X_sibs = X[:, sibs_inds, :2]
y_trios = y[complete_trios_inds]
y_sibs = y[sibs_inds]
V_trios = self.V[complete_trios_inds, :][:, complete_trios_inds]
V_sibs = self.V[sibs_inds, :][:, sibs_inds]
V_trios_sibs = self.V[complete_trios_inds, :][:, sibs_inds]
X_trios -= X_trios.mean(axis=1, keepdims=True)
X_sibs -= X_sibs.mean(axis=1, keepdims=True)
y_trios -= y_trios.mean()
y_sibs -= y_sibs.mean()
Vinv_X_trios = self.sp_solve_dense3d_lu(V_trios, X_trios)
XT_Vinv_X_trios = np.einsum('...ij,...ik', X_trios, Vinv_X_trios)
XT_Vinv_y_trios = np.einsum('...ij,i', Vinv_X_trios, y_trios)
alpha_trios = solve(XT_Vinv_X_trios, XT_Vinv_y_trios)[:, 0]
alpha_cov_trios = np.linalg.inv(XT_Vinv_X_trios)[:, 0, 0]
Vinv_X_sibs = self.sp_solve_dense3d_lu(V_sibs, X_sibs)
XT_Vinv_X_sibs = np.einsum('...ij,...ik', X_sibs, Vinv_X_sibs)
XT_Vinv_y_sibs = np.einsum('...ij,i', Vinv_X_sibs, y_sibs)
alpha_sibs = solve(XT_Vinv_X_sibs, XT_Vinv_y_sibs)[:, 0]
alpha_cov_sibs = np.linalg.inv(XT_Vinv_X_sibs)[:, 0, 0]
# numpy cannot do the following einsum
# print(np.einsum('ji,ik->jk', Vinv_X_trios[:, :, 0], V_trios_sibs))
for i in range(l):
cov_trios_sibs = alpha_cov_trios[i] * Vinv_X_trios[i, :, 0].T @ V_trios_sibs @ Vinv_X_sibs[i, :, 0] * alpha_cov_sibs[i]
S = np.block(
[[alpha_cov_trios[i], cov_trios_sibs],
[cov_trios_sibs.T, alpha_cov_sibs[i]]]
)
A = np.ones(2)
robust_var = np.power(A @ solve(S, A), -1)
alpha[i, 0] = robust_var * (A @ solve(S, np.array([alpha_trios[i], alpha_sibs[i]])))
alpha_cov[i, 0, 0] = robust_var
alpha_ses[i, 0] = robust_var ** 0.5
return alpha, alpha_cov, alpha_ses