From 327d2825c63674d25eacbb8f33905413f8c83189 Mon Sep 17 00:00:00 2001 From: Marco Cammarata Date: Thu, 1 Dec 2016 15:26:31 +0100 Subject: [PATCH] cleaning up the code, creating utils module --- alignment.py | 648 --------------------------------------------------- 1 file changed, 648 deletions(-) delete mode 100644 alignment.py diff --git a/alignment.py b/alignment.py deleted file mode 100644 index 30f0bea..0000000 --- a/alignment.py +++ /dev/null @@ -1,648 +0,0 @@ -from __future__ import print_function,division -from skimage import transform as tf -from scipy.ndimage import gaussian_filter1d as g1d -import mcutils as mc -import joblib -import collections -import numpy as np -import matplotlib.pyplot as plt -import time - -# /--------\ -# | | -# | UTILS | -# | | -# \--------/ - -#defaultE = np.arange(1024)*0.12+7060 -defaultE = (np.arange(1024)-512)*0.189+7123 -defaultE = 7063.5+0.1295*np.arange(1024); # done on nov 30 2016, based on xppl3716:r77 - -__i = np.arange(1024) -__x = (__i-512)/512 - -g_fit_default_kw = dict( - intensity = 1., - error_intensity = 0.02, - transx = 0, - error_transx = 3, - limit_transx = ( -400,400 ), - transy = 0, - error_transy = 3, - limit_transy = ( -50,50 ), - rotation = 0.00, - error_rotation = 0.005, - limit_rotation = (-0.06,0.06), - scalex = 1, - limit_scalex = (0.4,1.2), - error_scalex = 0.05, - scaley = 1, - error_scaley = 0.05, - limit_scaley = (0.8,1.2), - shear = 0.00, - error_shear = 0.001, - limit_shear = (-0.2,0.2), - igauss1cen = 512, - error_igauss1cen = 2., - fix_igauss1cen = True, - igauss1sig = 4000., - error_igauss1sig = 2., - fix_igauss1sig = True, - igauss2cen = 512, - error_igauss2cen = 2., - fix_igauss2cen = True, - igauss2sig = 4000., - error_igauss2sig = 2., - fix_igauss2sig = True, - iblur1 = 0, - limit_iblur1 = (0,20), - error_iblur1 = 0.02, - fix_iblur1 = True, - iblur2 = 0, - limit_iblur2 = (0,20), - error_iblur2 = 0.02, - fix_iblur2 = True - -) - - -def rebin1D(a,shape): - n0 = a.shape[0]//shape - sh = shape,n0 - return a[:n0*shape].reshape(sh).mean(1) - -cmap = plt.cm.viridis if hasattr(plt.cm,"viridis") else plt.cm.gray -kw_2dplot = dict( - interpolation = "none", - aspect = "auto", - cmap = cmap - ) - -fit_ret = collections.namedtuple("fit_ret",["init_pars","final_pars",\ - "final_transform1","final_transform2","im1","im2","E","p1","p1_sum","p2","p2_sum","fom","ratio","tneeded"] ) - -def calcFOM(p1,p2,ratio): - idx = ( p1>p1.max()/10 )# & (p2>p2.max()/10) - ratio = ratio[idx] - return ratio.std()/np.abs(ratio.mean()) - -def subtractBkg(imgs,nPix=100,bkg_type="line"): - if imgs.ndim == 2: imgs = imgs[np.newaxis,:] - imgs = imgs.astype(np.float) - if bkg_type == "line": - bkg = np.median(imgs[:,:nPix,:],axis=1) - imgs = imgs-bkg[:,np.newaxis,:] - elif bkg_type == "corner": - q1 = imgs[:,:nPix,:nPix].mean(-1).mean(-1) - imgs[:,:512,:512]-=q1[:,np.newaxis,np.newaxis] - q2 = imgs[:,:nPix,-nPix:].mean(-1).mean(-1) - imgs[:,:512,-512:]-=q2[:,np.newaxis,np.newaxis] - q3 = imgs[:,-nPix:,-nPix:].mean(-1).mean(-1) - imgs[:,-512:,-512:]-=q3[:,np.newaxis,np.newaxis] - q4 = imgs[:,-nPix:,:nPix].mean(-1).mean(-1) - imgs[:,-512:,:512]-=q4[:,np.newaxis,np.newaxis] - elif bkg_type is None: - if imgs.ndim == 2: imgs = imgs[np.newaxis,:].astype(np.float) - else: - print("Background subtraction '%s' Not impleted"%bkg_type) - return imgs - -def getCenterOfMax(img,axis=0,threshold=0.05): - img = img.copy() - img[imgp1.max()/10.) - ax[5].plot(E[idx],pr[idx]) - if res is not None: - ax[5].set_title("FOM: %.2f"%res.fom) - else: - ax[5].set_title("FOM: %.2f"% calcFOM(p1,p2,pr)) - if (save is not None) and (save is not False): plt.savefig(save,transparent=True,dpi=500) - return fig - -def plotRatios(r,shot='random',fig=None,E=defaultE,save=None): - if fig is None: fig = plt.subplots(2,1,sharex=True)[0] - ax = fig.axes - n = r.shape[0] - i = ax[0].imshow(r,extent=(E[0],E[-1],0,n),**kw_2dplot) - i.set_clim(0,1.2) - if shot == 'random' : shot = np.random.random_integers(0,n-1) - ax[1].plot(E,r[shot],label="Shot n %d"%shot) - ax[1].plot(E,np.nanmedian(r[:10],axis=0),label="median 10 shots") - ax[1].plot(E,np.nanmedian(r,axis=0),label="all shots") - ax[1].legend() - ax[1].set_ylim(0,1.5) - ax[1].set_xlabel("Energy") - ax[1].set_ylabel("Transmission") - ax[0].set_ylabel("Shot num") - if (save is not None) and (save is not False): plt.savefig(save,transparent=True,dpi=500) - -def plotSingleShots(r,nShots=10,fig=None,E=defaultE,save=None,ErangeForStd=(7090,7150)): - if fig is None: fig = plt.subplots(2,1,sharex=True)[0] - ax = fig.axes - for i in range(nShots): - ax[0].plot(E,r[i]+i) - ax[0].set_ylim(0,nShots+0.5) - av = (1,3,10,30,100) - good = np.nanmedian(r,0) - for i,a in enumerate(av): - m = np.nanmedian(r[:a],0) - idx = (E>ErangeForStd[0]) & (E0): i = g1d(i,iblur[0],axis=1) - if (iblur[1] is not None) and (iblur[1]>0): i = g1d(i,iblur[1],axis=0) - if igauss is not None: - if isinstance(igauss,(int,float)): - igauss = (__i[-1]/2,igauss); # if one parameter only, assume it is centered on image - g = mc.gaussian(__i,x0=igauss[0],sig=igauss[1],normalize=False) - i *= g - if show: plotShot(img,i) - return i - - -class SpecrometerTransformation(object): - def __init__(self,translation=(0,0),scale=(1,1),rotation=0,shear=0, - intensity=1,igauss=None,iblur=None): - self.affineTransform = getTransform(translation=translation, - scale = scale,rotation=rotation,shear=shear) - self.intensity=intensity - self.igauss = igauss - self.iblur = iblur - - def update(self,**kw): - # current transformation, necessary because skimage transformation do not support - # setting of attributes - names = ["translation","scale","rotation","shear"] - trans_dict = dict( [(n,getattr(self.affineTransform,n)) for n in names] ) - for k,v in kw.items(): - if hasattr(self,k): - setattr(self,k,v) - elif k in names: - trans_dict[k] = v - self.affineTransform = getTransform(**trans_dict) - - def transformImage(self,img,orderRotation=1,show=False): - return transformImage(img,self.affineTransform,iblur=self.iblur,\ - intensity=self.intensity,igauss=self.igauss,orderRotation=orderRotation,\ - show=show) - -def saveAlignment(fname,transform,roi1,roi2): - np.save(fname, dict( transform = transform, roi1=roi1, roi2 = roi2 ) ) - -def loadAlignment(fname): - return np.load(fname).item() - -def getBestTransform(results): - # try to see if it is unravelled - if not hasattr(results,"_fields"): results = unravel_results(results) - # find best based on FOM - idx = np.nanargmin(np.abs(results.fom)) - parnames = results.init_pars.keys() - bestPars = dict() - for n in parnames: - bestPars[n] = results.final_pars[n][idx] - if n.find("limit_") == 0: - if bestPars[n][0] is None: - bestPars[n] = None - else: - bestPars[n] = tuple(bestPars[n]) - return bestPars,np.nanmin(np.abs(results.fom)) - - - - - -def unravel_results(res,nSaveImg='all'): - final_pars = dict() - # res[0].fit_result is a list if we are trying to unravel retult from getShots - if isinstance(res[0].init_pars,list): - parnames = res[0].init_pars[0].get_keys() - else: - parnames = res[0].init_pars.keys() - final_pars = dict() - init_pars = dict() - for n in parnames: - if n.find("limit_") == 0: - final_pars[n] = np.vstack( [r.final_pars[n] for r in res]) - init_pars[n] = np.vstack( [r.init_pars[n] for r in res] ) - else: - final_pars[n] = np.hstack( [r.final_pars[n] for r in res]) - init_pars[n] = np.hstack( [r.init_pars[n] for r in res] ) - im1 = np.asarray( [r.im1 for r in res if r.im1.shape[0] != 0]) - im2 = np.asarray( [r.im2 for r in res if r.im2.shape[0] != 0]) - if nSaveImg != 'all': - im1 = im1[:nSaveImg].copy(); # copy is necessary to free the original memory - im2 = im2[:nSaveImg].copy(); # of original array - return fit_ret( - init_pars = init_pars, - final_pars = final_pars, - final_transform1 = [r.final_transform1 for r in res], - final_transform2 = [r.final_transform2 for r in res], - im1 = im1, - im2 = im2, - E = defaultE, - p1 = np.vstack( [r.p1 for r in res]), - p1_sum = np.hstack( [r.p1_sum for r in res]), - p2 = np.vstack( [r.p2 for r in res]), - p2_sum = np.hstack( [r.p2_sum for r in res]), - ratio = np.vstack( [r.ratio for r in res]), - fom = np.hstack( [r.fom for r in res] ), - tneeded = np.hstack( [r.tneeded for r in res]) - ) - - -def getTransform(translation=(0,0),scale=(1,1),rotation=0,shear=0): - t= tf.AffineTransform(scale=scale,rotation=rotation,shear=shear,\ - translation=translation) - return t - -def findTransform(p1,p2,ttype="affine"): - return tf.estimate_transform(ttype,p1,p2) - - -#__i = np.arange(2**12)/2**11 -#def _transformToIntensitylDependence(transform): -# c = 1. -# if hasattr(transform,"i_a"): -# c += transform.i_a*_i -# return c - - - -# /--------\ -# | | -# | FIT | -# | | -# \--------/ - - -def transformIminuit(im1,im2,init_transform=dict(),show=False,verbose=True,zeroThreshold=0.05,doFit=True): - import iminuit - assert im1.dtype == im2.dtype - t0 = time.time() - # create local copy we can mess up with - im1_toFit = im1.copy() - im2_toFit = im2.copy() - - # set anything below the 5% of the max to zero (one of the two opal is noisy) - im1_toFit[im1_toFitnp.nanmax(p1)/10. - fom = calcFOM(p1,p2,r) - return fit_ret( - init_pars = init_params, - final_pars = final_params, - final_transform1 = t1, - final_transform2 = t2, - im1 = i1, - im2 = i2, - E = defaultE, - p1 = p1, - p1_sum = p1.sum(), - p2 = p2, - p2_sum = p2.sum(), - ratio = r, - fom = fom, - tneeded = time.time()-t0 - ) - - -# /--------\ -# | | -# | MANUAL | -# | ALIGN | -# | | -# \--------/ -class GuiAlignment(object): - def __init__(self,im1,im2,autostart=False): - self.im1 = im1 - self.im2 = im2 - self.f,self.ax=plt.subplots(1,2) - self.ax[0].imshow(im1,aspect="auto") - self.ax[1].imshow(im2,aspect="auto") - self.transform = None - if autostart: - return self.start() - else: - print("Zoom first then use the .start method") - - def OnClick(self,event): - if event.button == 1: - if self._count % 2 == 0: - self.im1_p.append( (event.xdata,event.ydata) ) - print("Added",event.xdata,event.ydata,"to im1") - else: - self.im2_p.append( (event.xdata,event.ydata) ) - print("Added",event.xdata,event.ydata,"to im2") - self._count += 1 - elif event.button == 2: - # neglect middle click - return - elif event.button == 3: - self.done = True - return - else: - return - - def start(self): - self._nP = 0 - self._count = 0 - self.im1_p = [] - self.im2_p = [] - self.done = False - cid_up = self.f.canvas.mpl_connect('button_press_event', self.OnClick) - print("Press right button to finish") - while not self.done: - if self._count % 2 == 0: - print("Select point %d for left image"%self._nP) - else: - print("Select point %d for right image"%self._nP) - self._nP = self._count //2 - plt.waitforbuttonpress() - self.im1_p = np.asarray(self.im1_p) - self.im2_p = np.asarray(self.im2_p) - self.transform = findTransform(self.im1_p,self.im2_p) - self.transform.intensity = 1. - return self.transform - - def show(self): - if self.transform is None: - print("Do the alignment first (with .start()") - return - # just to save some tipying - im1 = self.im1; im2 = self.im2 - im1_new = transformImage(im1,self.transform) - f,ax=plt.subplots(1,3) - ax[0].imshow(im1,aspect="auto") - ax[1].imshow(im1_new,aspect="auto") - ax[2].imshow(im2,aspect="auto") - - def save(self,fname): - if self.transform is None: - print("Do the alignment first (with .start()") - return - else: - np.save(fname,self.transform) - - def load(self,fname): - self.transform = np.load(fname).item() - - -def getAverageTransformation(out): - res = append_results(out) - # get average parameters - tx = np.median(res["transx"]) - ty = np.median(res["transy"]) - sx = np.median(res["scalex"]) - sy = np.median(res["scaley"]) - r = np.median(res["rotation"]) - sh = np.median(res["shear"]) - inten = np.median(res["intensity"]) - t = getTransform( (tx,ty),(sx,sy),r,sh,intensity=inten ) - return t - -def checkAverageTransformation(out,imgs1): - t,inten = getAverageTransformation(out) - res["ratio_av"] = [] - for shot,values in out.items(): - i = transformImage(imgs1[shot],t) - p1 = np.nansum(i,axis=0) - r = p1/values.p2 - res["ratio_av"].append( r ) - res["ratio_av"] = np.asarray(res["ratio_av"]) - return res - - -g_lastpars = None - -def clearCache(): - globals()["g_lastpars"]=None - -def doShot(i1,i2,init_pars,doFit=True,show=False): - #if g_lastpars is not None: init_pars = g_lastpars - r = transformIminuit(i1,i2,init_pars,show=show,verbose=False,doFit=doFit) - return r - - -def doShots(imgs1,imgs2,initpars,nJobs=16,doFit=False,returnBestTransform=False,nSaveImg='all'): - clearCache() - N = imgs1.shape[0] - pool = joblib.Parallel(backend="threading",n_jobs=nJobs) \ - (joblib.delayed(doShot)(imgs1[i],imgs2[i],initpars,doFit=doFit) for i in range(N)) - res = unravel_results(pool,nSaveImg=nSaveImg) - if returnBestTransform: - bestT,fom = getBestTransform(res) - print("FOM for best alignment %.2f"%fom) - return res,bestT - else: - return res -# out = collections.OrderedDict( enumerate(pool) ) -# return out - -# /--------\ -# | | -# | TEST | -# | ALIGN | -# | | -# \--------/ - - - -def testDiling(N=100,roi=slice(350,680),doGUIalignment=False,nJobs=4,useIminuit=True): - import xppll37_mc - globals()["g_lastpars"]=None - d = xppll37_mc.readDilingDataset() - im1 = xppll37_mc.subtractBkg(d.opal1[:N])[:,roi,:] - im2 = xppll37_mc.subtractBkg(d.opal2[:N])[:,roi,:] - N = im1.shape[0]; # redefie N in case it reads less than N - # to manual alignment - if doGUIalignment: - a = GuiAlignment(im1[0],im2[0]) - input("Ok to start") - init_pars = a.start() - np.save("gui_align_transform.npy",init_pars) - else: - init_pars = np.load("gui_align_transform.npy").item() - pool = joblib.Parallel(backend="threading",n_jobs=nJobs,verbose=20) \ - (joblib.delayed(doShot)(im1[i],im2[i],init_pars,useIminuit=useIminuit) for i in range(N)) - out = collections.OrderedDict( enumerate(pool) ) - return out - -if __name__ == '__main__': - pass