From 71b1ed5b8b1188b4d49cf8e8f047f3c2730ff2c3 Mon Sep 17 00:00:00 2001 From: Guillaume Raffy Date: Fri, 19 Nov 2021 18:04:30 +0100 Subject: [PATCH] added image processing listener utilities copied from https://subversion.ipr.univ-rennes1.fr/repos/main/projects/libraries/python but they will need admting though --- src/lipase/improc/__init__.py | 1 + src/lipase/improc/graphing.py | 106 +++++++++++++++ src/lipase/improc/improlis.py | 238 ++++++++++++++++++++++++++++++++++ src/lipase/improc/scene2d.py | 166 ++++++++++++++++++++++++ 4 files changed, 511 insertions(+) create mode 100644 src/lipase/improc/__init__.py create mode 100644 src/lipase/improc/graphing.py create mode 100644 src/lipase/improc/improlis.py create mode 100644 src/lipase/improc/scene2d.py diff --git a/src/lipase/improc/__init__.py b/src/lipase/improc/__init__.py new file mode 100644 index 0000000..3130f1a --- /dev/null +++ b/src/lipase/improc/__init__.py @@ -0,0 +1 @@ +from .improlis import MovieProcessDebugger diff --git a/src/lipase/improc/graphing.py b/src/lipase/improc/graphing.py new file mode 100644 index 0000000..382351a --- /dev/null +++ b/src/lipase/improc/graphing.py @@ -0,0 +1,106 @@ +# -*- coding: utf-8 -*- +""" + installation des modules nécéssaires sur macos x : + sudo port install opencv +python27 + sudo port install py-pyqt4 + sudo port install py-matplotlib + +""" +from matplotlib import pyplot +from matplotlib.backends.backend_pdf import PdfPages + +def setLatexLookingFonts(): + import matplotlib + matplotlib.rcParams['mathtext.fontset'] = 'stix' + matplotlib.rcParams['font.family'] = 'STIXGeneral' + #matplotlib.pyplot.title(r'ABC123 vs $\mathrm{ABC123}^{123}$') + +class Signal: + def __init__(self, signal, name=None): + self.m_signalValues = signal + self.m_name = name + +class Point2D(object): + def __init__(self, x, y): + self.m_x = x + self.m_y = y + @property + def x(self): + return self.m_x + @property + def y(self): + return self.m_y + +class ScatterPlot(object): + def __init__(self, xAxisDesc = None, yAxisDesc = None): + self.m_points = [] + self.m_xAxisDesc = xAxisDesc + self.m_yAxisDesc = yAxisDesc + def append(self, point2d ): + self.m_points.append(point2d) + @property + def xAxisDesc(self): + return self.m_xAxisDesc + @property + def yAxisDesc(self): + return self.m_yAxisDesc + + def setXAxisDesc(self, description): + self.m_xAxisDesc = description + def setYAxisDesc(self, description): + self.m_yAxisDesc = description + def __iter__(self): + return iter(self.m_points) + +def saveScatterPlot( scatterPlot, pdfFileName): + setLatexLookingFonts() + fig = pyplot.figure() + pyplot.subplot(1,1,1) + x = [] + y = [] + for point in scatterPlot: + x.append( point.x ) + y.append( point.y ) + pyplot.scatter(x, y, marker='x') + if scatterPlot.xAxisDesc is not None: + pyplot.xlabel(scatterPlot.xAxisDesc) + if scatterPlot.yAxisDesc is not None: + pyplot.ylabel(scatterPlot.yAxisDesc) + + #print('saving to '+pdfFileName) + pp = PdfPages(pdfFileName) + pp.savefig( fig ) + pp.close() + pyplot.close(fig) + + +def saveGraph(signal, pdfFileName): + setLatexLookingFonts() + fig = pyplot.figure() + pyplot.subplot(1,1,1) + pyplot.plot(signal, marker='x') + + print('saving to '+pdfFileName) + pp = PdfPages(pdfFileName) + pp.savefig( fig ) + pp.close() + pyplot.close(fig) + +def saveMultiGraph(signals, pdfFileName): + setLatexLookingFonts() + fig = pyplot.figure() + pyplot.subplot(1,1,1) + for signal in signals: + print('plotting signal %s' % str(signal.m_signalValues.shape)) + pyplot.plot(signal.m_signalValues, label=signal.m_name) + print('saving to '+pdfFileName) + pyplot.legend() + pp = PdfPages(pdfFileName) + pp.savefig( fig ) + pp.close() + pyplot.close(fig) + + + + + diff --git a/src/lipase/improc/improlis.py b/src/lipase/improc/improlis.py new file mode 100644 index 0000000..eb0458c --- /dev/null +++ b/src/lipase/improc/improlis.py @@ -0,0 +1,238 @@ +# -*- coding: utf-8 -*- +""" + installation des modules nécéssaires sur macos x : + sudo port install opencv +python27 + sudo port install py-pyqt4 + sudo port install py-matplotlib + +""" +#from PyQt4.QtGui import * +#from PyQt4.QtCore import * +#import sys + +import cv2 +#from cv2 import cv +import numpy +import os +import sys +#from matplotlib import pyplot as plt +#from matplotlib.backends.backend_pdf import PdfPages + +sys.path.append('../Libraries/python') +import scene2d +import graphing + +class Line2D(object): + def __init__( self, point1, point2 ): + self.m_point1 = point1 + self.m_point2 = point2 + +class IImageProcessListener(object): + """ + an abstract base class that handle events that happen during an image processing. This provides a flexible way to debug image processing + """ + def __init__(self): + self.m_imageIndex = 0 + def onSignal(self, signal, signalName): + """ + a new signal (1d array) has just been computed + """ + assert( False ) + def onImage(self, image, imageName): + """ + a new image has just been computed + """ + assert( False ) + +class NullImageProcessListener(IImageProcessListener): + def __init__(self): + IImageProcessListener.__init__( self ) + def onSignal(self, signal, signalName): + pass + def onBaseImage(self, image, imageName=''): + pass + def onImage(self, image, imageName): + pass + def onPoint(self, point, layerPath, label=None ): + pass + def onLine(self, line, layerPath, label=None ): + pass + def onCircle(self, circle, layerPath, label=None ): + pass + +class ImageProcessDebugger(IImageProcessListener): + def __init__(self, outputFolder='./debug'): + IImageProcessListener.__init__( self ) + self.m_scene = None + self.m_baseImageName = None + self.setOutputFolder(outputFolder) + + def __del__(self): + #assert( self.m_scene is not None ) + if self.m_scene: + self.m_scene.saveAsSvg('%s/%s.svg' % (self.m_outputFolder, self.m_baseImageName)) + self.m_scene = None + def setOutputFolder(self, outputFolderPath): + if self.m_scene: + self.m_scene.saveAsSvg('%s/%s.svg' % (self.m_outputFolder, self.m_baseImageName)) + self.m_scene = None + self.m_outputFolder = outputFolderPath + pathParts = self.m_outputFolder.split('/') + path = '' + for i in range(len(pathParts)): + if i != 0: + path += '/' + path += pathParts[i] + try: + os.mkdir(path) + except (OSError): # this exception is raised if the folder already exists + pass + def onSignal(self, signal, signalName): + graphing.saveGraph(signal, '%s/%s.pdf' % (self.m_outputFolder, signalName)) + def onSignals(self, signals, signalName): + graphing.saveMultiGraph(signals, '%s/%s.pdf' % (self.m_outputFolder, signalName)) + def onImage(self, image, imageName): + #plt.subplot(1,1,1),plt.imshow(contourImage,cmap = 'gray') + #plt.title('Sobel X'), plt.xticks([]), plt.yticks([]) + #plt.show() + filePath = '%s/%s.tif' % (self.m_outputFolder, imageName) + saveImage( image, filePath ) + def onBaseImage(self, image, imageName=''): + assert(imageName != '') + self.m_baseImageName=imageName + filePath = '%s/%s.png' % (self.m_outputFolder, imageName) + saveImage( image, filePath ) + self.m_scene = scene2d.Scene() + assert( self.m_scene is not None ) + if self.m_scene: + self.m_scene.setBaseImage(scene2d.Image(filePath)) + def onPoint(self, point, layerPath, label=None ): + assert( self.m_scene ) + self.m_scene.getLayer(layerPath).addChild(scene2d.Point(point, label)) + def onLine(self, line, layerPath, label=None ): + assert( self.m_scene ) + self.m_scene.getLayer(layerPath).addChild(scene2d.Line(line, label)) + def onCircle(self, circle, layerPath, label=None ): + assert( self.m_scene ) + self.m_scene.getLayer(layerPath).addChild(scene2d.Circle(circle, label)) + + +class IMovieProcessListener(object): + """ + an abstract base class that handle events that happen durin a movie image processing. This provides a flexible way to debug imageprocessing + """ + def __init__(self, imageProcessListener ): + self.m_imageIndex = 0 + self.m_imageProcessListener = imageProcessListener + def onImageProcessingStart(self): + """ + the processing of a new image starts + """ + self.m_imageIndex += 1 + def onImageProcessingEnd(self): + """ + the processing of a new image ends + """ + pass + def onSignal(self, signal, signalName): + """ + a new signal (1d array) has just been computed + """ + self.m_imageProcessListener.onSignal( signal, signalName ) + def onImage(self, image, imageName): + """ + a new image has just been computed + """ + self.m_imageProcessListener.onImage( image, imageName ) + +class NullMovieProcessListener(IMovieProcessListener): + def __init__(self): + IMovieProcessListener.__init__( self, NullImageProcessListener() ) + def onSignal(self, signal, signalName): + pass + def onImage(self, image, imageName): + pass + +def saveImage(image, filePath): + print('%s original image type : %s range=(%f:%f)' % (filePath, str(image.dtype), image.min(), image.max())) + if image.dtype == numpy.bool: + cv2.imwrite(filePath, image.astype(numpy.uint8)*255) + elif image.dtype == numpy.uint16: + fileExt = filePath.split('.')[-1] + if fileExt == 'tif': + # tif file format supports 16 bits per pixel + cv2.imwrite(filePath, image) + else: + u8Image = cv2.normalize(image, alpha=0.0, beta=255.0, norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_8U) + cv2.imwrite(filePath, u8Image) + elif image.dtype == numpy.float32 or image.dtype == numpy.float64: + print('image range : %d-%d' % (image.min(), image.max())) + u8Image = cv2.normalize(image, numpy.array(image.shape, dtype=numpy.uint8), alpha=0.0, beta=255.0, norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_8U) + print('u8Image range : %d-%d' % (u8Image.min(), u8Image.max())) + cv2.imwrite(filePath, u8Image) + elif image.dtype == numpy.int32: + print('image range : %d-%d' % (image.min(), image.max())) + u8Image = cv2.normalize(image, numpy.array(image.shape, dtype=numpy.uint8), alpha=0.0, beta=255.0, norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_8U) + print('u8Image range : %d-%d' % (u8Image.min(), u8Image.max())) + cv2.imwrite(filePath, u8Image) + else: + #assert( False ) + cv2.imwrite(filePath, image) + + + +class MovieProcessDebugger(IMovieProcessListener): + def __init__(self): + IMovieProcessListener.__init__( self, ImageProcessDebugger() ) + self.m_outputFolder='./debug' + self.m_scene = None + try: + os.mkdir(self.m_outputFolder) + except (OSError): # this exception is raised if the folder already exists + pass + def onImageProcessingStart(self): + """ + the processing of a new image starts + """ + IMovieProcessListener.onImageProcessingStart(self) + self.m_imageProcessListener.setOutputFolder( '%s/image%d' % (self.m_outputFolder, self.m_imageIndex) ) + + def onImageProcessingEnd(self): + IMovieProcessListener.onImageProcessingEnd(self) + self.m_imageProcessListener.setOutputFolder( None ) + def onSignal(self, signal, signalName): + self.m_imageProcessListener.onSignal( signal, signalName ) + def onSignals(self, signals, signalName): + self.m_imageProcessListener.onSignal( signals, signalName ) + def onImage(self, image, imageName): + self.m_imageProcessListener.onImage( image, imageName ) + def onBaseImage(self, image, imageName): + self.m_imageProcessListener.onBaseImage( image, imageName ) + def onPoint(self, point, layerPath, label=None ): + self.m_imageProcessListener.onPoint( image, point, layerPath, label ) + def onCircle(self, circle, layerPath, label=None ): + self.m_imageProcessListener.onPoint( image, circle, layerPath, label ) + + +class IImageProcessor(object): + + def __init__(self, movieProcessListener = NullMovieProcessListener()): + self.m_movieProcessListener = movieProcessListener + + def processImage(self, image): + assert( False ) # this method is not supposed to be called + + def get_image_process_listener(self): + return self.m_movieProcessListener + + +def findEdges(image): + sx = cv2.Sobel(image, cv2.CV_32F, dx=1, dy=0, ksize=3) + sy = cv2.Sobel(image, cv2.CV_32F, dx=0, dy=1, ksize=3) + return numpy.sqrt(sx*sx + sy*sy) + + + + + + diff --git a/src/lipase/improc/scene2d.py b/src/lipase/improc/scene2d.py new file mode 100644 index 0000000..3e5269c --- /dev/null +++ b/src/lipase/improc/scene2d.py @@ -0,0 +1,166 @@ +# -*- coding: utf-8 -*- +import cv2 + +class ISceneNodeVisitor(object): + def __init__(self): + pass + + def visitPoint(self, point): + assert( False ) + + def visitImage(self, image): + assert( False ) + + def visitLayer(self, layer): + assert( False ) + + def visitScene(self, scene): + assert( False ) + + def visitLine(self, line): + assert( False ) + + + +class ISceneNode(object): + def __init__(self): + pass + + def onVisit(self, visitor ): + assert( False ) + +class Point(ISceneNode): + def __init__(self, coords, label = None): + self.m_coords = coords + self.m_label = label + + def onVisit( self, visitor ): + visitor.visitPoint( self ) + +class Line(ISceneNode): + def __init__(self, line, label = None): + self.m_from = line[0] + self.m_to = line[1] + self.m_label = label + + def onVisit( self, visitor ): + visitor.visitLine( self ) + +class Circle(ISceneNode): + def __init__(self, circle, label = None): + self.m_center = (circle[0], circle[1]) + self.m_radius = circle[2] + self.m_label = label + + def onVisit( self, visitor ): + visitor.visitCircle( self ) + +class Image(ISceneNode): + def __init__(self, imageFilePath): + self.m_imageFilePath = imageFilePath + + def onVisit(self, visitor ): + visitor.visitImage( self ) + + def getFilePath(self): + return self.m_imageFilePath + + def getSize(self): + cv_img = cv2.imread(self.m_imageFilePath, cv2.IMREAD_ANYDEPTH) + return cv_img.shape + +class Layer(ISceneNode): + def __init__(self, layerName): + self.m_name = layerName + self.m_children = [] + self.m_layers = {} + + def getName(self): + return self.m_name + + def addChild(self, childNode): + self.m_children.append( childNode ) + + def onVisit(self, visitor ): + visitor.visitLayer( self ) + + def addLayer(self, layer): + print('adding layer %s' % layer.getName()) + self.m_layers[ layer.getName() ] = layer + + + +class Scene(ISceneNode): + def __init__(self): + self.m_image = None + self.m_rootLayer = Layer('root') + + def onVisit(self, visitor ): + visitor.visitScene( self ) + + def setBaseImage( self, image ): + self.m_image = image + + def getLayer(self, layerPath): + pathParts = layerPath.split('/') + parentLayer = self.m_rootLayer + for layerName in pathParts: + if layerName not in parentLayer.m_layers: + parentLayer.addLayer( Layer(layerName) ) + parentLayer = parentLayer.m_layers[layerName] + return parentLayer + + def saveAsSvg(self, filePath): + visitor = SvgExporter( filePath ) + self.onVisit(visitor) + +class SvgExporter(ISceneNodeVisitor): + def __init__(self, svgFilePath): + self.m_svgFilePath = svgFilePath + + def visitPoint(self, point): + radius = 1.0 + self.m_f.write('\n' % (point.m_coords[0], point.m_coords[1], radius) ) + if point.m_label is not None : + self.m_f.write('%s\n' % (point.m_coords[0], point.m_coords[1], point.m_label) ) + + def visitLine(self, line): + radius = 1.0 + self.m_f.write('\n' % (line.m_from[0], line.m_to[0], line.m_from[1], line.m_to[1]) ) + if line.m_label is not None : + self.m_f.write('%s\n' % (line.m_from[0], line.m_from[1], point.m_label) ) + + def visitCircle(self, circle): + radius = 1.0 + self.m_f.write('\n' % (circle.m_center[0], circle.m_center[1], circle.m_radius) ) + if circle.m_label is not None : + self.m_f.write('%s\n' % (circle.m_center[0], circle.m_center[1], point.m_label) ) + + def visitImage(self, image): + imageSize = image.getSize() + self.m_f.write('\n' % (image.getFilePath().split('/')[-1], imageSize[1], imageSize[0]) ) + + def visitLayer(self, layer): + self.m_f.write('\n' % layer.getName()) + for child in layer.m_children: + child.onVisit( self ) + for layer in layer.m_layers.itervalues(): + layer.onVisit( self ) + self.m_f.write('\n') + + def visitScene(self, scene): + with open(self.m_svgFilePath, 'wt') as self.m_f: + print('exporting scene2d as %s' % self.m_svgFilePath) + self.m_f.write('') + self.m_f.write('') + self.m_f.write('') + self.m_f.write('') + self.m_f.write('') + if scene.m_image is not None: + scene.m_image.onVisit(self) + + scene.m_rootLayer.onVisit(self) + #f.write('') + self.m_f.write('') + +