Source code for snipar.map

from numba import njit, prange
import numpy as np
from os import path
import snipar
from snipar.utilities import make_id_dict
from snipar.gtarray import gtarray

[docs]@njit def pos_to_cM(pos,boundaries, cM_pos): cM_out = np.zeros((pos.shape[0]), dtype=np.float_) cM_out[...] = np.nan current_seg = 0 for i in range(pos.shape[0]): if boundaries[cM_pos.shape[0]] > pos[i] >= boundaries[0]: unmapped = True while unmapped: if boundaries[current_seg + 1] > pos[i] >= boundaries[current_seg]: cM_out[i] = cM_pos[current_seg] unmapped = False else: current_seg += 1 return cM_out
[docs]def decode_map_from_pos(chrom,pos): decode_map_path = path.join(path.dirname(snipar.__file__), f'util_data/decode_map/chr_{chrom}.gz') map = np.loadtxt(decode_map_path, dtype=float, skiprows=1) boundaries = np.hstack((np.array(map[0, 0], dtype=np.int_),np.array(map[:, 1], dtype=np.int_))) return pos_to_cM(pos, boundaries, map[:, 2])
# Read header of mapfile
[docs]def get_map_positions(mapfile,gts,min_map_prop = 0.5): map_file = open(mapfile,'r') map_header = map_file.readline() map_header = np.array(map_header.split(' ')) map_header[len(map_header)-1] = map_header[len(map_header)-1].split('\n')[0] map_file.close() if 'pposition' in map_header and 'gposition' in map_header: bp_pos = np.loadtxt(mapfile,usecols = np.where(map_header=='pposition')[0][0], dtype=int, skiprows =1) pos_dict = make_id_dict(bp_pos) cm_pos = np.loadtxt(mapfile,usecols = np.where(map_header=='gposition')[0][0], dtype=float, skiprows =1) # Check for NAs if np.sum(np.isnan(cm_pos)) > 0: raise (ValueError('Map cannot have NAs')) if np.min(cm_pos) < 0: raise (ValueError('Map file cannot have negative values')) if np.var(cm_pos) == 0: raise (ValueError('Map file has no variation')) # Check ordering ordered_map = np.sort(cm_pos) if np.array_equal(cm_pos, ordered_map): pass else: raise (ValueError('Map not monotonic. Please make sure input is ordered correctly')) # Check scale if np.max(cm_pos) > 5000: raise (ValueError('Maximum value of map too large')) # Find positions of SNPs in map file map = np.zeros((gts.shape[1]),dtype=float) map[:] = np.nan in_map = np.array([x in pos_dict for x in gts.pos]) # Check if we have at least 50% of SNPs in map prop_in_map = np.mean(in_map) if prop_in_map < min_map_prop: raise(ValueError('Only '+str(round(100*prop_in_map))+'% of SNPs have genetic positions in '+mapfile+'. Need at least '+str(round(100*min_map_prop))+'%')) print('Found genetic map positions for '+str(round(100*prop_in_map))+'% of SNPs in '+mapfile) # Fill in map values map[in_map] = cm_pos[[pos_dict[x] for x in gts.pos[in_map]]] # Linearly interpolate map if prop_in_map < 1: print('Linearly interpolating genetic map for SNPs not in input map') map = np.interp(gts.pos, gts.pos[in_map], map[in_map]) return map else: raise(ValueError('Map file must contain columns pposition and gposition'))
[docs]def map_from_bed(bedfile, chrom): bimfile = bedfile.split('.bed')[0]+'.bim' print('Attempting to read map from ' + bimfile) bim = np.loadtxt(bimfile, usecols=(1,2,3), dtype=str) bim_map = np.array(bim[:,1],dtype=float) bim_pos = np.array(bim[:,2],dtype=int) # Check for NAs if np.var(bim_map) == 0: print('Map information not found in bim file.') print('Using default map (decode sex averaged map on Hg19 coordinates)') map = decode_map_from_pos(chrom, bim_pos) pc_mapped = str(round(100*(1-np.mean(np.isnan(map))),2)) print('Found map positions for '+str(pc_mapped)+'% of SNPs') else: if np.sum(np.isnan(bim_map)) > 0: raise (ValueError('Map cannot have NAs')) if np.min(bim_map) < 0: raise (ValueError('Map file cannot have negative values')) # Check ordering ordered_map = np.sort(bim_map) if np.array_equal(bim_map, ordered_map): pass else: raise (ValueError('Map not monotonic. Please make sure input is ordered correctly')) # Check scale if np.max(bim_map) > 500: raise (ValueError('Maximum value of map too large')) map = bim_map print('Read map from bim file') return bim[:,0], map