mcutils/xray/storage.py

93 lines
2.6 KiB
Python
Raw Normal View History

""" hdf5 file based storage; this module adds the ability to dump a dict
to an hdf5 file """
import numpy as np
import os
import h5py
import collections
import logging as log
# Configure the root logger at import time with level INFO; read() and save()
# below report the file they handle through log.info.
# NOTE: logging.basicConfig does nothing if the root logger already has
# handlers configured.
log.basicConfig(level=log.INFO)
def dictToH5Group(d, group):
    """ Helper function that (recursively) writes a dictionary into an
    hdf group.

    d     : dictionary to store; values that are themselves dictionaries
            become sub-groups, every other value is assigned to group[key]
    group : destination group; it must support ``group[key] = value`` and
            ``group.create_group(key)``

    A value whose assignment raises TypeError is skipped and reported with
    log.error. Returns None.
    """
    for key, value in d.items():
        # OrderedDict is a dict subclass, a single isinstance check is enough
        if isinstance(value, dict):
            group.create_group(key)
            dictToH5Group(value, group[key])
            continue
        # h5py can't handle numpy unicode arrays: store them as ascii bytes.
        # np.char.encode works element-wise whatever the number of
        # dimensions; iterating over the array only works for 1D input.
        if isinstance(value, np.ndarray) and value.dtype.char == "U":
            value = np.char.encode(value, "ascii")
        # h5py can't save None: store a sentinel string instead
        if value is None:
            value = "NONE_PYTHON_OBJECT"
        try:
            group[key] = value
        except TypeError:
            log.error("Can't save %s", key)
def dictToH5(h5, d):
    """ Save a dictionary into an hdf5 file.

    h5py is not capable of handling dictionaries natively, so the content
    is written with dictToH5Group.

    h5 : name of the file to write; it is opened with mode "w"
    d  : dictionary to store

    Returns None.
    """
    # the context manager closes the file even when writing raises
    with h5py.File(h5, mode="w") as h5file:
        dictToH5Group(d, h5file["/"])
def h5dataToDict(h5):
    """ Read a hdf5 dataset or group into python objects.

    h5 : an h5py.Dataset, or any object with an items() method (e.g. a
         group) whose values are handled recursively

    Returns the content of a dataset (0d arrays are unwrapped, the
    "NONE_PYTHON_OBJECT" sentinel becomes None, byte strings are converted
    back to str), or a dict mapping each key to its converted value.
    """
    if not isinstance(h5, h5py.Dataset):
        return {k: h5dataToDict(v) for k, v in h5.items()}
    temp = h5[...]
    # unwrap 0d arrays
    if isinstance(temp, np.ndarray) and temp.ndim == 0:
        temp = temp.item()
    # scalar strings can be read back as bytes (see the h5py documentation
    # on strings); decode them so that they match the str that was saved
    if isinstance(temp, bytes):
        try:
            temp = temp.decode("utf-8")
        except UnicodeDecodeError:
            pass  # not text: keep the raw bytes
    # h5py can't handle None. Only scalar strings are compared with the
    # sentinel: comparing an ndarray gives an element-wise result whose
    # truth value is ambiguous.
    if isinstance(temp, str) and temp == "NONE_PYTHON_OBJECT":
        return None
    # convert back from ascii to unicode
    if isinstance(temp, np.ndarray) and temp.dtype.char == "S":
        temp = temp.astype(str)
    return temp
def h5ToDict(h5):
    """ Open the hdf5 file h5 read-only and return its root group converted
    to a dictionary by h5dataToDict """
    with h5py.File(h5, "r") as handle:
        return h5dataToDict(handle["/"])
def npzToDict(npzFile):
    """ Load a numpy npz archive into a dictionary; 0d arrays are unwrapped
    into the python object they contain """
    with np.load(npzFile) as archive:
        loaded = dict(archive)
    return {
        name: (arr.item() if isinstance(arr, np.ndarray) and arr.ndim == 0 else arr)
        for name, arr in loaded.items()
    }
def dictToNpz(npzFile, d):
    """ Save the dictionary d as a numpy npz archive; each key of d is passed
    to np.savez as the name of one stored array """
    np.savez(npzFile, **d)
def read(fname):
    """ Read a storage file into a dictionary; the reader is chosen from the
    file extension (".npz" or ".h5"), any other extension raises ValueError """
    _, ext = os.path.splitext(fname)
    log.info("Reading storage file %s"%fname)
    # reject unsupported extensions first
    if ext not in (".npz", ".h5"):
        raise ValueError("Extension must be h5 or npz, it was %s"%ext)
    if ext == ".h5":
        return h5ToDict(fname)
    return npzToDict(fname)
def save(fname, d):
    """ Save the dictionary d into a storage file.

    fname : name of the file to write; its extension selects the format
            (".npz" -> dictToNpz, ".h5" -> dictToH5)
    d     : dictionary to store

    Returns whatever the selected writer returns.
    Raises ValueError if the extension is neither ".npz" nor ".h5".
    """
    extension = os.path.splitext(fname)[1]
    log.info("Saving storage file %s", fname)
    if extension == ".npz":
        return dictToNpz(fname, d)
    if extension == ".h5":
        return dictToH5(fname, d)
    # report the offending extension, as read() does
    raise ValueError("Extension must be h5 or npz, it was %s" % extension)