""" npy/npz/hdf5 file based storage.

This module adds the possibility to dump and load objects in files and a
more convenient way of accessing the data via attributes (``data.key``)
thanks to the DataStorage class.
"""
import collections
import logging
import os

import h5py
import numpy as np

log = logging.getLogger(__name__)  # __name__ is "foo.bar" here

# h5py cannot store None: it is written as this marker string and converted
# back to None when the file is read
_NONE_MARKER = "NONE_PYTHON_OBJECT"


def unwrapArray(a, recursive=True, readH5pyDataset=True):
    """ Recursively unwrap an object (like a dictionary) read from file.

    It solves several packaging issues: many objects come back as 0d arrays,
    h5py cannot store None nor numpy unicode arrays, ...

    a : object
        dict-like object (dict, h5py group, npz file), list, array, h5py
        dataset or any other value (returned as it is)
    recursive : bool
        if True, the values of dict-like objects and the elements of lists
        are unwrapped too (lists are modified in place)
    readH5pyDataset : bool
        if True h5py datasets are read into numpy arrays; if False only the
        scalar datasets are read, the others are returned as h5py datasets
        (usable only as long as the file they belong to is open)

    Returns the unwrapped object; dictionaries are returned as DataStorage
    """
    # h5py datasets: scalar ones are always read
    if isinstance(a, h5py.Dataset) and (readH5pyDataset or a.shape == ()):
        a = a[...]
    # 0d arrays are just wrapped python objects
    if isinstance(a, np.ndarray) and a.ndim == 0:
        a = a.item()
    # h5py can't save numpy unicode, strings are stored as bytes
    if isinstance(a, np.ndarray) and a.dtype.char == "S":
        a = a.astype(str)
    if recursive:
        if hasattr(a, "items"):  # dict, h5py groups, npz file
            a = dict(a)  # convert to dict, otherwise can't assign values
            for key, value in a.items():
                a[key] = unwrapArray(value, recursive=recursive,
                                     readH5pyDataset=readH5pyDataset)
        elif isinstance(a, list):
            for index, value in enumerate(a):
                a[index] = unwrapArray(value, recursive=recursive,
                                       readH5pyDataset=readH5pyDataset)
    if isinstance(a, dict):
        a = DataStorage(a)
    # restore None that cannot be saved in h5py; depending on the h5py
    # version scalar strings are read back as str or as bytes
    if isinstance(a, str) and a == _NONE_MARKER:
        a = None
    elif isinstance(a, bytes) and a == _NONE_MARKER.encode("ascii"):
        a = None
    return a


def dictToH5Group(d, group):
    """ Helper function that transforms (recursively) a dictionary into an
    hdf group by creating subgroups.

    d : dict
        dictionary to save; dict values become subgroups
    group : h5py group
        group to write into

    Values h5py refuses with a TypeError are skipped (an error is logged)
    """
    for key, value in d.items():
        if isinstance(value, dict):
            dictToH5Group(value, group.create_group(key))
            continue
        # h5py can't handle numpy unicode arrays: store them as ascii bytes
        # (astype works for any number of dimensions, including 0d)
        if isinstance(value, np.ndarray) and value.dtype.char == "U":
            value = value.astype("S")
        # h5py can't save None
        if value is None:
            value = _NONE_MARKER
        try:
            group[key] = value
        except TypeError:
            log.error("Can't save %s", key)


def dictToH5(h5, d):
    """ Save a dictionary into an hdf5 file (overwriting it if it exists)

    TODO: add capability of saving list of array
    h5py is not capable of handling dictionaries natively
    """
    # the context manager closes the file even if writing fails
    with h5py.File(h5, mode="w") as h:
        dictToH5Group(d, h["/"])


def h5ToDict(h5, readH5pyDataset=True):
    """ Read a hdf5 file into a dictionary

    The file is closed before returning: with readH5pyDataset=False the non
    scalar datasets are returned as h5py datasets of a closed file.
    """
    with h5py.File(h5, "r") as h:
        ret = unwrapArray(h, recursive=True, readH5pyDataset=readH5pyDataset)
    return ret


def npzToDict(npzFile):
    """ Read a npz file into a dictionary (unwrapped with unwrapArray) """
    # NOTE: nested dictionaries and None are stored as pickled object arrays
    # that numpy refuses to load by default. Unpickling can execute arbitrary
    # code: read only files coming from a trusted source
    with np.load(npzFile, allow_pickle=True) as npz:
        d = dict(npz)
    d = unwrapArray(d, recursive=True)
    return d


def npyToDict(npyFile):
    """ Read a npy file (as written by dictToNpy) into a dictionary """
    # NOTE: a dictionary saved with np.save is a pickled 0d object array that
    # numpy refuses to load by default. Unpickling can execute arbitrary
    # code: read only files coming from a trusted source
    wrapped = np.load(npyFile, allow_pickle=True)
    d = unwrapArray(wrapped.item(), recursive=True)
    return d


def dictToNpz(npzFile, d):
    """ Save a dictionary into a npz file (one array per key) """
    np.savez(npzFile, **d)


def dictToNpy(npyFile, d):
    """ Save a dictionary into a npy file (as a single pickled object) """
    np.save(npyFile, d)


