From 195d5fd72f01f820cb9fc3231e10e7a44d4accf0 Mon Sep 17 00:00:00 2001 From: sylvain tricot Date: Thu, 1 Apr 2021 18:37:19 +0200 Subject: [PATCH] Add a module for multiprocessing. --- src/msspec/looper.py | 287 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 287 insertions(+) create mode 100644 src/msspec/looper.py diff --git a/src/msspec/looper.py b/src/msspec/looper.py new file mode 100644 index 0000000..192acb3 --- /dev/null +++ b/src/msspec/looper.py @@ -0,0 +1,287 @@ +# coding: utf8 +# -*- encoding: future_fstrings -*- +# vim: set et sw=4 ts=4 nu tw=79 cc=+1: + +from collections import OrderedDict +from functools import partial +import itertools +import logging +import multiprocessing as mp +import numpy as np +import pandas as pd +import time + + +logging.basicConfig(level=logging.DEBUG) +logger = logging.getLogger(__name__) + + +class Variable: + def __init__(self, name, doc=""): + self.name = name + self.doc = doc + + def __repr__(self): + return f"" + +class Sweep: + def __init__(self, key, comments="", unit=None, + values=None, + start=None, stop=None, step=None, + num=10, scale='lin', base=10, + default=None, + folded=False, + unzip=False, + group=None, + accumulations=1, + linked_to=None): + self.key = key # The variable's name + self.comments = comments + self.default = default + self.folded = folded + self.unzip = unzip + self.linked_to = linked_to + self.group = None + # First use case: values are specidied + if values is not None: + self.values = values + else: + assert start is not None and stop is not None + self.start = start + self.stop = stop + if step is not None: + self.step = step + self.values = np.arange(start, stop, step) + else: + self.num = num + if scale == 'lin': + self.values = np.linspace(start, stop, num) + elif scale == 'log': + self.base = base + self.values = np.logspace(start, stop, num, base=base) + + def __getitem__(self, index): + return (self.key, self.values[index]) + + def __len__(self): + return len(self.values) + + def __repr__(self): + return "<{}(\'{}\')>".format(self.__class__.__name__, self.key) + + @property + def unfolded(self): + return not(self.folded) + + +class SweepRange: + def __init__(self, *sweeps, passindex=False): + self.sweeps = sweeps + self.passindex = passindex + self.index = 0 + + # First check that sweeps that are linked to another on are all included + links = {} + for sweep in sweeps: + if sweep.linked_to is not None: + if sweep.linked_to not in sweeps: + raise NameError(("The sweep \'{}\' is linked to \'{}\' " + "but is not included in the loop!").format( + sweep.key, sweep.linked_to.key)) + # add the linked sweep to the list of links + if sweep.linked_to not in links.keys(): + links[sweep.linked_to] = [sweep,] + else: + links[sweep.linked_to].append(sweep) + + # The cumulative product of lengths + lengths = [] + for sweep in sweeps: + if sweep.linked_to is None: + lengths.append(len(sweep) if sweep.unfolded else 1) + else: + lengths.append(1) + cn = np.cumprod(lengths) + # Get the total number of combinations is the last one + ntot = cn[-1] + + self.links = links + self.cn = cn + self.ntot = ntot + + def __len__(self): + return self.ntot + + def __iter__(self): + self.index = 0 + return self + + def __next__(self): + if self.index < self.ntot: + i = self.index + self.index += 1 + + row = OrderedDict({k: None for k in self.columns}) + for isweep, sweep in enumerate(self.sweeps): + # If sweep is linked to another one, do nothing as its value + # will be added by its parent + if sweep.linked_to is not None: + continue + + # Compute the index + if sweep.folded: + key, value = sweep.key, sweep.values + row[key] = value + else: + idx = int(i/(self.ntot/self.cn[isweep])) % len(sweep) + + # If this sweep has links, add also all values from its + # children + children = self.links.get(sweep, []) + for s in [sweep,] + children: + key, value = s[idx] + row[key] = value + if self.passindex: + row['sweep_index'] = i + return row + else: + raise StopIteration + + @property + def columns(self): + cols = [sweep.key for sweep in self.sweeps] + if self.passindex: + cols.append('sweep_index') + return cols + + @property + def values(self): + return [list(row.values()) for row in self] + + @property + def items(self): + items = [] + for row in self: + items.append(row) + return items + + +class Looper: + def __init__(self): + pass + + @property + def pipeline(self): + try: + return self._pipeline + except AttributeError as error: + return None + + @pipeline.setter + def pipeline(self, value): + self._pipeline = value + + def _wrapper(self, x): + logger.debug("Pipeline called with {}".format(x)) + return self.pipeline(**x) + + def run(self, *sweeps, ncpu=1, passindex=False): + logger.info("Loop starts...") + # prepare the list of inputs + sr = SweepRange(*sweeps, passindex=passindex) + items = sr.items + + data = [] + + if ncpu == 1: + # serial processing... + logger.info("serial processing...") + t0 = time.time() + + for i, values in enumerate(items): + result = self._wrapper(values) + data.append(result) + + t1 = time.time() + dt = t1 - t0 + logger.info("Processed {:d} sets of inputs in {:.3f} seconds".format( + len(sr), dt)) + + else: + # Parallel processing... + chunksize = 1 #int(nsets/ncpu) + logger.info(("Parallel processing over {:d} cpu (chunksize={:d})..." + "").format(ncpu, chunksize)) + t0 = time.time() + + pool = mp.Pool(processes=ncpu) + data = pool.map(self._wrapper, items, chunksize=chunksize) + pool.close() + pool.join() + + t1 = time.time() + dt = t1 - t0 + logger.info(("Processed {:d} sets of inputs in {:.3f} seconds" + "").format(len(sr), dt)) + + # Create the DataFrame + dfdata = [] + columns = sr.columns + ['output',] + + for i in range(len(sr)): + row = list(items[i].values()) + row.append(data[i]) + dfdata.append(row) + + df = pd.DataFrame(dfdata, columns=columns) + + return df + + + + +if __name__ == "__main__": + import numpy as np + import time + + logger.setLevel(logging.DEBUG) + + + def bar(**kwargs): + return 0 + + def post_process(data): + x = data.x.unique() + y = data.y.unique() + + + theta = Sweep(key='theta', comments="The polar angle", + start=-70, stop=70, num=3, + unit='degree', + folded=True) + + phi = Sweep('phi', comments="The azimutal angle", + values=[0, 45], + unit='degree', + folded=True) + + emitter_plane = Sweep('emitter_plane', comments="The emitter plane", + start=0, stop=3, step=1) + + emitter = Sweep(key='emitter', values=('Ti', 'Sr')) + + levels = Sweep(key='level', values=('2p', '3d'), linked_to=emitter) + + lmaxt = Sweep(key='lmaxt', values=(25, 29, 30))#, linked_to=emitter) + + uij = Sweep(key='uij', values=(0.01, 0.02, 0.03))#, linked_to=lmaxt) + + sweeps = [theta, phi, emitter_plane, emitter, levels, lmaxt, uij] + + + looper = Looper() + looper.pipeline = bar + data = looper.run(emitter, emitter_plane, uij, theta, levels, ncpu=1, + passindex=True) + + print(data)