import numpy as np
import numpy.ma as ma
from snipar.utilities import make_id_dict
[docs]class gtarray(object):
"""Define a genotype or PGS array that stores individual IDs, family IDs, and SNP information.
Args:
garray : :class:`~numpy:numpy.array`
2 or 3 dimensional numpy array of genotypes/PGS values. First dimension is individuals. For a 2 dimensional array, the second dimension is SNPs or PGS values.
For a 3 dimensional array, the second dimension indexes the individual and his/her relatives' genotypes (for example: proband, paternal, and maternal); and
the third dimension is the SNPs.
ids : :class:`~numpy:numpy.array`
vector of individual IDs of same length as first dimension of garray
sid : :class:`~numpy:numpy.array`
vector of SNP ids, equal in length, L, to last dimension of array
alleles : :class:`~numpy:numpy.array`
[L x 2] matrix of ref and alt alleles for the SNPs. L must match size of sid
pos : :class:`~numpy:numpy.array`
vector of SNP positions; must match size of sid
chrom : :class:`~numpy:numpy.array`
vector of SNP chromosomes; must match size of sid
map : :class:`~numpy:numpy.array`
vector of SNP chromosomes; must match size of sid
fams : :class:`~numpy:numpy.array`
vector of family IDs; must match size of ids
par_status : :class:`~numpy:numpy.array'
[N x 2] numpy matrix that records whether parents have observed or imputed genotypes/PGS, where N matches size of ids.
The first column is for the father of that individual; the second column is for the mother of that individual.
If the parent is neither observed nor imputed, the value is -1; if observed, 0; and if imputed, 1.
Returns:
G : :class:`snipar.gtarray`
"""
def __init__(self, garray, ids, sid=None, alleles=None, pos=None, chrom=None, map=None, error_probs=None, fams=None, par_status=None, ped=None):
if type(garray) == np.ndarray or type(garray) == np.ma.core.MaskedArray:
if type(garray) == np.ndarray:
self.gts = ma.array(garray,mask=np.isnan(garray))
else:
self.gts = garray
self.shape = garray.shape
self.ndim = garray.ndim
self.dtype = garray.dtype
self.freqs = None
else:
raise ValueError('Genotypes must be a numpy ndarray')
if garray.shape[0] == ids.shape[0]:
self.ids = ids
self.id_dict = make_id_dict(ids)
else:
raise ValueError('Shape of genotypes and ids does not match')
if sid is not None:
if sid.shape[0] == garray.shape[1]:
self.snp_index = 1
self.sid = sid
self.sid_dict = make_id_dict(sid)
elif sid.shape[0] == garray.shape[2]:
self.snp_index = 2
self.sid = sid
self.sid_dict = make_id_dict(sid)
else:
raise ValueError('Shape of SNP ids (sid) does not match shape of genotype array')
if alleles is not None:
if self.sid is not None:
if alleles.shape[0] == self.sid.shape[0]:
self.alleles = alleles
else:
raise ValueError('Size of alleles does not match size of genotypes')
else:
raise(ValueError('Must provide SNP ids'))
else:
self.alleles = None
if pos is not None:
if self.sid is not None:
if pos.shape[0] == self.sid.shape[0]:
self.pos = pos
else:
raise ValueError('Size of position vector does not match size of genotypes')
else:
raise(ValueError('Must provide SNP ids'))
else:
self.pos = None
if chrom is not None:
if self.sid is not None:
if chrom.shape[0] == self.sid.shape[0]:
self.chrom = chrom
else:
raise ValueError('Size of map does not match number of SNPs')
else:
raise(ValueError('Must provide SNP ids'))
else:
self.chrom = None
if map is not None:
if self.sid is not None:
if map.shape[0] == self.sid.shape[0]:
self.map = map
else:
raise ValueError('Size of map does not match number of SNPs')
else:
raise(ValueError('Must provide SNP ids'))
else:
self.map = None
if error_probs is not None:
if self.sid is not None:
if error_probs.shape[0] == self.sid.shape[0]:
self.error_probs = error_probs
else:
raise ValueError('Size of map does not match number of SNPs')
else:
raise(ValueError('Must provide SNP ids'))
else:
self.error_probs = None
if fams is not None:
if fams.shape[0] == ids.shape[0] and fams.ndim==1:
self.fams = fams
else:
raise ValueError('Fams not of same length as IDs')
else:
self.fams = None
if par_status is not None:
if par_status.shape[0] == ids.shape[0] and par_status.shape[1] == 2:
self.par_status = par_status
else:
raise ValueError('Incompatible par status array')
else:
self.par_status = None
if ped is not None:
self.ped = ped
else:
self.ped = None
self.mean_normalised = False
if np.sum(self.gts.mask)>0:
self.has_NAs = True
else:
self.has_NAs = False
self.info = None
[docs] def compute_freqs(self):
"""
Computes the frequencies of the SNPs. Stored in self.freqs.
"""
if self.ndim == 2:
self.freqs = ma.mean(self.gts,axis=0)/2.0
elif self.ndim == 3:
self.freqs = ma.mean(self.gts[:,0,:], axis=0) / 2.0
[docs] def filter(self, filter_pass):
if self.freqs is not None:
self.freqs = self.freqs[filter_pass]
if self.ndim == 2:
self.gts = self.gts[:,filter_pass]
elif self.ndim == 3:
self.gts = self.gts[:,:,filter_pass]
self.shape = self.gts.shape
if self.sid is not None:
self.sid = self.sid[filter_pass]
self.sid_dict = make_id_dict(self.sid)
if self.pos is not None:
self.pos = self.pos[filter_pass]
if self.alleles is not None:
self.alleles = self.alleles[filter_pass]
if self.chrom is not None:
self.chrom = self.chrom[filter_pass]
if self.map is not None:
self.map = self.map[filter_pass]
if self.error_probs is not None:
self.error_probs = self.error_probs[filter_pass]
[docs] def filter_maf(self, min_maf = 0.01, verbose=False):
"""
Filter SNPs based on having minor allele frequency (MAF) greater than min_maf, and have % missing observations less than max_missing.
"""
if self.freqs is None:
self.compute_freqs()
freqs_pass = np.logical_and(self.freqs > min_maf, self.freqs < (1 - min_maf))
if verbose:
print(str(self.freqs.shape[0] - np.sum(freqs_pass)) + ' SNPs with MAF<' + str(min_maf))
self.filter(freqs_pass)
[docs] def filter_missingness(self, max_missing = 5, verbose=False):
if self.ndim == 2:
missingness = ma.mean(self.gts.mask,axis=0)
elif self.ndim == 3:
missingness = ma.mean(self.gts.mask,axis = (0,1))
missingness_pass = 100 * missingness < max_missing
if verbose:
print(str(self.freqs.shape[0] - np.sum(missingness_pass)) + ' SNPs with missingness >' + str(max_missing) + '%')
self.filter(missingness_pass)
[docs] def compute_info(self):
if self.freqs is None:
self.compute_freqs()
if self.ndim == 2:
self.variances = np.var(self.gts, axis=0)
elif self.ndim==3:
self.variances = np.var(self.gts[:,0,:], axis=0)
self.info = self.variances/(2.0*self.freqs*(1-self.freqs))
[docs] def filter_info(self, min_info = 0.99, verbose=False):
if self.info is None:
self.compute_info()
info_pass = self.info > min_info
if verbose:
print(str(self.info.shape[0] - np.sum(info_pass)) + ' SNPs with INFO <' + str(min_info))
self.filter(info_pass)
[docs] def filter_ids(self,keep_ids, verbose=False):
"""
Keep only individuals with ids given by keep_ids
"""
in_ids = np.array([x in self.id_dict for x in keep_ids])
n_filtered = np.sum(in_ids)
if n_filtered==0:
raise(ValueError('No individuals would be left after filtering'))
else:
if verbose:
print('After filtering, '+str(n_filtered)+' individuals remain')
indices = np.array([self.id_dict[x] for x in keep_ids[in_ids]])
if self.ndim == 2:
self.gts = self.gts[indices, :]
elif self.ndim == 3:
self.gts = self.gts[indices, :, :]
self.ids = self.ids[indices]
self.id_dict = make_id_dict(self.ids)
self.shape = self.gts.shape
if self.fams is not None:
self.fams = self.fams[indices]
if self.par_status is not None:
self.par_status = self.par_status[indices, :]
[docs] def mean_normalise(self):
"""
This normalises the SNPs/PGS columns to have mean-zero.
"""
if not self.mean_normalised:
if self.gts.ndim == 2:
self.gts = self.gts - ma.mean(self.gts,axis=0)
elif self.gts.ndim==3:
for i in range(0, self.gts.shape[1]):
self.gts[:, i, :] = self.gts[:, i, :] - ma.mean(self.gts[:, i, :], axis=0)
self.mean_normalised = True
[docs] def scale(self):
"""
This normalises the SNPs/PGS columns to have variance 1.
"""
if self.gts.ndim == 2:
self.gts = self.gts/ma.std(self.gts, axis=0)
elif self.gts.ndim == 3:
for i in range(0, self.gts.shape[1]):
self.gts[:, i, :] = self.gts[:, i, :]/ma.std(self.gts[:, i, :], axis=0)
[docs] def fill_NAs(self):
"""
This normalises the SNP columns to have mean-zero, then fills in NA values with zero.
"""
if not self.mean_normalised:
self.mean_normalise()
NAs = np.sum(self.gts.mask, axis=0)
self.gts[self.gts.mask] = 0
self.gts.mask = False
self.has_NAs = False
return NAs
[docs] def add(self,garray):
"""
Adds another gtarray of the same dimension to this array and returns the sum. It matches IDs before summing.
"""
if type(garray)==gtarray:
pass
else:
raise ValueError('Must add to another gtarray')
if not self.gts.ndim == garray.gts.ndim:
raise ValueError('Arrays must have same number of dimensions')
if self.gts.ndim == 2:
if not self.gts.shape[1] == garray.gts.shape[1]:
raise ValueError('Arrays must have same dimensions (apart from first)')
if self.gts.ndim == 3:
if not self.gts.shape[1:3] == garray.gts.shape[1:3]:
raise ValueError('Arrays must have same dimensions (apart from first)')
# Match IDs
common_ids = list(self.id_dict.keys() & garray.id_dict.keys())
if len(common_ids) == 0:
raise ValueError('No IDs in common')
self_index = np.array([self.id_dict[x] for x in common_ids])
other_index = np.array([garray.id_dict[x] for x in common_ids])
# Out
if self.ids.ndim == 1:
ids_out = self.ids[self_index]
else:
ids_out = self.ids[self_index, :]
if self.gts.ndim ==2:
add_gts = self.gts[self_index, :]+garray.gts[other_index, :]
else:
add_gts = self.gts[self_index, :, :] + garray.gts[other_index, :, :]
if self.ped is None:
ped = garray.ped
else:
ped = self.ped
return gtarray(add_gts, ids_out, self.sid, alleles=self.alleles, fams=self.fams[self_index], par_status=self.par_status[self_index,:], ped=ped)
[docs] def diagonalise(self,inv_root):
"""
This will transform the genotype array based on the inverse square root of the phenotypic covariance matrix
from the family based linear mixed model.
"""
if self.fams is None:
raise(ValueError('Family labels needed for diagonalization'))
if not self.mean_normalised:
self.mean_normalise()
unique_fams, famsizes = np.unique(self.fams, return_counts = True)
fam_indices = dict()
# Transform
for fam in unique_fams:
fam_indices[fam] = np.where(self.fams == fam)[0]
famsize = fam_indices[fam].shape[0]
if self.ndim == 2:
if famsize == 1:
self.gts[fam_indices[fam], :] = inv_root[1]*self.gts[fam_indices[fam],:]
else:
self.gts[fam_indices[fam],:] = inv_root[famsize].dot(self.gts[fam_indices[fam],:])
elif self.ndim == 3:
if famsize == 1:
self.gts[fam_indices[fam], : , :] = inv_root[1]*self.gts[fam_indices[fam], : , :]
else:
for j in range(self.shape[1]):
self.gts[fam_indices[fam],j, :] = inv_root[famsize].dot(self.gts[fam_indices[fam],j, :])
self.fam_indices = fam_indices