Source code for snipar.gtarray

import numpy as np
import numpy.ma as ma
import h5py
from numba import njit
from snipar.utilities import make_id_dict, convert_str_array
from snipar.types import SparseGRMRepr, Ids, IdDict
from scipy.sparse import csc_matrix, tril
from scipy.sparse.linalg import spsolve
# import logging

# logger = logging.getLogger(__name__)
[docs] class gtarray(object): """Define a genotype or PGS array that stores individual IDs, family IDs, and SNP information. Args: garray : :class:`~numpy:numpy.array` 2 or 3 dimensional numpy array of genotypes/PGS values. First dimension is individuals. For a 2 dimensional array, the second dimension is SNPs or PGS values. For a 3 dimensional array, the second dimension indexes the individual and his/her relatives' genotypes (for example: proband, paternal, and maternal); and the third dimension is the SNPs. ids : :class:`~numpy:numpy.array` vector of individual IDs of same length as first dimension of garray sid : :class:`~numpy:numpy.array` vector of SNP ids, equal in length, L, to last dimension of array alleles : :class:`~numpy:numpy.array` [L x 2] matrix of ref and alt alleles for the SNPs. L must match size of sid pos : :class:`~numpy:numpy.array` vector of SNP positions; must match size of sid chrom : :class:`~numpy:numpy.array` vector of SNP chromosomes; must match size of sid map : :class:`~numpy:numpy.array` vector of SNP chromosomes; must match size of sid fams : :class:`~numpy:numpy.array` vector of family IDs; must match size of ids par_status : :class:`~numpy:numpy.array' [N x 2] numpy matrix that records whether parents have observed or imputed genotypes/PGS, where N matches size of ids. The first column is for the father of that individual; the second column is for the mother of that individual. If the parent is neither observed nor imputed, the value is -1; if observed, 0; and if imputed, 1. num_obs_par_al : :class:`~numpy:numpy.array' 2 dimensioanal numpy arrray, with each entry holding the number of observed parental alleles for each individual on each locus. Returns: G : :class:`snipar.gtarray` """ def __init__(self, garray, ids, sid=None, alleles=None, pos=None, chrom=None, map=None, error_probs=None, fams=None, par_status=None, num_obs_par_al=None, ped=None, standard_f=None): if type(garray) == np.ndarray or type(garray) == np.ma.core.MaskedArray: if type(garray) == np.ndarray: self.gts = ma.array(garray,mask=np.isnan(garray)) else: self.gts = garray self.shape = garray.shape self.ndim = garray.ndim self.dtype = garray.dtype self.freqs = None else: raise ValueError('Genotypes must be a numpy ndarray') if garray.shape[0] == ids.shape[0]: self.ids = ids self.id_dict = make_id_dict(ids) else: raise ValueError('Shape of genotypes and ids does not match') if sid is not None: if sid.shape[0] == garray.shape[1]: self.snp_index = 1 self.sid = sid self.sid_dict = make_id_dict(sid) elif sid.shape[0] == garray.shape[2]: self.snp_index = 2 self.sid = sid self.sid_dict = make_id_dict(sid) else: raise ValueError('Shape of SNP ids (sid) does not match shape of genotype array') if alleles is not None: if self.sid is not None: if alleles.shape[0] == self.sid.shape[0]: self.alleles = alleles else: raise ValueError('Size of alleles does not match size of genotypes') else: raise(ValueError('Must provide SNP ids')) else: self.alleles = None if pos is not None: if self.sid is not None: if pos.shape[0] == self.sid.shape[0]: self.pos = pos else: raise ValueError('Size of position vector does not match size of genotypes') else: raise(ValueError('Must provide SNP ids')) else: self.pos = None if chrom is not None: if self.sid is not None: if chrom.shape[0] == self.sid.shape[0]: self.chrom = chrom else: raise ValueError('Size of map does not match number of SNPs') else: raise(ValueError('Must provide SNP ids')) else: self.chrom = None if map is not None: if self.sid is not None: if map.shape[0] == self.sid.shape[0]: self.map = map else: raise ValueError('Size of map does not match number of SNPs') else: raise(ValueError('Must provide SNP ids')) else: self.map = None if error_probs is not None: if self.sid is not None: if error_probs.shape[0] == self.sid.shape[0]: self.error_probs = error_probs else: raise ValueError('Size of map does not match number of SNPs') else: raise(ValueError('Must provide SNP ids')) else: self.error_probs = None if fams is not None: if fams.shape[0] == ids.shape[0] and fams.ndim==1: self.fams = fams else: raise ValueError('Fams not of same length as IDs') else: self.fams = None if par_status is not None: if par_status.shape[0] == ids.shape[0] and par_status.shape[1] == 2: self.par_status = par_status else: raise ValueError('Incompatible par status array') else: self.par_status = None if ped is not None: self.ped = ped else: self.ped = None if num_obs_par_al is not None: if self.ndim == 2 and num_obs_par_al.shape[0] == self.shape[0] and num_obs_par_al.shape[1] == self.shape[1]: self.num_obs_par_al = num_obs_par_al elif self.ndim == 3 and num_obs_par_al.shape[0] == self.shape[0] and num_obs_par_al.shape[1] == self.shape[2]: self.num_obs_par_al = num_obs_par_al else: raise ValueError('Incompatible par status array') else: self.num_obs_par_al = None if standard_f is not None: if standard_f.shape[0] == self.sid.shape[0]: self.standard_f = standard_f else: raise ValueError('Size of standard_f does not match number of SNPs') else: self.standard_f = None self.mean_normalised = False if np.sum(self.gts.mask)>0: self.has_NAs = True else: self.has_NAs = False self.info = None
[docs] def compute_freqs(self): """ Computes the frequencies of the SNPs. Stored in self.freqs. """ if self.ndim == 2: self.freqs = ma.mean(self.gts,axis=0)/2.0 elif self.ndim == 3: self.freqs = ma.mean(self.gts[:,0,:], axis=0) / 2.0
[docs] def filter(self, filter_pass): if self.freqs is not None: self.freqs = self.freqs[filter_pass] if self.ndim == 2: self.gts = self.gts[:,filter_pass] elif self.ndim == 3: self.gts = self.gts[:,:,filter_pass] self.shape = self.gts.shape if self.sid is not None: self.sid = self.sid[filter_pass] self.sid_dict = make_id_dict(self.sid) if self.pos is not None: self.pos = self.pos[filter_pass] if self.alleles is not None: self.alleles = self.alleles[filter_pass] if self.chrom is not None: self.chrom = self.chrom[filter_pass] if self.map is not None: self.map = self.map[filter_pass] if self.error_probs is not None: self.error_probs = self.error_probs[filter_pass] if self.standard_f is not None: self.standard_f = self.standard_f[filter_pass]
[docs] def filter_maf(self, min_maf = 0.01, verbose=False): """ Filter SNPs based on having minor allele frequency (MAF) greater than min_maf, and have % missing observations less than max_missing. """ if self.freqs is None: self.compute_freqs() freqs_pass = np.logical_and(self.freqs > min_maf, self.freqs < (1 - min_maf)) if verbose: print(str(self.freqs.shape[0] - np.sum(freqs_pass)) + ' SNPs with MAF<' + str(min_maf)) self.filter(freqs_pass) return freqs_pass
[docs] def filter_missingness(self, max_missing = 5, verbose=False): if self.ndim == 2: missingness = ma.mean(self.gts.mask,axis=0) elif self.ndim == 3: missingness = ma.mean(self.gts.mask,axis = (0,1)) missingness_pass = 100 * missingness < max_missing if verbose: print(str(self.freqs.shape[0] - np.sum(missingness_pass)) + ' SNPs with missingness >' + str(max_missing) + '%') self.filter(missingness_pass) return missingness_pass
[docs] def compute_info(self): if self.freqs is None: self.compute_freqs() if self.ndim == 2: self.variances = np.var(self.gts, axis=0) elif self.ndim==3: self.variances = np.var(self.gts[:,0,:], axis=0) self.info = self.variances/(2.0*self.freqs*(1-self.freqs))
[docs] def filter_info(self, min_info = 0.99, verbose=False): if self.info is None: self.compute_info() info_pass = self.info > min_info if verbose: print(str(self.info.shape[0] - np.sum(info_pass)) + ' SNPs with INFO <' + str(min_info)) self.filter(info_pass)
[docs] def filter_ids(self,keep_ids, verbose=False): """ Keep only individuals with ids given by keep_ids """ in_ids = np.array([x in self.id_dict for x in keep_ids]) n_filtered = np.sum(in_ids) if n_filtered==0: raise(ValueError('No individuals would be left after filtering')) else: if verbose: print('After filtering, '+str(n_filtered)+' individuals remain') indices = np.array([self.id_dict[x] for x in keep_ids[in_ids]]) if self.ndim == 2: self.gts = self.gts[indices, :] elif self.ndim == 3: self.gts = self.gts[indices, :, :] self.ids = self.ids[indices] self.id_dict = make_id_dict(self.ids) self.shape = self.gts.shape if self.fams is not None: self.fams = self.fams[indices] if self.par_status is not None: self.par_status = self.par_status[indices, :]
[docs] def mean_normalise(self): """ This normalises the SNPs/PGS columns to have mean-zero. """ if not self.mean_normalised: if self.gts.ndim == 2: self.gts = self.gts - ma.mean(self.gts,axis=0) elif self.gts.ndim==3: for i in range(0, self.gts.shape[1]): self.gts[:, i, :] = self.gts[:, i, :] - ma.mean(self.gts[:, i, :], axis=0) self.mean_normalised = True
[docs] def scale(self): """ This normalises the SNPs/PGS columns to have variance 1. """ if self.gts.ndim == 2: self.gts = self.gts/ma.std(self.gts, axis=0) elif self.gts.ndim == 3: for i in range(0, self.gts.shape[1]): self.gts[:, i, :] = self.gts[:, i, :]/ma.std(self.gts[:, i, :], axis=0)
[docs] def fill_NAs(self): """ This normalises the SNP columns to have mean-zero, then fills in NA values with zero. """ if not self.mean_normalised: self.mean_normalise() NAs = np.sum(self.gts.mask, axis=0) self.gts[self.gts.mask] = 0 self.gts.mask = False self.has_NAs = False return NAs
[docs] def add(self,garray): """ Adds another gtarray of the same dimension to this array and returns the sum. It matches IDs before summing. """ if type(garray)==gtarray: pass else: raise ValueError('Must add to another gtarray') if not self.gts.ndim == garray.gts.ndim: raise ValueError('Arrays must have same number of dimensions') if self.gts.ndim == 2: if not self.gts.shape[1] == garray.gts.shape[1]: raise ValueError('Arrays must have same dimensions (apart from first)') if self.gts.ndim == 3: if not self.gts.shape[1:3] == garray.gts.shape[1:3]: raise ValueError('Arrays must have same dimensions (apart from first)') # Match IDs common_ids = list(self.id_dict.keys() & garray.id_dict.keys()) if len(common_ids) == 0: raise ValueError('No IDs in common') self_index = np.array([self.id_dict[x] for x in common_ids]) other_index = np.array([garray.id_dict[x] for x in common_ids]) # Out if self.ids.ndim == 1: ids_out = self.ids[self_index] else: ids_out = self.ids[self_index, :] if self.gts.ndim ==2: add_gts = self.gts[self_index, :]+garray.gts[other_index, :] else: add_gts = self.gts[self_index, :, :] + garray.gts[other_index, :, :] if self.ped is None: ped = garray.ped else: ped = self.ped return gtarray(add_gts, ids_out, self.sid, alleles=self.alleles, fams=self.fams[self_index], par_status=self.par_status[self_index,:], ped=ped)
[docs] def diagonalise(self,inv_root): """ This will transform the genotype array based on the inverse square root of the phenotypic covariance matrix from the family based linear mixed model. """ if self.fams is None: raise(ValueError('Family labels needed for diagonalization')) if not self.mean_normalised: self.mean_normalise() unique_fams, famsizes = np.unique(self.fams, return_counts = True) fam_indices = dict() # Transform for fam in unique_fams: fam_indices[fam] = np.where(self.fams == fam)[0] famsize = fam_indices[fam].shape[0] if self.ndim == 2: if famsize == 1: self.gts[fam_indices[fam], :] = inv_root[1]*self.gts[fam_indices[fam],:] else: self.gts[fam_indices[fam],:] = inv_root[famsize].dot(self.gts[fam_indices[fam],:]) elif self.ndim == 3: if famsize == 1: self.gts[fam_indices[fam], : , :] = inv_root[1]*self.gts[fam_indices[fam], : , :] else: for j in range(self.shape[1]): self.gts[fam_indices[fam],j, :] = inv_root[famsize].dot(self.gts[fam_indices[fam],j, :]) self.fam_indices = fam_indices
@njit def _is_whole(n): return n % 1 == 0 @njit def _lin_imp(freq, g, gp): res = (4 * freq + 2 * g - gp) / 3. return min(2, max(res, 0)) @njit def _po_imp(freq, g, gp): if g == 0: return freq elif g == 1: if gp == 0: return 1 + freq elif gp == 1: return 2 + freq else: # gp == 2 return freq else: # g == 2 return 1 + freq @njit def _impute_unrel_pat_gts(gts: np.ndarray, freqs: np.ndarray, parsum: bool) -> np.ndarray: missing_ind = gts[:, 1, 0] == -1 imp = 0.5 * gts[missing_ind, 0, :] + freqs gts[missing_ind, 1, :] = imp if parsum: gts[missing_ind, 1, :] += imp else: gts[missing_ind, 2, :] = imp return gts def _impute_unrel_pat_gts_cond_gau(G: gtarray, ped: np.ndarray, grm: SparseGRMRepr, unrelated_inds: Ids, parsum: bool) -> np.ndarray: freqs = G.freqs ids = G.ids id_dict = G.id_dict unrelated_inds_dict = make_id_dict(unrelated_inds) R12_data, R12_row, R12_col = [], [], [] for i in range(len(grm[0])): row = grm[1][i] col = grm[2][i] row_in = row in unrelated_inds col_in = col in unrelated_inds if not row_in and not col_in: continue elif row_in and col_in: if row == col: continue R12_row.append(unrelated_inds_dict[row]) R12_col.append(col) R12_data.append(grm[0][i]) R12_row.append(unrelated_inds_dict[col]) R12_col.append(row) R12_data.append(grm[0][i]) else: if row_in and not col_in: ind1, ind2 = row, col else: ind2, ind1 = row, col R12_row.append(unrelated_inds_dict[ind1]) R12_col.append(ind2) R12_data.append(grm[0][i]) R12_col += [idx for idx in unrelated_inds] R12_row += [unrelated_inds_dict[idx] for idx in unrelated_inds] R12_data += [0.5 for k in range(len(unrelated_inds))] for i in unrelated_inds: x = ped[:,2] == ids[i] y = ped[:,3] == ids[i] # assert x.sum() * y.sum() == 0 if x.sum() == 0 and y.sum() == 0: continue inds = np.where(x)[0] if x.sum()>0 else np.where(y)[0] for j in inds: iid = ped[j, 1] if iid not in ids: continue R12_row.append(unrelated_inds_dict[i]) R12_col.append(id_dict[iid]) R12_data.append(0.25) R12_row = np.array(R12_row, dtype='uint32') R12_col = np.array(R12_col, dtype='uint32') R12_data = np.array(R12_data) R12 = csc_matrix((R12_data, (R12_row, R12_col)), shape=(len(unrelated_inds), G.shape[0])) R22 = csc_matrix((grm[0], (grm[1], grm[2])), shape=(G.shape[0], G.shape[0])) R22_triu: csc_matrix = tril(R22, k=-1, format='csc') R22 = R22 + R22_triu.T for fam in unrelated_inds: G.gts[fam, 1, G.gts[fam, 1, :] < 0] = 0 G.gts[fam, 1, G.gts[fam, 1, :] > 2] = 2 if parsum: G.gts[unrelated_inds, 1, :] += G.gts[unrelated_inds, 1, :] else: G.gts[unrelated_inds, 2, :] = G.gts[unrelated_inds, 1, :] return G.gts.data @njit def _impute_missing(gts: np.ndarray, freqs: np.ndarray) -> np.ndarray: for i in range(gts.shape[2]): freq = freqs[i] is_na = np.where(np.sum(np.isnan(gts[:,:,i]), axis=1) > 0)[0] if len(is_na) == 0: continue for j in is_na: mask = np.isnan(gts[j, :, i]) if gts.shape[1] == 2: if np.sum(mask) == 2: gts[j, :, i] = freq * 2 if mask[0] and not mask[1]: gts[j, 0, i] = gts[j, 1, i] / 2 if not mask[0] and mask[1]: gts[j, 1, i] = gts[j, 0, i] + 2 * freq elif gts.shape[1] == 3: if mask[0]: if np.sum(mask[1:]) == 2: gts[j, :, i] = freq * 2 elif np.sum(mask[1:]) == 0: gts[j, 0, i] = np.sum(gts[j, 1:, i]) / 2 elif mask[1]: gts[j, 0, i] = gts[j, 2, i] / 2 + freq gts[j, 1, i] = freq * 2 elif mask[2]: gts[j, 0, i] = gts[j, 1, i] / 2 + freq gts[j, 2, i] = freq * 2 else: if np.sum(mask[1:]) == 2: gts[j, 1:3, i] = gts[j, 0, i] / 2 + freq elif mask[1]: if _is_whole(gts[j, 2, i]): gts[j, 1, i] = _po_imp(freq, gts[j, 0, i], gts[j, 2, i]) else: gts[j, 1, i] = _lin_imp(freq, gts[j, 0, i], gts[j, 2, i]) elif mask[2]: if _is_whole(gts[j, 1, i]): gts[j, 2, i] = _po_imp(freq, gts[j, 0, i], gts[j, 1, i]) else: gts[j, 2, i] = _lin_imp(freq, gts[j, 0, i], gts[j, 1, i]) return gts
[docs] def impute_unrel_par_gts(G: gtarray, sib: bool = False, parsum: bool = False, ped: np.ndarray = None, grm: SparseGRMRepr = None, unrelated_inds: Ids = None, true_par=None, par_ids=None) -> gtarray: """Impute parental genotypes of unrelated individuals. Args: G (gtarray): gtarray object holding genetic data. sib (bool, optional): Whether sib effect is modelled. Defaults to False. parsum (bool, optional): Whether sum of parental genotypes is modelled. Defaults to False. Returns: gtarray: gtarray object with imputed parental genotypes. """ if sib: # logger.warning( # 'No need to impute parental genotypes of unrelated individuals if sib effect is modelled.' # ) # print('WARNING: No need to impute parental genotypes of unrelated individuals if sib effect is modelled.') return G if G.freqs is None: G.compute_freqs() if grm is not None and unrelated_inds is not None: # logger.info( # 'Imputing parental geotypes using conditional gaussian.' # ) # print('Imputing parental geotypes using conditional gaussian') gts = _impute_unrel_pat_gts_cond_gau(G, parsum=parsum, ped=ped, grm=grm, unrelated_inds=unrelated_inds) elif grm is None and unrelated_inds is None: gts = _impute_unrel_pat_gts(G.gts.data, G.freqs, parsum=parsum) else: raise TypeError('One of grm and unrelated_inds is None.') G.gts = ma.array(gts, mask=np.isnan(gts)) # logger.info( # 'Done imputing parental geotypes of unrelated individuals.' # ) # print('Done imputing parental geotypes of unrelated individuals.') return G
[docs] def impute_missing(G: gtarray) -> gtarray: """Imputing missing entries of the genotype design matrix. Args: G (gtarray): genotype design matrix. Returns: gtarray: genotype design matrix with NAs imputed. """ if G.freqs is None: G.compute_freqs() gts = _impute_missing(G.gts.data, G.freqs) G.gts = ma.array(gts, mask=np.isnan(gts)) assert G.gts.mask.sum() == 0 # logger.info( # 'Done imputing missing genotypes.' # ) # print('Done imputing missing genotypes.') return G