import os import sys import numpy as np np.warnings.simplefilter('ignore') import time import matplotlib.pyplot as plt import h5py import collections import re from x3py import x3py import dispersiveXanes_alignment as alignment import mcutils as mc cmap = plt.cm.viridis if hasattr(plt.cm,"viridis") else plt.cm.gray kw_2dplot = dict( interpolation = "none", aspect = "auto", cmap = cmap ) g_exp = "mecl3616" g_exp = "xppl3716" g_bml = g_exp[:3] x3py.config.updateBeamline(g_bml) g_folder_init = g_exp+"_init_pars/" g_folder_out = g_exp+"_output/" g_folder_data = "/reg/d/psdm/"+g_bml+"/"+ g_exp +"/hdf5/" import socket hostname = socket.gethostname() if hostname == "x1": g_folder_data = "/home/marco/temp" if hostname == "apcluster0": g_folder_data = "/data/marcoc/singleShotXanes/"+ g_exp +"/hdf5/" # set defaults based on experiment if g_bml == "xpp": g_roi_height = 200 g_swapx = False g_swapy = False else: g_roi_height = 100 g_swapx = True g_swapy = False print("Working on experiment",g_exp,"(beamline %s)"%g_bml) print(" folder data →",g_folder_data) print(" folder init_pars →",g_folder_init) print(" folder outout →",g_folder_out) #g_folder = "/reg/d/psdm/xpp/xppl3716/ftc/hdf5/" def readDataset(fnameOrRun=7, force=False, doBkgSub=False): if isinstance(fnameOrRun,str) and (fnameOrRun[-3:]=="npz"): d = x3py.toolsVarious.DropObject() temp = np.load(fnameOrRun) spec1 = temp["spec1"] spec2 = temp["spec2"] nS = spec1.shape[0] d.spec1 = x3py.toolsDetectors.wrapArray("spec1",spec1,time=np.arange(nS)) d.spec2 = x3py.toolsDetectors.wrapArray("spec2",spec2,time=np.arange(nS)) else: if isinstance(fnameOrRun,int): fnameOrRun=g_folder_data+"/"+g_exp+"-r%04d.h5" % fnameOrRun d = x3py.Dataset(fnameOrRun,detectors=["opal0","opal1","fee_spec","opal2","ebeam"]) if g_bml == "xpp": d.spec1 = d.opal0 d.spec2 = d.opal1 else: d.spec1 = d.fee_spec d.spec2 = d.opal2 if not hasattr(d,"scan"): d.scan = x3py.toolsVarious.DropObject() d.scan.scanmotor0_values = [0,] return d def getCenter(img,axis=0,threshold=0.05): img = img.copy() img[img 0: ret,bestTransf = self.doShots(shots=shots[:nToFit],calib=calib,doFit=True,\ initpars=initpars,nSaveImg=nSaveImg,returnBestTransform=True); initpars = bestTransf; self.initAlign=bestTransf if nToFit < len(shots): ret2 = self.doShots(shots[nToFit:],calib=calib,initpars=initpars, doFit=False,nSaveImg=0) if ret is None: ret = ret2 else: ret = alignment.unravel_results( (ret,ret2) ) #print("Memory available 3",x3py.toolsOS.memAvailable()) self.results[calib] = ret #print("Memory available 4",x3py.toolsOS.memAvailable()) print("Calib cycle %d/%d -> %.3f (best FOM: %.2f)" % (ic,nC,self.scanpos[ic],np.nanmin(ret.fom))) return [self.results[c] for c in calibs] def save(self,fname="auto",overwrite=False): if len(self.results) == 0: print("self.results are empty, returning without saving") return if not os.path.isdir(g_folder_out): os.makedirs(g_folder_out) if fname == "auto": fname = g_folder_out+"/run%04d_analysis.npz" % self.run if os.path.exists(fname) and not overwrite: print("File %s exists, **NOT** saving, use overwrite=True is you want ..."%fname) return h = dict() h["roi1"] = (self.roi1.start,self.roi1.stop) h["roi2"] = (self.roi2.start,self.roi2.stop) if hasattr(self.data.scan,"scanmotor0"): h["scanmot0"] = self.data.scan.scanmotor0 else: h["scanmot0"] = 'notascan' h["scanpos0"] = self.data.scan.scanmotor0_values if hasattr(self.data.scan,"scanmotor1"): h["scanmot1"] = self.data.scan.scanmotor1 h["scanpos1"] = self.data.scan.scanmotor1_values h["results"] = self.results h["E"] = self.E np.savez(fname,**h) #h["transform"] = self.initAlign def load(self,fname="auto"): if fname == "auto": fname = g_folder_out+"/run%04d_analysis.npz" % self.run temp = np.load(fname) self.results = temp["results"].item() temp.close() def _auto_transform_name(self,run=None,calib=None): if run is None: run = self.run fname = g_folder_init+"/run%04d_transform" % run if calib is not None: fname = fname + "_c%03d" % calib return fname + ".npy" def saveTransform(self,fname="auto",calib=None,transform=None): if transform is None: transform = self.initAlign if fname == "auto": fname = self._auto_transform_name(calib=calib) print("Saving roi and transformation parameter to %s"%fname) alignment.saveAlignment(fname,self.initAlign,self.roi1,self.roi2,self.swap) def loadTransform(self,fname="auto", calib=None): if isinstance(fname,dict): raise FileNotFoundError if fname == "auto": fname = self._auto_transform_name(calib=calib) if isinstance(fname,int): fname = g_folder_init+"/run%04d_transform.npy" % fname if not os.path.exists(fname): print("Asked to read %s, but it does not exist"%fname) temp = np.load(fname).item() self.initAlign = temp["transform"] self.roi1 = temp["roi1"] self.roi2 = temp["roi2"] if 'swap' in temp: self.swap=temp['swap'] print("init transform and ROIs from %s"%fname) def clearCache(self): del self.roi1 del self.roi2 alignment.clearCache(); # nedded for multiprocessing can leave bad parameters in the cache def setDefaultTransform( self ): #dict( scalex=0.65,rotation=0.0,transx=90, iblur1=4.3,fix_iblur1=False ) t = alignment.g_fit_default_kw self.initAlign = t return t def quick_mec(run,ref=236,divideByRef=False,returnRes=False): """ useful to analyze the runs around 140 (done with the focusing """ ref_run = 236 h=h5py.File("mecl3616_output/run%04d_analysis.h5" %ref,"r") ref = np.nanmean(h["calibNone"]["ratio"][...],axis=0) r = AnalyzeRun(run,initAlign=ref,swapx=True,swapy=False) res=r.doShots(slice(5),doFit=False) ret = res["ratio"]/ref if divideByRef else res["ratio"] if returnRes: return ret,res else: return ret def quickAndDirty(run,nShots=300,returnAll=True,doFit=False): """ useful to analyze the runs around 140 (done with the focusing """ r = AnalyzeRun(run,swap=True,initAlign=g_folder_init+"/run0144_transform.npy") res=r.doShots(slice(nShots),doFit=doFit) o = alignment.unravel_results(res) ref = np.nanmedian(o["ratio"][:40],0) sam = np.nanmedian(o["ratio"][50:],0) if returnAll: return sam/ref,o["ratio"]/ref else: return sam/ref