# encoding: utf8

# A Jython script with parameters.
# It is the duty of the scripting framework to harvest
# the 'name' parameter from the user, and then display
# the 'greeting' output parameter, based on its type.
# greeting = "Hello, " + name + "!"

from ij import IJ  # pylint: disable=import-error
from ij import ImagePlus  # pylint: disable=import-error
# from ij.process import ByteProcessor  # pylint: disable=import-error
from ij.process import ImageStatistics  # pylint: disable=import-error
from ij.plugin import ImageCalculator  # pylint: disable=import-error
from ij.plugin import ZProjector  # pylint: disable=import-error
from ij import WindowManager  # pylint: disable=import-error
from catalog import Sequence, ImageCatalog
import telemos
from imageengine import IImageEngine, IImageProcessingDebugger, NullDebugger, StackImageFeeder, IHyperStack
from hdf5.hdf5_data import Group, DataSet, ElementType
from imagej.hdf5serializer import save_hdf5_file

import abc

# python 2/3 compatible way to declare an abstract base class (Jython is python 2.x)
ABC = abc.ABCMeta('ABC', (object,), {})


class ImageLogger(IImageProcessingDebugger):

    def __init__(self):
        IImageProcessingDebugger.__init__(self)

    def on_image(self, image, image_id):
        # todo : WindowManager
        image.show()


class Lipase(object):

    def __init__(self, catalog, debugger=NullDebugger()):
        '''
        :param ImageCatalog catalog:
        :param IImageProcessingDebugger debugger:
        '''
        self.catalog = catalog
        self.debugger = debugger

    def get_image_median_value(self, src_image):
        '''
        :param ImagePlus src_image:
        '''
        # https://imagej.nih.gov/ij/developer/api/ij/process/ImageStatistics.html
        stats = ImageStatistics.getStatistics(src_image.getProcessor(), ImageStatistics.MEDIAN, None)
        # print(stats)
        # print(stats.pixelCount)
        return stats.median

    def get_image_mean_value(self, src_image):
        '''
        :param ImagePlus src_image:
        '''
        # https://imagej.nih.gov/ij/developer/api/ij/process/ImageStatistics.html
        stats = ImageStatistics.getStatistics(src_image.getProcessor(), ImageStatistics.MEAN, None)
        # print(stats)
        # print(stats.pixelCount)
        return stats.mean

    def test_get_image_median_value(self):
        image_file_path = '/Users/graffy/ownCloud/ipr/lipase/raw-images/res_soleil2018/GGH/GGH_2018_cin2_phiG_I_327_vis_-40_1/Pos0/img_000000000_DM300_327-353_fluo_000.tif'
        image = IJ.openImage(image_file_path)
        median_value = self.get_image_median_value(image)
        print('median value : %d' % median_value)

    def find_depth_index(self, src_image, white_sequence):
        ''' finds, in the image sequence white_sequence, the z slice whose image best matches src_image

        Both images are normalized by their median intensity before comparison, so that
        differences in exposure don't bias the result; the score of a slice is the mean
        squared difference between the two normalized images (lower is better).

        :param ImagePlus src_image:
        :param Sequence white_sequence:
        '''
        self.debugger.on_image(src_image, 'src_image')

        white_sequence.open_in_imagej()

        print('image type : ', src_image.getType())
        src_median_value = self.get_image_median_value(src_image)
        normalized_src_processor = src_image.getProcessor().convertToFloat()
        normalized_src_processor.multiply(1.0 / src_median_value)
        best_diff = None
        best_z_index = None
        for z_index in range(white_sequence.num_slices):
            white_image_file_path = white_sequence.get_image_file_path(channel_index=0, frame_index=0, z_index=z_index)
            white_image = IJ.openImage(white_image_file_path)
            # self.debugger.on_image(white_image, 'white_image')
            white_median_value = self.get_image_median_value(white_image)
            # white_to_src_factor = 1.0 / float(white_median_value)
            normalized_white_processor = white_image.getProcessor().convertToFloat()
            normalized_white_processor.multiply(1.0 / float(white_median_value))

            normalized_src = ImagePlus()  # IJ.openImage("http://imagej.nih.gov/ij/images/boats.gif")
            normalized_src.setProcessor(normalized_src_processor)
            # print(imp1)
            normalized_white = ImagePlus()  # IJ.openImage("http://imagej.nih.gov/ij/images/bridge.gif")
            normalized_white.setProcessor(normalized_white_processor)
            # print(imp2)
            ic = ImageCalculator()
            difference_image = ic.run("sub create float", normalized_src, normalized_white)
            # self.debugger.on_image(imp3, 'diff')

            # self.debugger.on_image(normalized_white, 'normalized_white')
            # self.debugger.on_image(difference_image, 'difference_image')

            # imp2mat = ImagePlusMatConverter()
            # white_mat = imp2mat.toMat(white_image.getProcessor())
            # white_mat = imread(white_image_file_path)
            # print(white_image)
            # mean squared difference between the two normalized images
            sqdiff_image = ic.run("mul create float", difference_image, difference_image)
            mean_sqdiff = self.get_image_mean_value(sqdiff_image)
            print('difference : ', mean_sqdiff)
            if best_diff is None or mean_sqdiff < best_diff:
                best_diff = mean_sqdiff
                best_z_index = z_index
        return best_z_index

    def estimate_background(self, sequence):
        channel_id = 'DM300_nofilter_vis'
        # sequence.as_stack(channel_id).show()
        visible_stack = sequence.as_hyperstack(selected_channel_ids=[channel_id])
        visible_image_feeder = StackImageFeeder(visible_stack)
        ie = IImageEngine.get_instance()
        # the background is estimated as the median of the images in the visible channel
        background_image = ie.compute_median(visible_image_feeder)
        return background_image

    def estimate_white(self, sequence, channel_id):
        white_estimator = telemos.WhiteEstimator(open_size=75, close_size=75, average_size=75)
        white_estimate = white_estimator.estimate_white([sequence], [channel_id], dark=None)
        return white_estimate

    def process_sequence(self, sequence_id):
        '''
        :param str sequence_id:
        '''
        raise NotImplementedError()


class IBackgroundEstimator(ABC):
    """ Abstract base class for the methods that estimate the background image of a traps sequence. """

    @abc.abstractmethod
    def estimate_background(self, visible_traps_sequence):
        """
        :param IHyperStack visible_traps_sequence:
        :return: the estimated background image
        :rtype: IImage
        """
        pass


class UserProvidedBackground(IBackgroundEstimator):

    def __init__(self, background_image):
        """
        :param IImage background_image: the image to use as background (it is expected to contain no particles, only static objects such as traps or lens spots)
        """
        IBackgroundEstimator.__init__(self)
        self.background_image = background_image

    def estimate_background(self, visible_traps_sequence):
        return self.background_image


class EmptyFrameBackgroundEstimator(IBackgroundEstimator):

    def __init__(self, empty_frame_index):
        """
        :param int empty_frame_index: the index of the frame to use as background (it is expected to contain no particles, only static objects such as traps or lens spots)
        """
        IBackgroundEstimator.__init__(self)
        self.empty_frame_index = empty_frame_index

    def estimate_background(self, visible_traps_sequence):
        background_estimate = visible_traps_sequence.get_image(frame_index=self.empty_frame_index).clone()
        return background_estimate
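

# Illustrative sketch (not used by the processing chain above): another way to implement
# IBackgroundEstimator would be to take the median over the whole visible sequence, reusing
# the same IImageEngine.compute_median / StackImageFeeder calls as Lipase.estimate_background
# above (this assumes compute_median returns the per-pixel median of the fed images, as that
# usage suggests). The class name below is hypothetical.
class MedianBackgroundEstimator(IBackgroundEstimator):

    def estimate_background(self, visible_traps_sequence):
        """
        :param IHyperStack visible_traps_sequence:
        :return: the median image of the sequence, used as a background estimate
        :rtype: IImage
        """
        ie = IImageEngine.get_instance()
        return ie.compute_median(StackImageFeeder(visible_traps_sequence))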


class GlobulesAreaEstimator(object):
    """ An image processing suite targeted at visible images of traps with moving particles (globules).
    """

    def __init__(self, background_estimator, particle_threshold):
        """
        :param IBackgroundEstimator background_estimator: the method used to estimate the background image
        :param float particle_threshold: if the absolute difference between a pixel and its corresponding value in the background image exceeds this value, then the pixel is considered part of a particle
        """
        self.background_estimator = background_estimator
        self.particle_threshold = particle_threshold

    def detect_particles(self, visible_traps_sequence):
        """
        :param IHyperStack visible_traps_sequence:
        :return: for each frame, the ratio of the frame area covered by particles
        :rtype: hdf5_data.Group
        """
        background_image = self.background_estimator.estimate_background(visible_traps_sequence)
        ie = IImageEngine.get_instance()
        # particles_stack = ie.create_hyperstack(width=background_image.width, height=background_image.height, num_slices=1, num_frames=visible_traps_sequence.num_frames(), num_channels=1, pixel_type=PixelType.F32)

        results = Group('')
        globules_area = DataSet('globules_area_ratio', (visible_traps_sequence.num_frames(), ), ElementType.F32)
        frame_indices = DataSet('frame index', (visible_traps_sequence.num_frames(), ), ElementType.U32)
        results.add_dataset(globules_area)
        results.add_dataset(frame_indices)

        frame_area = background_image.get_width() * background_image.get_height()
        for frame_index in range(visible_traps_sequence.num_frames()):
            # pixels that differ strongly from the background are considered particle pixels
            particle_image = ie.subtract(visible_traps_sequence.get_image(frame_index=frame_index), background_image)
            abs_particle_image = ie.abs(particle_image)
            is_particle = ie.threshold(abs_particle_image, self.particle_threshold)
            # the thresholded image is binary (particle pixels have the value 255), so the
            # number of particle pixels can be recovered from its mean value
            measured_mean_value = is_particle.get_mean_value()
            particle_pixel_value = 255.0
            num_pixels = is_particle.get_width() * is_particle.get_height()
            num_particle_pixels = int(measured_mean_value * num_pixels / particle_pixel_value)
            print("num_particle_pixels: %d " % num_particle_pixels)
            globules_area_ratio = float(num_particle_pixels) / frame_area
            globules_area[(frame_index, )] = globules_area_ratio
            frame_indices[(frame_index, )] = frame_index
            # results.frames[frame_index] = GlobulesAreaEstimator.FrameResult(globules_area_ratio=globules_area_ratio)
            # particles_stack.set_image(frame_index=frame_index, image=particle_image)
        return results
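

# Illustrative sketch of how the classes above fit together; the sequence id and channel id
# come from the examples used elsewhere in this script, but the threshold value is a
# hypothetical placeholder, and the note about save_hdf5_file only points at the import above
# without assuming its exact signature.
def example_measure_globules_area(catalog):
    """
    :param ImageCatalog catalog:
    """
    sequence = catalog.sequences['res_soleil2018/GGH/GGH_2018_cin2_phiG_I_327_vis_-40_1/Pos2']
    visible_traps_sequence = sequence.as_hyperstack(selected_channel_ids=['DM300_nofilter_vis'])
    # use the first frame as background, assuming it contains no globules
    estimator = GlobulesAreaEstimator(background_estimator=EmptyFrameBackgroundEstimator(empty_frame_index=0), particle_threshold=2000.0)
    results = estimator.detect_particles(visible_traps_sequence)
    # results is a hdf5_data.Group containing the 'globules_area_ratio' and 'frame index'
    # datasets; it could for instance be serialized with save_hdf5_file (imported above)
    return results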


def test_find_white():

    raw_images_root = '/Users/graffy/ownCloud/ipr/lipase/raw-images'
    catalog = ImageCatalog(raw_images_root)
    print(catalog)
    # catalog.sequences['GGH_2018_cin2_phiG_I_327_vis_-40_1/Pos0'].open_in_imagej()
    # catalog.sequences['res_soleil2018/GGH/GGH_2018_cin2_phiG_I_327_vis_-40_1/Pos2'].open_in_imagej()
    # catalog.sequences['DARK_40X_60min_1 im pae min_1/Pos0'].open_in_imagej()
    # catalog.sequences['white_24112018_1/Pos0'].open_in_imagej()

    lipase = Lipase(catalog, debugger=ImageLogger())
    sequence = lipase.catalog.sequences['res_soleil2018/GGH/GGH_2018_cin2_phiG_I_327_vis_-40_1/Pos2']
    white_seq = sequence.get_white()
    channel_index = sequence.get_channel_index('DM300_327-353_fluo')
    src_image_file_path = sequence.get_image_file_path(channel_index=channel_index, frame_index=0, z_index=0)
    src_image = IJ.openImage(src_image_file_path)
    # src_image = IJ.openImage(src_image_file_path)
    depth_index = lipase.find_depth_index(src_image, white_seq)  # pylint: disable=unused-variable


def run_script():
    # the test result is stored in a file: '1' means failure; it gets overwritten
    # with '0' (success) once all the tests have completed
    with open('/tmp/test_result.txt', 'w') as f:
        f.write('%d' % {True: 0, False: 1}[False])
    test_find_white()

    if False:
        lipase = Lipase(ImageCatalog('/Users/graffy/ownCloud/ipr/lipase/raw-images'), debugger=ImageLogger())
        lipase.process_sequence('res_soleil2018/GGH/GGH_2018_cin2_phiG_I_327_vis_-40_1/Pos2')

    if False:
        sequence = lipase.catalog.sequences['GGH_2018_cin2_phiG_I_327_vis_-40_1/Pos0']
        src_image_file_path = sequence.get_image_file_path(channel_index=sequence.get_channel_index('DM300_327-353_fluo'), frame_index=3)
        src_image = IJ.openImage(src_image_file_path)  # pylint: disable=unused-variable

    # dark_image = IJ.openImage(raw_images_root + '/GGH_2018_cin2_phiG_I_327_vis_-40_1/Pos0/img_000000000_DM300_327-353_fluo_000.tif')
    # src_image.show()
    # assert src_image
    with open('/tmp/test_result.txt', 'w') as f:
        f.write('%d' % {True: 0, False: 1}[True])


# When this script is run directly, __name__ is '__main__' (the Jython interpreter embedded
# in Fiji may report '__builtin__' instead); when it is loaded as a module, __name__ has a
# different value and nothing is run.
if __name__ in ['__builtin__', '__main__']:
    run_script()
    # test_get_image_median_value()
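
# Example invocation (assumed; adjust the launcher path to your Fiji installation and make
# sure the helper modules imported above are on the Jython path):
#   ./ImageJ-linux64 --ij2 --headless --run lipase.py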