# mcutils/xray/dataReduction.py
from __future__ import print_function,division
import logging as log
log.basicConfig(level=log.INFO)
import numpy as np
np.seterr(all='ignore')
import os
import glob
import collections
# matplotlib is only needed by read_diff_av when plot2D=True
import matplotlib.pyplot as plt

def subtractReferences(i, idx_ref, useRatio=False):
    """ Given data in i (first index is the shot number) and the indices of
        the references (idx_ref, array of integers), interpolate the closest
        reference data for each shot and subtract it (or divide by it,
        depending on useRatio = [True|False]). """
    iref = np.empty_like(i)
    idx_ref = np.squeeze(idx_ref)
    idx_ref = np.atleast_1d(idx_ref)
    if idx_ref.shape[0] == 1:
        # only one reference available: use it for every shot
        return i/i[idx_ref] if useRatio else i-i[idx_ref]
    # shots before the first reference use the first reference
    iref[:idx_ref[0]] = i[idx_ref[0]]
    # shots after the last reference use the last reference
    iref[idx_ref[-1]:] = i[idx_ref[-1]]
    _ref = 0
    for _i in range(idx_ref[0], idx_ref[-1]):
        idx_ref_before = idx_ref[_ref]
        idx_ref_after  = idx_ref[_ref+1]
        ref_before = i[idx_ref_before]
        ref_after  = i[idx_ref_after]
        # fraction of the way from the previous reference to the next one
        weight_after  = float(_i-idx_ref_before)/(idx_ref_after-idx_ref_before)
        weight_before = 1-weight_after
        if weight_before == 1:
            iref[_i] = ref_before
        elif weight_after == 1:
            iref[_i] = ref_after
        else:
            # normal (non-reference) shot: weighted average of the two
            # closest references
            iref[_i] = weight_before*ref_before + weight_after*ref_after
        if _i >= idx_ref_after: _ref += 1
    if useRatio:
        i /= iref
    else:
        i -= iref
    return i
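
# Usage sketch (not part of the original module; the shot counts are made up):
# with 100 shots of shape (100, nq) and a reference every 10th shot,
#
#   idx_ref = np.arange(0, 100, 10)
#   diff = subtractReferences(data.copy(), idx_ref, useRatio=False)
#
# every non-reference shot gets the linear interpolation of the two
# surrounding references subtracted; note that the input array is modified
# in place unless a copy is passed.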

def averageScanPoints(scan, data, isRef=None, lpower=None, useRatio=False,
                      funcForEveraging=np.nanmean):
    """ Given scan points in 'scan' and corresponding data in 'data',
        average all data corresponding to exactly the same scan point.
        If the values in scan come from a readback, rounding might be
        necessary.
        No normalization is done inside this function.
        If isRef is provided it must be a boolean array of the same shape as
        'scan'; if there is at least one scan point marked as True, the data
        are subtracted by (or divided by, depending on useRatio) the
        interpolated reference.
        If lpower is provided the data are divided by it (how it is done
        depends on whether the ratio is used or not).
        funcForEveraging: usually np.nanmean or np.nanmedian; it can be any
        function that supports axis=0 as keyword argument.
    """
    if isRef is None: isRef = np.zeros( data.shape[0], dtype=bool )
    assert data.shape[0] == isRef.shape[0]
    # subtract references only if there is at least one
    if isRef.sum() > 0:
        data = subtractReferences(data, np.argwhere(isRef), useRatio=useRatio)
    else:
        data = data.copy()  # create a local copy
    # normalize signal for laser intensity if provided
    if lpower is not None:
        assert lpower.shape[0] == data.shape[0]
        # expand lpower to allow broadcasting
        shape = [data.shape[0],] + [1,]*(data.ndim-1)
        lpower = lpower.reshape(shape)
        if useRatio is False:
            data /= lpower
        else:
            # for ratios, rescale the deviation from 1 by the laser power
            data = (data-1)/lpower + 1
    scan_pos = np.unique(scan)
    shape_out = [len(scan_pos),] + list(data.shape[1:])
    ret = np.empty(shape_out)
    err = np.empty(shape_out)
    dataInScanPoint = []
    for i, t in enumerate(scan_pos):
        shot_idx = (scan == t)
        #if shot_idx.sum() > 0:
        ret[i] = funcForEveraging(data[shot_idx], axis=0)
        dataInScanPoint.append( data[shot_idx] )
        # standard error of the mean for each scan point
        err[i] = np.std(data[shot_idx], axis=0)/np.sqrt(shot_idx.sum())
    return dict(scan=scan_pos, data=ret, err=err, dataInScanPoint=dataInScanPoint)
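
# Usage sketch (illustrative names only): average a delay scan where the
# reference shots are the ones measured at the most negative delay,
#
#   isRef = (delays == delays.min())
#   res = averageScanPoints(delays, data, isRef=isRef,
#                           funcForEveraging=np.nanmedian)
#   res["scan"], res["data"], res["err"]   # unique delays, averages, errors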

def calcTimeResolvedSignal(scan, data, reference="min", monitor=None, q=None, **kw):
    """
        reference: can be 'min', 'max', a float|integer or an array of booleans
        q        : is needed if monitor is a tuple|list (in that case monitor
                   is interpreted as a q-range used for normalization)
        other keywords are passed to averageScanPoints
    """
    if reference == "min":
        isRef = (scan == scan.min())
    elif reference == "max":
        isRef = (scan == scan.max())
    elif isinstance(reference, (float, int)):
        isRef = (scan == reference)
    else:
        isRef = reference
    # normalize if needed
    if monitor is not None:
        if isinstance(monitor, (tuple, list)):
            assert q is not None
            assert data.ndim == 2
            # use the median intensity in the given q-range as per-shot monitor
            idx = (q >= monitor[0]) & (q <= monitor[1])
            monitor = np.nanmedian(data[:, idx], axis=1)
        # reshape the per-shot monitor so that it broadcasts over the q axis
        monitor = np.asarray(monitor).reshape( (data.shape[0],) + (1,)*(data.ndim-1) )
        data = data/monitor
    return averageScanPoints(scan, data, isRef=isRef, **kw)
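
# Usage sketch (the q-range values are only an example): normalize each curve
# to the median intensity between q = 1 and 2 A^-1 and use the earliest scan
# point as reference,
#
#   res = calcTimeResolvedSignal(delays, data, reference="min",
#                                monitor=(1.0, 2.0), q=q, useRatio=False)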

def read_diff_av(folder, plot2D=False, save=None):
    print("Never tested !!!")
    basename = folder + "/" + "diff_av*"
    files = glob.glob(basename)
    files.sort()
    if len(files) == 0:
        print("No file found (basename %s)" % basename)
        return None
    # file names are expected to end with the delay string, e.g. diff_av_1us.dat
    temp   = [os.path.basename(f[:-4]) for f in files]
    delays = [f.split("_")[-1] for f in temp]
    diffav = collections.OrderedDict()
    diffs  = collections.OrderedDict()
    for d, f in zip(delays, files):
        data = np.loadtxt(f)
        diffav[d] = data[:, 1]
        diffs[d]  = np.loadtxt(folder + "/diffs_%s.dat" % d)[:, 1:]
        q = data[:, 0]
    # mc.strToTime converts delay strings (e.g. '1us') to seconds;
    # `mc` is expected to be provided elsewhere in the package
    t = np.asarray( [mc.strToTime(ti) for ti in delays] )
    if plot2D:
        idx = t > 0
        i = np.asarray( list(diffav.values()) )
        plt.pcolor(np.log10(t[idx]), q, i[idx].T)
        plt.xlabel(r"$\log_{10}(t)$")
        plt.ylabel(r"q ($\AA^{-1}$)")
    it = np.asarray( list(diffav.values()) )
    if save:
        tosave = np.vstack( (q, it) )
        header = np.hstack( (len(it), t) )
        tosave = np.vstack( (header, tosave.T) )
        np.savetxt(folder + "/all_diffs_av_matrix.txt", tosave)
    return q, it, diffs, t
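
# Usage sketch (hypothetical folder name): read back previously saved
# azimuthally averaged difference curves,
#
#   out = read_diff_av("run42/reduced", plot2D=False)
#   if out is not None: q, it, diffs, t = out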