def objToDict(o, recursive=True):
    """ Convert a DictWrap to a dictionary (useful for saving); it should
    work for other objects too

    o : object
        object to convert; if it has no "items" it is returned as it is
    recursive : bool
        if True the values are converted too

    TODO: this function does not catch a list of DataStorage instances like
    objToDict( ( DataStorage(), DataStorage() ) ) is not converted !!
    """
    if not hasattr(o, "items"):
        return o
    d = dict()
    for k, v in o.items():
        if not recursive:
            d[k] = v
            continue
        try:
            d[k] = objToDict(v, recursive=recursive)
        except Exception as e:
            # best effort: keep the value unconverted
            log.info("In objToDict, could not convert key %s to dict, "
                     "error was %s", k, e)
            d[k] = v
    return d


def read(fname):
    """ Read a storage file (.npz, .npy or .h5) and return a DataStorage

    Raises ValueError if the file extension is not one of the supported ones
    """
    extension = os.path.splitext(fname)[1]
    log.info("Reading storage file %s", fname)
    if extension == ".npz":
        return DataStorage(npzToDict(fname))
    elif extension == ".npy":
        return DataStorage(npyToDict(fname))
    elif extension == ".h5":
        return DataStorage(h5ToDict(fname))
    else:
        raise ValueError(
            "Extension must be h5, npy or npz, it was %s" % extension)


def save(fname, d):
    """ Save the dict-like object d in the file fname (.npz, .npy or .h5)

    Saving is best effort: any error (unsupported extension included) is
    logged with its traceback and not propagated to the caller.
    """
    # make sure the object is dict (recursively) this allows reading it
    # without the DataStorage module
    d = objToDict(d, recursive=True)
    extension = os.path.splitext(fname)[1]
    log.info("Saving storage file %s", fname)
    try:
        if extension == ".npz":
            return dictToNpz(fname, d)
        elif extension == ".h5":
            return dictToH5(fname, d)
        elif extension == ".npy":
            return dictToNpy(fname, d)
        else:
            raise ValueError(
                "Extension must be h5, npy or npz, it was %s" % extension)
    except Exception:
        log.exception("Could not save %s", fname)


class DataStorage(dict):
    """ Storage for dict like object.

    Every key is accessible both as item (data["key"]) and as attribute
    (data.key).

    recursive : bool
        recursively convert dict-like objects to DataStorage

    It can save data to file (format npy,npz or h5)

    To initialize it:

      data = DataStorage( dict( a=(1,2,3),b="add"),filename='store.npz' )
      data = DataStorage( a=(1,2,3), b="add" )

      reads from file if it exists
      data = DataStorage( 'mysaveddata.npz' ) ;

      DOES NOT READ FROM FILE (even if it exists)!!
      data = DataStorage( filename = 'mysaveddata.npz' );

      create empty storage (with default filename)
      data = DataStorage()
    """

    def __init__(self, *args, filename='data_storage.npz', recursive=True,
                 **kwargs):
        # NOTE: attributes are set via __setattr__, hence "filename" is also
        # stored as a regular key of the storage
        self.filename = filename

        # interpret kwargs as dict if there are
        if len(kwargs) != 0:
            fileOrDict = dict(kwargs)
        elif len(args) > 0:
            fileOrDict = args[0]
        else:
            fileOrDict = dict()

        if isinstance(fileOrDict, dict):
            # shallow copy: the recursive conversion below must not modify
            # the dictionary of the caller
            d = dict(fileOrDict)
        elif isinstance(fileOrDict, str):
            if os.path.exists(fileOrDict):
                d = read(fileOrDict)
            else:
                self.filename = fileOrDict
                d = dict()
        else:
            raise ValueError("Invalid DataStorage definition")

        if recursive:
            for k, v in d.items():
                if isinstance(v, dict) and not isinstance(v, DataStorage):
                    d[k] = DataStorage(v)

        # allow accessing with .data, .delays, etc. and as proper dict
        # (__setattr__ stores the value as item too)
        for k, v in d.items():
            setattr(self, k, v)

    def __setitem__(self, key, value):
        """ allows to add fields with data["test"]=4 """
        setattr(self, key, value)
        super().__setitem__(key, value)

    def __setattr__(self, key, value):
        """ allows to add fields with data.test=4 """
        super().__setitem__(key, value)
        super().__setattr__(key, value)

    def __delitem__(self, key):
        """ removes both the attribute and the item """
        delattr(self, key)
        super().__delitem__(key)

    def __str__(self):
        keys = sorted(self.keys())
        return "DataStorage obj containing: %s" % ",".join(keys)

    def __repr__(self):
        keys = sorted(self.keys())
        # default=0: a storage without keys must still be printable
        nchars = max(map(len, keys), default=0)
        fmt = "%%%ds %%s" % (nchars)
        s = ["DataStorage obj containing (sorted): ", ]
        for k in keys:
            value = self[k]
            if isinstance(value, np.ndarray):
                value_str = "array, size %s, type %s" % (
                    "x".join(map(str, value.shape)), value.dtype)
            elif isinstance(value, DataStorage):
                value_str = str(value)[:50] + "..."
            elif isinstance(value, str):
                value_str = value[:50] + "..."
            elif value is None:
                value_str = "None"
            else:
                value_str = str(value)
            s.append(fmt % (k, value_str))
        return "\n".join(s)

    def save(self, fname=None):
        """ Save the storage in fname (default: self.filename) """
        if fname is None:
            fname = self.filename
        assert fname is not None
        save(fname, self)