dispersiveXanes/xppl37_spectra.py

320 lines
13 KiB
Python
Raw Normal View History

2016-11-25 11:05:19 +01:00
import numpy as np
import matplotlib.pyplot as plt
import copy
import argparse
import collections
import dispersiveXanes_alignment as alignment
import dispersiveXanes_utils as utils
2016-11-25 11:05:19 +01:00
import xanes_analyzeRun
parser = argparse.ArgumentParser(description='Process argv')
parser.add_argument('--run', type=int,default=82,help='which run to analyze')
parser.add_argument('--force', action="store_true",help='force calculation')
args = parser.parse_args()
2016-11-25 11:05:19 +01:00
profile_ret = collections.namedtuple("profile_ret",["run","p1","p2","calibs"])
2016-11-25 11:05:19 +01:00
nice_colors = ["#1b9e77", "#d95f02", "#7570b3"]
gradual_colors = ['#014636', '#016c59', '#02818a', '#3690c0', '#67a9cf', '#a6bddb', '#d0d1e6']#, '#ece2f0']
2016-11-25 11:05:19 +01:00
def calcSpectraForRun(run=82,calibs="all",realign=False,init="auto",alignCalib=0,force=False):
2017-06-07 14:13:59 +02:00
""" Calculate spectra (spec1, spec2) for run based on alignment.
Alignment can be 'forced' with realign=True.
In such a case
init = run for initial alignment
alignCalib = calibcycle for alignment
"""
if init=="auto": init=run
if isinstance(run,int):
r = xanes_analyzeRun.AnalyzeRun(run,initAlign=init)
else:
r = run
if realign:
r.doShots(slice(20),calib=refCalib,doFit=True)
2016-11-25 11:05:19 +01:00
r.saveTransform()
# next line is used to force calculations in case of realignment
fname = 'auto' if not realign else "thisfiledoesnotexists"
if force: fname = "thisfiledoesnotexists"
if len(r.results) == 0:
try:
r.load(fname)
print("Loading previously saved results")
except FileNotFoundError:
r.analyzeScan(calibs=calibs,nImagesToFit=0,nSaveImg=4)
r.save(overwrite=True)
# cannot take the output from r.results because it might have been calculated for
# a bigger range than asked for.
if isinstance(calibs,int):
calibsForOut = (calibs,)
elif isinstance(calibs,slice):
calibsForOut = list(range(r.nCalib))[calibs]
elif calibs == "all":
calibsForOut = list(r.results.keys())
calibsForOut.sort()
else:
calibsForOut = calibs
2017-06-07 14:13:59 +02:00
# focused data have one single calibcycle ...
if len(r.results) > 1:
p1 = [r.results[calib].p1 for calib in calibsForOut]
p2 = [r.results[calib].p2 for calib in calibsForOut]
else:
calib = list(r.results.keys())[0]
idx = r.results[calib].fom < 0.5
p1 = [ r.results[calib].p1[idx], r.results[calib].p1[~idx] ]
p2 = [ r.results[calib].p2[idx], r.results[calib].p2[~idx] ]
return profile_ret( run =r, p1 =p1, p2=p2,calibs=calibsForOut)
2016-11-25 11:05:19 +01:00
def calcSpectraForRefAndSample(run=82,refCalibs=slice(None,None,2),forceSpectraCalculation=False):
""" Function to calculate the Spectra with and without (ref) the sample.
It can analyze two kinds of runs:
* Single run with that alternates IN and OUT (like run 82)
in this case use something like:
calcSpectraForRefAndSample(82,refCalibs=slice(None,None,2)
or
calcSpectraForRefAndSample(82,refCalibs=(0,2,4,6))
* Multiple runs (one with reference and one with sample) like run 155 and 156
in this case use something like:
calcSpectraForRefAndSample( run=(155,156) )
where the first is the reference run and the second the one with the sample.
in this second case the refCalibs does not play a role
use forceSpectraCalculation = True, to re-read the images
"""
if isinstance(run,int):
run = xanes_analyzeRun.AnalyzeRun(run)
if isinstance(refCalibs,slice): refCalibs = list(range(run.nCalib))[refCalibs]
if isinstance(refCalibs,int): refCalibs = [refCalibs,]
sampleCalibs = [c+1 for c in refCalibs]
# need a single call (for sample and ref) to save all calibcycles
2017-06-07 14:13:59 +02:00
data= calcSpectraForRun(run,calibs=refCalibs+sampleCalibs,\
force=forceSpectraCalculation);
2017-06-07 14:13:59 +02:00
# for focused runs
if len(data.run.results) == 1:
ref = profile_ret(run=data.run,p1=[data.p1[0],],p2=[data.p2[0],],calibs=[0,])
sample = profile_ret(run=data.run,p1=[data.p1[1],],p2=[data.p2[1],],calibs=[0,])
else:
ref = calcSpectraForRun(run,calibs=refCalibs)
sample = calcSpectraForRun(run,calibs=sampleCalibs)
elif isinstance(run,(list,tuple)):
refRun = xanes_analyzeRun.AnalyzeRun(run[0])
sampleRun = xanes_analyzeRun.AnalyzeRun(run[1],initAlign=run[0])
refCalibs = [0,]
sampleCalibs = [0,]
ref = calcSpectraForRun(refRun,calibs=refCalibs,\
force=forceSpectraCalculation)
sample = calcSpectraForRun(sampleRun,calibs=sampleCalibs,\
force=forceSpectraCalculation)
return ref,sample
def calcRef(r1,r2,calibs=None,threshold=0.05):
""" r1 and r2 are list of 2d arrays (nShots,nPixels) for each calibcycle """
if not isinstance(r1,(tuple,list)): r1 = (r1,)
if not isinstance(r2,(tuple,list)): r2 = (r2,)
if calibs is None: calibs = list(range(len(r1)))
if not isinstance(calibs,(tuple,list)): calibs = (calibs,)
2016-11-25 11:05:19 +01:00
out = collections.OrderedDict()
out["ratioOfAverage"] = dict()
out["medianOfRatios"] = dict()
for p1,p2,n in zip(r1,r2,calibs):
out["ratioOfAverage"][n] = utils.ratioOfAverage(p1,p2,threshold=threshold)
out["medianOfRatios"][n] = utils.medianRatio(p1,p2,threshold=threshold)
2016-11-25 11:05:19 +01:00
# add curves with all calib together
p1 = np.vstack(r1)
p2 = np.vstack(r2)
n = ",".join(map(str,calibs))
ref1 = utils.ratioOfAverage(p1,p2,threshold=threshold)
ref2 = utils.medianRatio(p1,p2,threshold=threshold)
out["ratioOfAverage"][n] = utils.ratioOfAverage(p1,p2,threshold=threshold)
out["medianOfRatios"][n] = utils.medianRatio(p1,p2,threshold=threshold)
out["ratioOfAverage"]['all'] = out["ratioOfAverage"][n]
out["medianOfRatios"]['all'] = out["medianOfRatios"][n]
return out
2016-11-25 11:05:19 +01:00
def showDifferentRefs(run=82,refCalibs=slice(None,None,2),threshold=0.05):
2016-11-25 11:05:19 +01:00
""" example plots showing how stable are the different ways of taking spectra """
prof = calcSpectraForRun(run,calibs=refCalibs)
refs = calcRef(prof.p1,prof.p2,calibs=prof.calibs)
2016-11-25 11:05:19 +01:00
kind_of_av = list(refs.keys())
fig,ax=plt.subplots(len(kind_of_av)+1,1,sharex=True,sharey=True)
E = prof.run.E
2016-11-25 11:05:19 +01:00
calibs = list(refs[kind_of_av[0]].keys())
for ikind,kind in enumerate(kind_of_av):
for calib in calibs:
if isinstance(calib,int):
ax[ikind].plot(E,refs[kind][calib],label="calib %s"%calib)
else:
if calibs == 'all': continue
2016-11-25 11:05:19 +01:00
ax[ikind].plot(E,refs[kind][calib],label="calib %s"%calib,lw=2,color='k',alpha=0.7)
ax[-1].plot(E,refs[kind][calib],label="calib all, %s"%kind,lw=1.5,color=nice_colors[ikind],alpha=0.8)
for ikind,kind in enumerate(kind_of_av): ax[ikind].set_title("Run %d, %s"%(run,kind))
ax[0].set_ylim(0.88,1.12)
ax[0].set_ylim(0.88,1.12)
ax[-2].legend()
ax[-1].legend()
for a in ax: a.grid()
def calcAbs(ref,sample=None,threshold=0.05,refKind="medianOfRatios",merge_calibs=True):
""" example of use
ratio = calcAbsForRun(82)
ratio = calcAbsForRun( (155,156) )
"""
if sample is None: sample=ref
temp = calcRef(ref.p1,ref.p2,threshold=threshold)
ref = temp[refKind]['all']
if merge_calibs:
p1 = np.vstack( sample.p1 )
p2 = np.vstack( sample.p2 )
p1,p2 = utils.maskLowIntensity(p1,p2,threshold=threshold)
ratio = p2/p1
ratio = ratio/ref
Abs = -np.log10(ratio)
else:
Abs = []
p1 = []
p2 = []
for _p1,_p2 in zip(sample.p1,sample.p2):
_p1,_p2 = utils.maskLowIntensity(_p1,_p2,threshold=threshold)
ratio = _p2/_p1
ratio = ratio/ref
_abs = -np.log10(ratio)
p1.append(_p1)
p2.append(_p2)
Abs.append(_abs)
return p1,p2,Abs
def calcAbsForRun(run=82,refCalibs=slice(None,None,2),threshold=0.05,refKind="medianOfRatios",forceSpectraCalculation=False,merge_calibs=True):
""" example of use
ratio = calcAbsForRun(82)
ratio = calcAbsForRun( (155,156) )
"""
ref,sample = calcSpectraForRefAndSample(run,refCalibs=refCalibs,forceSpectraCalculation=forceSpectraCalculation)
E = ref.run.E
p1,p2,Abs = calcAbs(ref,sample,threshold=threshold,refKind=refKind,merge_calibs=merge_calibs)
return E,p1,p2,Abs
def showSpectra(run=82,shots=slice(5),calibs=0,averageEachCalib=False,
normalization="auto",shifty=1,xlim=(7060,7180),showAv=True):
""" averageEachCalib: if True, plot only one (averaged) spectrum per calibcycle
normalization: if "auto", the max of the spectra that will be plotted will be used
"""
r = xanes_analyzeRun.AnalyzeRun(run=run)
r.load()
calibsSaved = list(r.results.keys()); calibsSaved.sort()
res = [ r.results[c].p2 for c in calibsSaved ]
if isinstance(calibs,slice): res=res[calibs]
if isinstance(calibs,int): res=[res[calibs],]
avCalibs = [ np.nanmedian(spectra,axis=0) for spectra in res ]
if averageEachCalib:
res = avCalibs
showAv = False; # it does not make sense to plot it twice !
fig,ax = plt.subplots(len(res),1,sharex=True,sharey=True,squeeze=False)
if normalization == "auto": normalization = np.nanmax( [temp[shots] for temp in res] )
for (av,spectra,a) in zip(avCalibs,res,ax[:,0]):
spectra_norm = spectra[shots]/normalization
av_norm = av/normalization
for i,spectrum in enumerate(spectra_norm):
color = gradual_colors[i%len(gradual_colors)]
a.axhline(i*shifty,ls='--',color=color)
if showAv: a.fill_between(r.E,i*shifty,av_norm+i*shifty,color='#d95f0e',alpha=0.4)
print(i)
a.plot(r.E,spectrum+i*shifty,color=color,lw=2)
ax[0][0].set_xlim(*xlim)
ax[0][0].set_title("Run %d"%run)
if not averageEachCalib: ax[0][0].set_ylim(0,shifty*(len(spectra_norm)))
color_ss = '#08519c'
color_av = '#238b45'
color_av_all = '#d95f0e'
def showAbs(run=82,shots=slice(5),normalization="auto",shifty=1,xlim=(7080,7180),showAv=True,showAvOverAll=True,smoothWidth=0,threshold=0.01,filterShot=0.1):
""" normalization: if "auto", the max of the spectra that will be plotted will be used
filterShot = means that it filters out the filterShot*100 percentile
"""
E = alignment.defaultE
_,p1,p2,abs = calcAbsForRun(run=run,threshold=threshold)
p1_sum = p1.sum(-1)
if filterShot>0:
idx = p1_sum>np.percentile(p1_sum,filterShot*100)
p1 = p1[idx]
p2 = p2[idx]
abs = abs[idx]
p1_av = np.nanmean(p1,axis=0)
p2_av = np.nanmean(p2,axis=0)
# somehow nanmedian screws up when array is too big ... so using nanmean
abs_av = np.nanmean(abs,axis=0)
p1 = p1[shots]; p2=p2[shots]; abs = abs[shots]
if smoothWidth > 0: abs = smoothSpectra(E,abs,res=smoothWidth)
fig,ax = plt.subplots(1,2,sharex=True,sharey=True,squeeze=False)
ax = ax[0]
if normalization == "auto": normalization = np.nanmax( p1 )
for ishot,(s1,s2,a) in enumerate(zip(p1,p2,abs)):
s1_norm = s1/normalization
s2_norm = s2/normalization
color = gradual_colors[ishot%len(gradual_colors)]
ax[0].axhline(ishot*shifty,ls='--',color=color)
ax[1].axhline(ishot*shifty,ls='--',color=color)
if showAvOverAll:
if ishot == 0: ax[0].fill_between(E,ishot*shifty,p1_av/normalization+ishot*shifty,color=color_av_all,alpha=0.6)
ax[1].plot(E,abs_av+ishot*shifty,color=color_av_all,lw=2,zorder=20)
if showAv:
ax[1].plot(E,np.nanmedian(abs,0)+ishot*shifty,color=color_av,lw=2,zorder=10)
ax[0].plot(E,s1_norm+ishot*shifty,ls = '-' ,color='0.8',lw=2)
ax[0].plot(E,s2_norm+ishot*shifty,ls = '-' ,color='0.3',lw=2)
ax[1].plot(E,a+ishot*shifty,color=color_ss,lw=2)
ax[0].set_xlim(*xlim)
ax[0].set_title("Run %s"%str(run))
ax[1].set_ylabel("Sample Absorption")
ax[0].set_ylabel("Normalized Spectra")
ax[0].set_ylim(0,shifty*(p1.shape[0]))
print("STD of (average over shown shots) - (average over all): %.3f"%np.nanstd(np.nanmedian(abs,0)-abs_av))
ax[1].set_title("STD of (average_{shown}) - (average_{all}): %.3f"%np.nanstd(np.nanmedian(abs,0)-abs_av))
def showAbsWithSweep(run=(155,156),first=0,period=150,nSpectra=10,**kwards):
shots = slice(first,first+period,int(period/nSpectra))
showAbs(run=run,shots=shots,**kwards)
def smoothSpectra(E,abs_spectra,res=0.5,skip_close=5):
from scipy import integrate
if isinstance(abs_spectra,np.ma.MaskedArray): abs_spectra = abs_spectra.filled()
if abs_spectra.ndim == 1: abs_spectra=abs_spectra[np.newaxis,:]
out = np.empty_like(abs_spectra)
for ispectrum,spectrum in enumerate(abs_spectra):
idx = np.isfinite(spectrum)
Eclean = E[idx]
spectrum_clean = spectrum[idx]
for i in range(len(E)):
g = 1/np.sqrt(2*np.pi)/res*np.exp(-(Eclean-E[i])**2/2/res**2)
tointegrate = g*spectrum_clean
out[ispectrum,i] = integrate.simps(tointegrate,x=Eclean)
out[ispectrum][~idx]=np.nan
temp = out[ispectrum].copy()
for i,o in enumerate(temp):
if np.any( np.isnan(temp[i-skip_close:i+skip_close] ) ): out[ispectrum][i] = np.nan
return out
def doLongCalc():
#calcSpectraForRefAndSample(82,forceSpectraCalculation=True)
# scanning requires a lower level call
r = xanes_analyzeRun.AnalyzeRun(84)
r.analyzeScan(nShotsPerCalib="all",calibs="all",nSaveImg=2,calibsToFit='even',nImagesToFit=3)
r.save(overwrite=True)
#calcSpectraForRefAndSample(84,forceSpectraCalculation=False)
calcSpectraForRefAndSample(96,forceSpectraCalculation=True)
calcSpectraForRefAndSample((155,156),forceSpectraCalculation=True)
#r = calcSpectra(run,refCalibs=refCalib,force=force)
2016-11-25 11:05:19 +01:00
if __name__ == "__main__":
pass
#main(args.run,force=args.force)