Source code for kinetics

"""
Map kinetics from gibbs data to md trajectory.

This module provides the `MapKinetics` class, which creates trajectories and
weighted densities based on the clustered gibbs data and original trajectory.
"""

from tqdm import tqdm
from basicrta.util import get_start_stop_frames
import numpy as np
import MDAnalysis as mda
import os
import pickle


[docs] class MapKinetics(object): r""" The MapKinetics class takes processed Gibbs sampler data (an instance of the Gibbs class) and the contact file to create costomized trajectories, containing all of `sel1` from the initial contact analysis and a single `sel2` residue. :param gibbs: Processed instance of :class:`Gibbs` class :type gibbs: str :param contacts: Contact pickle file (`contacts-{cutoff}.pkl`) :type contacts: str """ def __init__(self, gibbs, contacts): self.gibbs = gibbs self.cutoff = float(contacts.split('/')[-1].strip('.pkl'). split('_')[-1]) self.write_sel = None self.contacts = contacts with open(contacts, 'rb') as f: tmpcontacts = pickle.load(f) metadata = tmpcontacts.dtype.metadata self.ag1 = metadata['ag1'] self.ag2 = metadata['ag2'] self.ts = metadata['ts'] self.utop = metadata['top'] self.utraj = metadata['traj'] del tmpcontacts self.dataname = (f'basicrta-{self.cutoff}/{self.gibbs.residue}/' f'den_write_data.npy') self.topname = (f'basicrta-{self.cutoff}/{self.gibbs.residue}/' f'reduced.gro') self.fulltraj = (f'basicrta-{self.cutoff}/{self.gibbs.residue}/' f'chol_traj_all.xtc') def _create_data(self): from numpy.lib.format import open_memmap with open(self.contacts, 'rb') as f: contacts = pickle.load(f) resid = int(self.gibbs.residue[1:]) ncomp = self.gibbs.processed_results.ncomp times = np.array(contacts[contacts[:, 0] == resid][:, 3]) trajtimes = np.array(contacts[contacts[:, 0] == resid][:, 2]) lipinds = np.array(contacts[contacts[:, 0] == resid][:, 1]) del contacts indicators = self.gibbs.processed_results.indicator bframes, eframes = get_start_stop_frames(trajtimes, times, self.ts) tmplens = [len(np.arange(b, e)) for b, e in zip(bframes, eframes)] totlen = sum(tmplens) write_data = open_memmap(self.dataname, mode='w+', dtype=np.float64, shape=(totlen, ncomp+2)) j = 0 for b, e, l, i in tqdm(zip(bframes, eframes, lipinds, indicators), total=len(bframes)): tmp = np.arange(b, e) tmpl = np.ones_like(np.arange(b, e)) * l tmpi = i * np.ones((len(np.arange(b, e)), ncomp)) write_data[j:j+len(tmp), 0] = tmp write_data[j:j+len(tmp), 1] = tmpl write_data[j:j+len(tmp), 2:] = tmpi j += len(tmp)
[docs] def create_traj(self, top_n=None): r""" Create the customized trajectories for the individual mixture components of the model. If `top_n` is None, a single trajectory is created with all of `sel1` and a single `sel2` residue, with every contact accounted for (ie. a single frame in the original trajectory may be used multiple times due to multiple contacts formed with the `sel1` residue of interest at that frame). :param top_n: Number of frames desired for the individual trajectories (sorted in order of decreasing classification probability). :type top_n: int """ if os.path.exists(self.fulltraj) and top_n is None: raise FileExistsError(f'{self.fulltraj} exists, remove then rerun') write_ag = self.ag1.atoms + self.ag2.residues[0].atoms write_ag.atoms.write(self.topname) if not os.path.exists(self.dataname): self._create_data() tmp = np.load(self.dataname, mmap_mode='r') u = mda.Universe(f'{self.utop}', f'{self.utraj}') ag1 = u.atoms[self.ag1.indices] ag2 = u.atoms[self.ag2.indices] if top_n is not None: sortinds = [tmp[:, i+2].argsort()[::-1][:top_n] for i in range(self.gibbs.processed_results.ncomp)] for k in range(self.gibbs.processed_results.ncomp): swf = tmp[sortinds[k], 0].astype(int) swl = tmp[sortinds[k], 1].astype(int) with mda.Writer(f'basicrta-{self.cutoff}/{self.gibbs.residue}/' f'chol_traj_comp{k}_top{top_n}.xtc', len(write_ag.atoms)) as W: for i, ts in tqdm(enumerate(u.trajectory[swf]), total=len(swf), desc=f'writing component {k}'): W.write(ag1 + ag2.select_atoms(f'resid {swl[i]}')) else: with mda.Writer(self.fulltraj, len(write_ag.atoms)) as W: for i, ts in tqdm(enumerate(u.trajectory[tmp[:, 0]. astype(int)]), total=len(tmp), desc='writing trajectory'): W.write(ag1 + ag2.select_atoms(f'resid {int(tmp[i, 1])}'))
[docs] def weighted_densities(self, step=1, top_n=None, filterP=0): """ Create weighted densities based on the marginal posterior probabilities of the model component classification. :param step: Use every `Nth` frame :type step: int :param top_n: Use the `N` most likely frames for each component to compute the weighted densities from. :type top_n: int :param filterP: Probabilities less than `filterP` are not used in the weighted density calculation :type filterP: float """ if not os.path.exists(self.fulltraj): self.create_traj() resid = int(self.gibbs.residue[1:]) tmp = np.load(self.dataname, mmap_mode='r+') wi = tmp[:, 2:] # filter anything less than 50% certain if filterP > 0: wi[wi < filterP] = 0 # filter_inds = np.where(wi > filterP) # wi = wi[filter_inds[0]][::self.step] # comp_inds = [np.where(filter_inds[1] == i)[0] for i in # range(self.gibbs.processed_results.ncomp)] u_red = mda.Universe(self.topname, self.fulltraj) chol_red = u_red.select_atoms('not protein') if top_n is None: from basicrta.pwdensity import WDensityAnalysis u_red = mda.Universe(self.topname, self.fulltraj) chol_red = u_red.select_atoms('not protein') d = WDensityAnalysis(chol_red, wi, gridcenter=u_red.select_atoms(f'protein and ' f'resid {resid}') .center_of_geometry(), xdim=40, ydim=40, zdim=40) d.run(verbose=True, step=step) if step > 1: outnames = [(f'basicrta-{self.cutoff}/{self.gibbs.residue}/' f'wcomp{k}_all_step{step}.dx') for k in range(self.gibbs.processed_results.ncomp)] else: outnames = [(f'basicrta-{self.cutoff}/{self.gibbs.residue}/' f'wcomp{k}_all.dx') for k in range(self.gibbs.processed_results.ncomp)] [den.export(outnames[k]) for k, den in enumerate(d.results.densities)] else: from basicrta.wdensity import WDensityAnalysis sortinds = [wi[:, i].argsort()[::-1] for i in range(self.gibbs.processed_results.ncomp)] for k in range(self.gibbs.processed_results.ncomp): frames = np.where(wi[sortinds[k], k] > 0)[0][:top_n:step] tmpwi = wi[frames, k] d = WDensityAnalysis(chol_red, tmpwi, gridcenter=u_red.select_atoms(f'protein ' f'and resid ' f'{resid}') .center_of_geometry(), xdim=40, ydim=40, zdim=40) d.run(verbose=True, frames=sortinds[k][frames]) if step > 1: outname = (f'basicrta-{self.cutoff}/{self.gibbs.residue}/' f'wcomp{k}_top{top_n}_step{step}.dx') else: outname = (f'basicrta-{self.cutoff}/{self.gibbs.residue}/' f'wcomp{k}_top{top_n}.dx') d.results.density.export(outname)
[docs] def get_parser(): import argparse parser = argparse.ArgumentParser(description="""map kinetics from clustered results onto trajectory, create weighted densities if flag is used""") required = parser.add_argument_group('required arguments') required.add_argument("--gibbs", type=str, required=True, help="""gibbs pickle file to use for creating kinetic trajectories and densities""") required.add_argument("--contacts", type=str, required=True, help="""contacts file used in creation of the gibbs sampler data""") parser.add_argument("--top_n", type=int, nargs='?', help="""use the `top_n` most likely frames to create trajectory or densities""") parser.add_argument("--step", type=int, nargs='?', default=1, help="""write out frame if frame%%step=0""") parser.add_argument("--wdensity", action='store_true', help="""create weighted densities""") # this is to make the cli work, should be just a temporary solution parser.add_argument('kinetics', nargs='?', help=argparse.SUPPRESS) return parser
[docs] def main(): from basicrta.gibbs import Gibbs parser = get_parser() args = parser.parse_args() g = Gibbs().load(args.gibbs) mk = MapKinetics(g, args.contacts) mk.create_traj(top_n=args.top_n) if args.wdensity: mk.weighted_densities(step=args.step, top_n=args.top_n)
if __name__ == "__main__": exit(main())