Add a module for multiprocessing.
epsi-builds/msspec_python3/pipeline/head This commit looks good
Details
epsi-builds/msspec_python3/pipeline/head This commit looks good
Details
This commit is contained in:
parent
94ce7648d4
commit
195d5fd72f
|
@ -0,0 +1,287 @@
|
||||||
|
# coding: utf8
|
||||||
|
# -*- encoding: future_fstrings -*-
|
||||||
|
# vim: set et sw=4 ts=4 nu tw=79 cc=+1:
|
||||||
|
|
||||||
|
from collections import OrderedDict
|
||||||
|
from functools import partial
|
||||||
|
import itertools
|
||||||
|
import logging
|
||||||
|
import multiprocessing as mp
|
||||||
|
import numpy as np
|
||||||
|
import pandas as pd
|
||||||
|
import time
|
||||||
|
|
||||||
|
|
||||||
|
logging.basicConfig(level=logging.DEBUG)
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class Variable:
|
||||||
|
def __init__(self, name, doc=""):
|
||||||
|
self.name = name
|
||||||
|
self.doc = doc
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return f"<Variable(\'{self.name}\')>"
|
||||||
|
|
||||||
|
class Sweep:
|
||||||
|
def __init__(self, key, comments="", unit=None,
|
||||||
|
values=None,
|
||||||
|
start=None, stop=None, step=None,
|
||||||
|
num=10, scale='lin', base=10,
|
||||||
|
default=None,
|
||||||
|
folded=False,
|
||||||
|
unzip=False,
|
||||||
|
group=None,
|
||||||
|
accumulations=1,
|
||||||
|
linked_to=None):
|
||||||
|
self.key = key # The variable's name
|
||||||
|
self.comments = comments
|
||||||
|
self.default = default
|
||||||
|
self.folded = folded
|
||||||
|
self.unzip = unzip
|
||||||
|
self.linked_to = linked_to
|
||||||
|
self.group = None
|
||||||
|
# First use case: values are specidied
|
||||||
|
if values is not None:
|
||||||
|
self.values = values
|
||||||
|
else:
|
||||||
|
assert start is not None and stop is not None
|
||||||
|
self.start = start
|
||||||
|
self.stop = stop
|
||||||
|
if step is not None:
|
||||||
|
self.step = step
|
||||||
|
self.values = np.arange(start, stop, step)
|
||||||
|
else:
|
||||||
|
self.num = num
|
||||||
|
if scale == 'lin':
|
||||||
|
self.values = np.linspace(start, stop, num)
|
||||||
|
elif scale == 'log':
|
||||||
|
self.base = base
|
||||||
|
self.values = np.logspace(start, stop, num, base=base)
|
||||||
|
|
||||||
|
def __getitem__(self, index):
|
||||||
|
return (self.key, self.values[index])
|
||||||
|
|
||||||
|
def __len__(self):
|
||||||
|
return len(self.values)
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return "<{}(\'{}\')>".format(self.__class__.__name__, self.key)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def unfolded(self):
|
||||||
|
return not(self.folded)
|
||||||
|
|
||||||
|
|
||||||
|
class SweepRange:
|
||||||
|
def __init__(self, *sweeps, passindex=False):
|
||||||
|
self.sweeps = sweeps
|
||||||
|
self.passindex = passindex
|
||||||
|
self.index = 0
|
||||||
|
|
||||||
|
# First check that sweeps that are linked to another on are all included
|
||||||
|
links = {}
|
||||||
|
for sweep in sweeps:
|
||||||
|
if sweep.linked_to is not None:
|
||||||
|
if sweep.linked_to not in sweeps:
|
||||||
|
raise NameError(("The sweep \'{}\' is linked to \'{}\' "
|
||||||
|
"but is not included in the loop!").format(
|
||||||
|
sweep.key, sweep.linked_to.key))
|
||||||
|
# add the linked sweep to the list of links
|
||||||
|
if sweep.linked_to not in links.keys():
|
||||||
|
links[sweep.linked_to] = [sweep,]
|
||||||
|
else:
|
||||||
|
links[sweep.linked_to].append(sweep)
|
||||||
|
|
||||||
|
# The cumulative product of lengths
|
||||||
|
lengths = []
|
||||||
|
for sweep in sweeps:
|
||||||
|
if sweep.linked_to is None:
|
||||||
|
lengths.append(len(sweep) if sweep.unfolded else 1)
|
||||||
|
else:
|
||||||
|
lengths.append(1)
|
||||||
|
cn = np.cumprod(lengths)
|
||||||
|
# Get the total number of combinations is the last one
|
||||||
|
ntot = cn[-1]
|
||||||
|
|
||||||
|
self.links = links
|
||||||
|
self.cn = cn
|
||||||
|
self.ntot = ntot
|
||||||
|
|
||||||
|
def __len__(self):
|
||||||
|
return self.ntot
|
||||||
|
|
||||||
|
def __iter__(self):
|
||||||
|
self.index = 0
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __next__(self):
|
||||||
|
if self.index < self.ntot:
|
||||||
|
i = self.index
|
||||||
|
self.index += 1
|
||||||
|
|
||||||
|
row = OrderedDict({k: None for k in self.columns})
|
||||||
|
for isweep, sweep in enumerate(self.sweeps):
|
||||||
|
# If sweep is linked to another one, do nothing as its value
|
||||||
|
# will be added by its parent
|
||||||
|
if sweep.linked_to is not None:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Compute the index
|
||||||
|
if sweep.folded:
|
||||||
|
key, value = sweep.key, sweep.values
|
||||||
|
row[key] = value
|
||||||
|
else:
|
||||||
|
idx = int(i/(self.ntot/self.cn[isweep])) % len(sweep)
|
||||||
|
|
||||||
|
# If this sweep has links, add also all values from its
|
||||||
|
# children
|
||||||
|
children = self.links.get(sweep, [])
|
||||||
|
for s in [sweep,] + children:
|
||||||
|
key, value = s[idx]
|
||||||
|
row[key] = value
|
||||||
|
if self.passindex:
|
||||||
|
row['sweep_index'] = i
|
||||||
|
return row
|
||||||
|
else:
|
||||||
|
raise StopIteration
|
||||||
|
|
||||||
|
@property
|
||||||
|
def columns(self):
|
||||||
|
cols = [sweep.key for sweep in self.sweeps]
|
||||||
|
if self.passindex:
|
||||||
|
cols.append('sweep_index')
|
||||||
|
return cols
|
||||||
|
|
||||||
|
@property
|
||||||
|
def values(self):
|
||||||
|
return [list(row.values()) for row in self]
|
||||||
|
|
||||||
|
@property
|
||||||
|
def items(self):
|
||||||
|
items = []
|
||||||
|
for row in self:
|
||||||
|
items.append(row)
|
||||||
|
return items
|
||||||
|
|
||||||
|
|
||||||
|
class Looper:
|
||||||
|
def __init__(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
@property
|
||||||
|
def pipeline(self):
|
||||||
|
try:
|
||||||
|
return self._pipeline
|
||||||
|
except AttributeError as error:
|
||||||
|
return None
|
||||||
|
|
||||||
|
@pipeline.setter
|
||||||
|
def pipeline(self, value):
|
||||||
|
self._pipeline = value
|
||||||
|
|
||||||
|
def _wrapper(self, x):
|
||||||
|
logger.debug("Pipeline called with {}".format(x))
|
||||||
|
return self.pipeline(**x)
|
||||||
|
|
||||||
|
def run(self, *sweeps, ncpu=1, passindex=False):
|
||||||
|
logger.info("Loop starts...")
|
||||||
|
# prepare the list of inputs
|
||||||
|
sr = SweepRange(*sweeps, passindex=passindex)
|
||||||
|
items = sr.items
|
||||||
|
|
||||||
|
data = []
|
||||||
|
|
||||||
|
if ncpu == 1:
|
||||||
|
# serial processing...
|
||||||
|
logger.info("serial processing...")
|
||||||
|
t0 = time.time()
|
||||||
|
|
||||||
|
for i, values in enumerate(items):
|
||||||
|
result = self._wrapper(values)
|
||||||
|
data.append(result)
|
||||||
|
|
||||||
|
t1 = time.time()
|
||||||
|
dt = t1 - t0
|
||||||
|
logger.info("Processed {:d} sets of inputs in {:.3f} seconds".format(
|
||||||
|
len(sr), dt))
|
||||||
|
|
||||||
|
else:
|
||||||
|
# Parallel processing...
|
||||||
|
chunksize = 1 #int(nsets/ncpu)
|
||||||
|
logger.info(("Parallel processing over {:d} cpu (chunksize={:d})..."
|
||||||
|
"").format(ncpu, chunksize))
|
||||||
|
t0 = time.time()
|
||||||
|
|
||||||
|
pool = mp.Pool(processes=ncpu)
|
||||||
|
data = pool.map(self._wrapper, items, chunksize=chunksize)
|
||||||
|
pool.close()
|
||||||
|
pool.join()
|
||||||
|
|
||||||
|
t1 = time.time()
|
||||||
|
dt = t1 - t0
|
||||||
|
logger.info(("Processed {:d} sets of inputs in {:.3f} seconds"
|
||||||
|
"").format(len(sr), dt))
|
||||||
|
|
||||||
|
# Create the DataFrame
|
||||||
|
dfdata = []
|
||||||
|
columns = sr.columns + ['output',]
|
||||||
|
|
||||||
|
for i in range(len(sr)):
|
||||||
|
row = list(items[i].values())
|
||||||
|
row.append(data[i])
|
||||||
|
dfdata.append(row)
|
||||||
|
|
||||||
|
df = pd.DataFrame(dfdata, columns=columns)
|
||||||
|
|
||||||
|
return df
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
import numpy as np
|
||||||
|
import time
|
||||||
|
|
||||||
|
logger.setLevel(logging.DEBUG)
|
||||||
|
|
||||||
|
|
||||||
|
def bar(**kwargs):
|
||||||
|
return 0
|
||||||
|
|
||||||
|
def post_process(data):
|
||||||
|
x = data.x.unique()
|
||||||
|
y = data.y.unique()
|
||||||
|
|
||||||
|
|
||||||
|
theta = Sweep(key='theta', comments="The polar angle",
|
||||||
|
start=-70, stop=70, num=3,
|
||||||
|
unit='degree',
|
||||||
|
folded=True)
|
||||||
|
|
||||||
|
phi = Sweep('phi', comments="The azimutal angle",
|
||||||
|
values=[0, 45],
|
||||||
|
unit='degree',
|
||||||
|
folded=True)
|
||||||
|
|
||||||
|
emitter_plane = Sweep('emitter_plane', comments="The emitter plane",
|
||||||
|
start=0, stop=3, step=1)
|
||||||
|
|
||||||
|
emitter = Sweep(key='emitter', values=('Ti', 'Sr'))
|
||||||
|
|
||||||
|
levels = Sweep(key='level', values=('2p', '3d'), linked_to=emitter)
|
||||||
|
|
||||||
|
lmaxt = Sweep(key='lmaxt', values=(25, 29, 30))#, linked_to=emitter)
|
||||||
|
|
||||||
|
uij = Sweep(key='uij', values=(0.01, 0.02, 0.03))#, linked_to=lmaxt)
|
||||||
|
|
||||||
|
sweeps = [theta, phi, emitter_plane, emitter, levels, lmaxt, uij]
|
||||||
|
|
||||||
|
|
||||||
|
looper = Looper()
|
||||||
|
looper.pipeline = bar
|
||||||
|
data = looper.run(emitter, emitter_plane, uij, theta, levels, ncpu=1,
|
||||||
|
passindex=True)
|
||||||
|
|
||||||
|
print(data)
|
Loading…
Reference in New Issue