Source code for snipar.utilities

from typing import Any, Dict, Optional, Union
from pathlib import Path
import numpy as np
import pandas as pd
import bgen_reader
from os import path
import argparse
import re
from typing import Tuple

[docs]def make_id_dict(x,col=0): """ Make a dictionary that maps from the values in the given column (col) to their row-index in the input array """ if len(x.shape)>1: x = x[:,col] id_dict = {} for i in range(0,x.shape[0]): id_dict[x[i]] = i return id_dict
[docs]def convert_str_array(x): """ Convert an ascii array to unicode array (UTF-8) """ x = np.array(x) x_shape = x.shape x = x.flatten() x_out = np.array([y.decode('UTF-8') for y in x]) return x_out.reshape(x_shape)
[docs]def encode_str_array(x): """ Encode a unicode array as an ascii array """ x = np.array(x) x_shape = x.shape x = x.flatten() x_out = np.array([y.encode('ascii') for y in x]) return x_out.reshape(x_shape)
[docs]def parse_obsfiles(obsfiles, obsformat='bed', append = True, wildcard = '@', chromosomes=None): obs_files = [] chroms = [] if wildcard in obsfiles: bed_ixes = obsfiles.split(wildcard) if chromosomes is None: chromosomes = np.arange(1,23) for i in chromosomes: obsfile = bed_ixes[0]+str(i)+bed_ixes[1]+'.'+obsformat if path.exists(obsfile): if append: obs_files.append(obsfile) else: obs_files.append(obsfile[:-(len(obsformat)+1)]) chroms.append(i) if len(obs_files)==0: raise(ValueError('Observed genotype files not found')) else: print(str(len(obs_files))+' files found') else: obsfile = obsfiles+'.'+obsformat if path.exists(obsfile): obs_files = [obsfile] chroms = [0] else: raise(ValueError(obsfile+' not found')) return np.array(obs_files), np.array(chroms,dtype=int)
[docs]def parse_filelist(obsfiles, impfiles, obsformat, chromosomes=None): obs_files = [] imp_files = [] chroms = [] if '@' in obsfiles and impfiles: bed_ixes = obsfiles.split('@') imp_ixes = impfiles.split('@') if chromosomes is None: chromosomes = np.arange(1,23) for i in chromosomes: obsfile = bed_ixes[0]+str(i)+bed_ixes[1]+'.'+obsformat impfile = imp_ixes[0]+str(i)+imp_ixes[1]+'.hdf5' if path.exists(impfile) and path.exists(obsfile): obs_files.append(obsfile) imp_files.append(impfile) chroms.append(i) if len(imp_files)==0: raise(ValueError('Observed/imputed genotype files not found')) else: print(str(len(imp_files))+' matched observed and imputed genotype files found') else: obsfile = obsfiles+'.'+obsformat impfile = impfiles+'.hdf5' if path.exists(obsfile) and path.exists(impfile): obs_files = [obsfile] imp_files = [impfile] chroms = [0] else: if not path.exists(obsfile): raise(ValueError(obsfile+' not found')) if not path.exists(impfile): raise(ValueError(impfile+' not found')) return np.array(obs_files), np.array(imp_files), np.array(chroms,dtype=int)
[docs]def outfile_name(outprefix,outsuffix,chrom=None): if '@' in outprefix: if chrom is None: raise(ValueError('Must provide chromosome number with wildcard character')) outprefix = outprefix.split('@') outprefix = outprefix[0]+str(chrom)+outprefix[1] return outprefix+outsuffix elif chrom is not None: return outprefix+'chr_'+str(chrom)+outsuffix else: return outprefix+outsuffix
[docs]def parseNumRange(string): """reads either a int or a range""" match_range = re.match(r' *(\d+) *- *(\d+) *', string) match_list = re.match(r' *(\d+) *', string) if match_range: start = int(match_range.group(1)) end = int(match_range.group(2)) result = [str(n) for n in range(start, end+1)] elif match_list: result = match_list.group(0) else: raise Exception(f"{string} is neither a range of the form x-y nor a list of integers of the form x y z") return result
[docs]class NumRangeAction(argparse.Action): """flattens and sorts the resulting num range. also removes duplicates""" def __call__(self, parser, args, values, option_string=None): result = [] for v in values: if isinstance(v,list): result += v if isinstance(v,str): result.append(v) result = np.unique(result).astype(int) result.sort() result = result.astype(str).tolist() setattr(args, self.dest, result)
[docs]def coord2linear(ind1: int, ind2: int) -> int: row_ind, col_ind = max(ind1, ind2), min(ind1, ind2) return int(row_ind * (row_ind + 1) / 2 + col_ind)
[docs]def linear2coord(ind: int) -> Tuple[int, int]: ind += 1 ind1 = np.ceil((2 * ind + 0.25) ** 0.5 - 0.5) ind2 = ind - (ind1 - 1) * ind1 / 2 return int(ind1 - 1), int(ind2 - 1)
[docs]def get_parser_doc(parser): doc = "" for action in parser._actions: options = str(action.option_strings)[1:-1] default = action.default type = "" if action.type: if "'" in str(action.type): type = str(action.type).split("'")[1] help = action.help default_substring = "" if default: default_substring = f", default={default}" arg_doc = f""" {options} : {type}{default_substring} {help} """ doc += arg_doc return doc
class _bgen: """Simple wrapper class of bgen_reader.open_bgen that controls access to sample ids.""" def __init__(self, filepath: Union[str, Path], samples_filepath: Optional[Union[str, Path]] = None, allow_complex: bool = False, verbose: bool = True,): self._bgen = bgen_reader.open_bgen(filepath, allow_complex=allow_complex, verbose=verbose) self._samples = self._read_sample(samples_filepath) if self._bgen.samples.shape[0] != self._samples.shape[0]: raise ValueError('sample file length and bgen file length do not match.') @property def samples(self) -> np.ndarray: return self._samples def __getattr__(self, name: str) -> Any: """Access to attributes other than sample ids.""" return getattr(self._bgen, name) def _read_sample(self, samples_filepath: str) -> np.ndarray: return pd.read_csv(samples_filepath, sep='\s+')['ID_2'][1:].to_numpy(dtype=str)
[docs]def open_bgen(filename: str, verbose: bool = False) -> bgen_reader.open_bgen: """Wrapper of bgen_reader.open_bgen that checks if sample ids make sense.""" bgen = bgen_reader.open_bgen(filename, verbose=verbose) if bgen.samples[0] == 'sample_0': print('WARNING: Sample ids in bgen file are generic. Trying to read the corresponding .sample file ...') samples_filepath = filename[:-4] + 'sample' if not path.exists(samples_filepath): raise FileNotFoundError(f'{samples_filepath} does not exist.') bgen = _bgen(filename, verbose=verbose, samples_filepath=samples_filepath) return bgen
[docs]def read_bgen(filename: str, verbose: bool = False) -> Dict: """Wrapper of bgen_reader.read_bgen that checks if sample ids make sense.""" bgen = bgen_reader.read_bgen(filename, verbose=verbose) if bgen['samples'][0] == 'sample_0': print('WARNING: Sample ids in bgen file are generic. Trying to read the corresponding .sample file ...') samples_filepath = filename[:-4] + 'sample' if not path.exists(samples_filepath): raise FileNotFoundError(f'{samples_filepath} does not exist.') bgen = bgen_reader.read_bgen(filename, verbose=verbose, samples_filepath=samples_filepath) bgen['samples'] = pd.read_csv(samples_filepath, sep='\s+')['ID_2'][1:].to_numpy() return bgen