mcutils/xray/dataReduction.py

300 lines
11 KiB
Python

from __future__ import print_function,division
import logging
log = logging.getLogger(__name__)
import numpy as np
np.seterr(all='ignore')
from . import utils
from . import storage
import os
def subtractReferences(i,idx_ref, useRatio = False):
""" given data in i (first index is shot num) and the indeces of the
references (idx_ref, array of integers) it interpolates the closest
reference data for each shot and subtracts it (or divides it, depending
on useRatio = [True|False];
Note: it works in place (i.e. it modifies i) """
iref=np.empty_like(i)
idx_ref = np.squeeze(idx_ref)
idx_ref = np.atleast_1d(idx_ref)
if idx_ref.shape[0] == 1:
return i-i[idx_ref]
# references before first ref are "first ref"
iref[:idx_ref[0]] = i[idx_ref[0]]
# references after last ref are "last ref"
iref[idx_ref[-1]:] = i[idx_ref[-1]]
_ref = 0
for _i in range(idx_ref[0],idx_ref[-1]):
idx_ref_before = idx_ref[_ref]
idx_ref_after = idx_ref[_ref+1]
ref_before = i[idx_ref_before]
ref_after = i[idx_ref_after]
weight_before = float(_i-idx_ref_before)/(idx_ref_after-idx_ref_before)
weight_after = 1-weight_before
if weight_after == 1:
iref[_i] = ref_before
elif weight_before == 1:
iref[_i] = ref_after
else:
# normal reference for an on chi, the weighted average
iref[_i] = weight_before*ref_before + weight_after*ref_after
if _i>=idx_ref_after: _ref += 1
log.debug("SubtractRederence For image %d : %d-%d"%(_i,idx_ref_before,idx_ref_after))
if useRatio:
i /= iref
else:
i -= iref
return i
def averageScanPoints(scan,data,isRef=None,lpower=None,useRatio=False,\
funcForAveraging=np.nanmean):
""" given scanpoints in 'scan' and corresponding data in 'data'
average all data corresponding the exactly the same scanpoint.
If the values in scan are coming from a readback, rounding might be
necessary.
No normalization is done inside this function
if isRef is provided must be a boolean array of the same shape as 'scan'
is there is at least one scanpoint marked as True, the data are
subtracted/divided by the interpolated reference
if lpower is provided the data is divided by it (how it is done depends
if one uses the ratio or not
funcForAveraging: is usually np.nanmean or np.nanmedian. it can be any
function that support axis=0 as keyword argument
"""
data = data.astype(np.float)
avData = np.nanmedian( data , axis = 0 )
if isRef is None: isRef = np.zeros( data.shape[0], dtype=bool )
assert data.shape[0] == isRef.shape[0]
# subtract reference only is there is at least one
if isRef.sum()>0:
# create a copy (subtractReferences works in place)
diff = subtractReferences(data.copy(),np.argwhere(isRef), useRatio=useRatio)
avNeg = funcForAveraging(data[isRef],axis=0)
else:
diff = data
avNeg = np.zeros_like(avData)
# normalize signal for laser intensity if provided
if lpower is not None:
lpower = utils.reshapeToBroadcast(lpower,data)
if useRatio is False:
diff /= lpower
else:
diff = (data-1)/lpower+1
scan_pos = np.unique(scan)
shape_out = [len(scan_pos),] + list(diff.shape[1:])
ret = np.empty(shape_out)
err = np.empty(shape_out)
data_abs = np.empty(shape_out)
diffsInScanPoint = []
chi2_0 = []
for i,t in enumerate(scan_pos):
shot_idx = (scan == t)
# select data for the scan point
diff_for_scan = diff[shot_idx]
diffsInScanPoint.append( diff_for_scan )
# calculate average
ret[i] = funcForAveraging(diff_for_scan,axis=0)
data_abs[i] = funcForAveraging(data[shot_idx],axis=0)
# calculate std
noise = np.nanstd(diff[shot_idx], axis = 0)
# calculate chi2 of different repetitions
chi2 = np.power( (diff_for_scan - ret[i])/noise,2)
# sum over all axis but first
for _ in range(diff_for_scan.ndim-1):
chi2 = np.nansum( chi2, axis=-1 )
# store chi2_0
chi2_0.append( chi2/ret[i].size )
# store error of mean
err[i] = noise/np.sqrt(shot_idx.sum())
ret = dict(scan=scan_pos,data=ret,dataUnmasked=ret.copy(),err=err,
errUnmasked=err.copy(),chi2_0=chi2_0,diffsInScanPoint=diffsInScanPoint,
dataAbsAvNeg = avNeg, dataAsAbs=ret+avNeg,
dataAbsAvAll=avData,dataAbsAvScanPoint=data_abs,dataAbs=data.copy())
ret = storage.DataStorage(ret)
return ret
def calcTimeResolvedSignal(scan,data,reference="min",monitor=None,q=None,
saveTxt=True,folder="./",**kw):
"""
reference: can be 'min', 'max', a float|integer or an array of booleans
q : is needed if monitor is a tuple|list
monitor : normalization vector (if it is interpreted a list it is
interpreted as q-range normalization)
saveTxt : will save txt outputfiles (diff_av_*) in folder
other keywords are passed to averageScanPoints
"""
if reference == "min":
isRef = (scan == scan.min())
elif reference == "max":
isRef = (scan == scan.max())
elif isinstance(reference,(float,int)):
isRef = (scan == reference)
else:
isRef = reference
# normalize if needed
if monitor is not None:
if isinstance(monitor,(tuple,list)):
assert q is not None
assert data.ndim == 2
idx = (q>= monitor[0]) & (q<= monitor[1])
monitor = np.nanmedian(data[:,idx],axis=1)
monitor = utils.reshapeToBroadcast(monitor,data)
data = data/monitor
ret = averageScanPoints(scan,data,isRef=isRef,**kw)
if q is not None: ret["q"] = q
return ret
def errorMask(data,threshold=5):
""" Q-by-Q mask !
Very simple but effective mask for zinger like noise
The noise is in general lower when using nanmean instead than
nanmedian but nanmean does not mask out 'spikes'.
This mask mitigate this effect by using nanmedian for the q points
that have an higher than usual error (proxy for spikes ...)
tested with mi1245/dec2016/tiox/tiox1/run3
"""
assert data.data.ndim == 2
idx_mask = []
for iscan in range(len(data.diffsInScanPoint)):
temp = data.diffsInScanPoint[iscan]
# sqrt(len(temp)) = sqrt(numOfDiffs); it is needed to estimate error of single Diff
idx = np.abs(temp-np.median(temp,axis=0)) > threshold*data.err[iscan]*np.sqrt(len(temp))
idx_mask.append( idx )
log.debug("errorMask mask, scanpoint: %s, fraction of q points filtered out (average) %.4e [max %.4e])"%\
(data.scan[iscan],idx.sum()/idx.size,max(np.sum(idx,axis=1)/idx.shape[1])) )
if "masks" not in data: data['masks'] = dict()
data['masks']['error'] = idx_mask
return data
def chi2Mask(data,threshold=2):
"""
The noise is in general lower when using nanmean instead than
nanmedian but nanmean does not mask out 'spikes'.
This mask mitigate this effect by using nanmedian for the q points
that have an higher than usual error (proxy for spikes ...)
tested with mi1245/dec2016/tiox/tiox1/run3
"""
idx_mask = []
for iscan in range(len(data.diffsInScanPoint)):
idx = data.chi2_0[iscan] > threshold
# expand along other axis (q ...)
idx = utils.reshapeToBroadcast(idx,data.diffsInScanPoint[iscan])
idx_mask.append(idx)
log.debug("Chi2 mask, scanpoint: %s, curves filtereout out %d/%d (%.2f%%)"%\
(data.scan[iscan],idx.sum(),len(idx),idx.sum()/len(idx)*100) )
if "masks" not in data: data['masks'] = dict()
data['masks']['chi2'] = idx_mask
return data
def applyMasks(data,which='all',funcForAveraging=np.nanmean):
# don't do anything if no mask is defined
if 'masks' not in data: return data
if which == 'all': which = list(data['masks'].keys())
totmask = []
for iscan in range(len(data.diffsInScanPoint)):
mask = data['masks'][which[0]][iscan]
for w in which[1:]:
mask = np.logical_and(mask,data['masks'][w][iscan])
mask = np.squeeze(mask)
totmask.append(mask); # store for later
# check is a q-by-q mask
if mask.shape == data.diffsInScanPoint[iscan].shape:
temp = np.ma.MaskedArray(data=data.diffsInScanPoint[iscan],mask=mask)
data.data[iscan] = funcForAveraging( temp,axis=0 )
else:
data.data[iscan] = funcForAveraging( data.diffsInScanPoint[iscan][~mask],axis=0)
data['mask'] = totmask
return data
def saveTxt(folder,data,delayToStr=True,basename='auto',info="",**kw):
""" data must be a DataStorage instance """
# folder ends usually with sample/run so use the last two subfolders
if basename == 'auto':
basename = "_".join(folder.rstrip("/").split("/")[-2:]) + "_"
q = data.q if "q" in data else np.arange(data.data.shape[-1])
# save one file with all average diffs
fname = "%s/%sdiff_av_matrix.txt" % (folder,basename)
utils.saveTxt(fname,q,data.data,headerv=data.scan,**kw)
# save error bars in the matrix form
fname = "%s/%sdiff_av_matrix_err.txt" % (folder,basename)
utils.saveTxt(fname,q,data.err,headerv=data.scan,**kw)
for iscan,scan in enumerate(data.scan):
scan = utils.timeToStr(scan) if delayToStr else "%+10.5e" % scan
# try retreiving info on chi2
try:
chi2_0 = data.chi2_0[iscan]
info_delay = [ "# rep_num : chi2_0 , discarded by chi2masking ?", ]
for irep,value in enumerate(chi2_0):
info_delay.append( "# %d : %.3f" % (irep,value))
if 'chi2' in data.masks: info_delay[-1] += " %s"%str(data.masks['chi2'][iscan][irep])
info_delay = "\n".join(info_delay)
if info != '': info_delay = "%s\n%s" % (info,info_delay)
except AttributeError:
info_delay = info
# save one file per timedelay with average diff (and err)
fname = "%s/%sdiff_av_%s.txt" % (folder,basename,scan)
if 'mask' in data:
tosave = np.vstack( (data.data[iscan],data.err[iscan],
data.dataUnmasked[iscan],data.errUnmasked[iscan] ) )
columns = 'q diffmask errmask diffnomask errnomask'.split()
else:
tosave = np.vstack( (data.data[iscan],data.err[iscan] ) )
columns = 'q diff err'.split()
utils.saveTxt(fname,q,tosave,info=info_delay,columns=columns)
# save one file per timedelay with all diffs for given delay
fname = "%s/%sdiffs_%s.txt" % (folder,basename,scan)
utils.saveTxt(fname,q,data.diffsInScanPoint[iscan],info=info_delay,**kw)
def read_diff_av(folder,plot2D=False,save=None):
print("Never tested !!!")
basename = folder+"/"+"diff_av*"
files = glob.glob(basename)
files.sort()
if len(files) == 0:
print("No file found (basename %s)" % basename)
return None
temp = [os.path.basename(f[:-4]) for f in files]
delays = [f.split("_")[-1] for f in temp ]
diffav = collections.OrderedDict()
diffs = collections.OrderedDict()
for d,f in zip(delays,files):
data = np.loadtxt(f)
diffav[d]=data[:,1]
diffs[d] = np.loadtxt(folder+"/diffs_%s.dat"%d)[:,1:]
q =data[:,0]
t = np.asarray( [mc.strToTime(ti) for ti in delays] )
if plot2D:
idx = t>0
i = np.asarray( diffav.values() )
plt.pcolor(np.log10(t[idx]),q,i[idx].T)
plt.xlabel(r"$\log_{10}(t)$")
plt.ylabel(r"q ($\AA^{-1}$)")
it=np.asarray(diffav.values())
if save:
tosave = np.vstack( (q,it) )
header = np.hstack( (len(it),t) )
tosave = np.vstack( (header,tosave.T) )
np.savetxt(folder + "/all_diffs_av_matrix.txt",tosave)
return q,it,diffs,t