Commit 5fef0b50 authored by kulvait's avatar kulvait

Initial commit

DEN file processing
parent d4b5f7f0
Pipeline #163 canceled with stages
playground
__pycache__
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
"""
Created on Tue Jun 26 15:18:35 2018
In this file, we work with row major image data indexing them (i,j)=(row,col)
@author: Vojtech Kulvait
"""
import numpy as np
import os
import cv2
from pathlib import Path
#Get frame from DEN file
#Can get a subframe, where row_from is the first row index
#row_to is not included
#col_from is the first col index
#col_to is not included
def getFrame(file, i, row_from=None, row_to=None, col_from=None, col_to=None):
# i is the slice index
header = np.fromfile(file, np.dtype('<i2'), 3)
rows = np.uint32(header[0])
columns = np.uint32(header[1])
if row_from is None:
row_from = 0
if row_to is None:
row_to = rows;
if col_from is None:
col_from = 0
if col_to is None:
col_to = columns
f = open(file, "rb")
f.seek(6+rows*columns*4*i, os.SEEK_SET)
data = np.fromfile(f, np.dtype('<f4'), rows*columns)
newdata = data.reshape((rows, columns))
newdata = newdata[row_from:row_to, col_from:col_to]
return(newdata)
def storeToDEN(file, data, force=False):
my_file = Path(file)
if not force and my_file.is_file():
print('File already exists, no data written')
return False
if not isinstance(data, np.ndarray):
print('Data has to be of type numpy.array')
return False
if len(data.shape) == 1:
print('Dimension = 1, expected >= 2')
return False
elif len(data.shape) == 2:
writeHeader(file, rows=data.shape[0], cols=data.shape[1], zdim=1, force=force)
writeFrame(file, 0, data, force=force)
return True
i = data.shape[2]
writeEmptyDEN(file, dimx=data.shape[0], dimy=data.shape[1], dimz=data.shape[2], force=force)
header = np.fromfile(file, np.dtype('<i2'), 3)
rows = np.uint32(header[0])
columns = np.uint32(header[1])
f = open(file, "wb")
header.tofile(f)
for frame in range(i):
newdata = np.array(data[:, :, frame], np.dtype('<f4'))
newdata = newdata.reshape((rows*columns, ))
# put header in front of image data
f.seek(6 + rows * columns * frame * 4, os.SEEK_SET)
newdata.tofile(f)
f.close()
return True
def writeFrame(file, i, data, force=False):
my_file = Path(file)
if not force and my_file.is_file():
print('File already exists, no data written')
return False
dim = data.shape
if len(dim) == 1:
print("Dimension = 1, expected 2")
return False
elif len(dim) == 3:
print("Dimension = 3, expected 2")
elif len(dim) == 2:
pass
else:
raise Exception('Image Dimension Mismatch')
header = np.fromfile(file, np.dtype('<i2'), 3)
rows = np.uint32(header[0])
columns = np.uint32(header[1])
f = open(file, "wb")
data = np.array(data, np.dtype('<f4'))
data = data.reshape((rows*columns, ))
# put header in front of image data
header.tofile(f)
f.seek(6 + rows * columns * i * 4, os.SEEK_SET)
data.tofile(f)
f.close()
return True
def writeEmptyDEN(file, dimx, dimy, dimz, force=False):
my_file = Path(file)
if not force and my_file.is_file():
print('File already exists, no header written')
return False
outfile = open(file, "w")
header = np.array([dimx, dimy, dimz])
header = np.array(header, dtype='<i2')
header.tofile(outfile)
outfile.close()
return True
def readHeader(file):
header = np.fromfile(file, np.dtype('<i2'), 3)
par = {}
par["rows"] = np.uint32(header[0])
par["cols"] = np.uint32(header[1])
par["zdim"] = np.uint32(header[2])
return(par)
#Trim frame to the specified dimensions
#Can get a subframe, where row_from is the first row index
#row_to is not included
#col_from is the first col index
#col_to is not included
def trimFrame(frame, row_from, row_to, col_from, col_to):
newdata = frame[row_from:row_to, col_from:col_to]
return(newdata)
#Estimate rigid transformation
#Most naive approach
def getRigidTransformation(frame1, frame2):
minvalue = min(np.percentile(frame1, 10), np.percentile(frame2, 10))
maxvalue = min(np.percentile(frame1, 90), np.percentile(frame2, 90))
frame1 = (frame1-minvalue)/(maxvalue-minvalue)
frame2 = (frame2-minvalue)/(maxvalue-minvalue)
frame1 = np.maximum(0, np.minimum(1, frame1))
frame2 = np.maximum(0, np.minimum(1, frame2))
warp_matrix = cv2.estimateRigidTransform((frame1*255+0.5).astype('uint8'),(frame2*255+0.5).astype('uint8'), False)
return warp_matrix
#Estimate of corelation coefficient
#Gets result after first step
#There is no function for direct computation
def estimateCC(frame1, frame2):
warp_mode = cv2.MOTION_EUCLIDEAN
warp_matrix = np.eye(2, 3, dtype=np.float32)
max_iterations = 1;
termination_eps = 1e-15;
criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, max_iterations, termination_eps)
(cc, warp_matrix) = cv2.findTransformECC(frame1,frame2, warp_matrix, warp_mode, criteria)
return (cc, warp_matrix);
#Get optimal transformation
def getTransformECC(frame1, frame2, warp_mode=None, max_iterations=None, termination_eps=None, init_warp_matrix=None):
if warp_mode is None:
warp_mode = cv2.MOTION_EUCLIDEAN
if max_iterations is None:
max_iterations = 5000#Default of findTransformECC is 50
if termination_eps is None:
termination_eps = 1e-10;#Default of findTransformECC is 0.001
if init_warp_matrix is None:
if warp_mode == cv2.MOTION_HOMOGRAPHY:
init_warp_matrix = np.eye(3, 3, dtype=np.float32)
else :
init_warp_matrix = np.eye(2, 3, dtype=np.float32)
criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, max_iterations, termination_eps)
(cc, warp_matrix) = cv2.findTransformECC (frame1, frame2, init_warp_matrix, warp_mode, criteria)
return (cc, warp_matrix);
#Calculate the x and y gradients using Sobel operator
#Then using Euclidean norm of both
def getGradient(frame) :
grad_x = cv2.Sobel(frame,cv2.CV_32F,1,0,ksize=3)
grad_y = cv2.Sobel(frame,cv2.CV_32F,0,1,ksize=3)
return np.sqrt(np.add(np.square(grad_x), np.square(grad_y)))
#Calculate Laplacian
def getLaplacian(frame):
return(cv2.Laplacian(frame, cv2.CV_32F, ksize=23))
#Apply the same algorithm on gradient data
def gradientTransformECC(frame1, frame2, warp_mode=None, max_iterations=None, termination_eps=None, init_warp_matrix=None):
g1 = getGradient(frame1)
g2 = getGradient(frame2)
return getTransformECC(g1, g2, warp_mode, max_iterations, termination_eps, init_warp_matrix)
#Apply the same algorithm on laplacian data
def laplacianTransformECC(frame1, frame2, warp_mode=None, max_iterations=None, termination_eps=None, init_warp_matrix=None):
l1 = getLaplacian(frame1)
l2 = getLaplacian(frame2)
return getTransformECC(l1, l2, warp_mode, max_iterations, termination_eps, init_warp_matrix)
#For solving the problem that we have warp_matrix in the coordinates (j,i)
#here (x,y)=(col, row)
#We want the warp_matrix to operate on the data relative to (j',i')
#(i,j) = (i'-offset_row, j'-offset_col)
#We have that (Ax)' = Ax' - Aoffset + offset
#Here offset is (offset_row, offset_col, 0)
#So the operator that acts on (i',j') space has modified third column.
def shiftWarpMatrix(warp_matrix, offset_row, offset_col):
output_matrix = np.copy(warp_matrix)
offsetVector = np.array([offset_col, offset_row, 0], dtype=np.float32)
thirdRowAdd = -np.matmul(warp_matrix, offsetVector) + offsetVector[0:warp_matrix.shape[0]]
output_matrix[:,2] = output_matrix[:,2] + thirdRowAdd
return output_matrix;
#Perform the 'backtransformation' of the frame to the template space
#It seems that this function takes (x, y) indexing (cols, rows)
#Interpolation mode see https://docs.opencv.org/3.1.0/da/d54/group__imgproc__transform.html#gga5bb5a1fea74ea38e1a5445ca803ff121aa5521d8e080972c762467c45f3b70e6c
def warpFrame(frame, warp_matrix, warp_mode, interpolation_mode = None):
if interpolation_mode is None:
interpolation_mode = cv2.INTER_LANCZOS4
(rows, cols) = frame.shape
if warp_mode == cv2.MOTION_HOMOGRAPHY:
frameAligned = cv2.warpPerspective(frame, warp_matrix, (cols, rows), flags=interpolation_mode + cv2.WARP_INVERSE_MAP)
else:
frameAligned = cv2.warpAffine(frame, warp_matrix, (cols, rows), flags=interpolation_mode + cv2.WARP_INVERSE_MAP)
return frameAligned;
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
"""
In this file, we work with row major image data indexing them (i,j)=(row,col)
This file is for the vizualization of the perfusion data.
@author: Vojtech Kulvait
"""
import DEN
import matplotlib.pyplot as plt
import matplotlib.colors as colors
import numpy as np
from numpy.polynomial import Legendre as L
#Get the vector with coefficients extracted from given position
#Coordinates are supplied in a vector [x, y, z]
def getCoefs(coefFiles, coords):
coefs = [DEN.getFrame(a, coords[2]) for a in coefFiles]
coefs = [a[coords[1], coords[0]] for a in coefs]
return coefs
#In the fileList, there are coefficients to the Legendre polynomials in a DEN format
def showLegendreTAC(coefFiles, recoFiles, x, y , z, startInterval = None, endInterval = None, sweepTime=None, pauseSize=None, legend_x=None, legend_y=None, legend=None, outputFile=None):
print(x)
if startInterval is None:
startInterval = 4117
if endInterval is None:
endInterval = 56000
if startInterval > endInterval:
raise ValueError('Start interval %f needs to be less than end interval %f which is not.'%(startInterval, endInterval))
if pauseSize is None:
pauseSize = 2089
if sweepTime is None:
sweepTime = 4100
if legend_x is None:
legend_x = "Time [ms]"
if legend_y is None:
legend_y = "Attenuation [HU]"
if legend is None:
legend = "TAC at [%d, %d, %d]"%(x, y, z)
totalTime = sweepTime+pauseSize
cfs = getCoefs(coefFiles, [x,y,z])
val = getCoefs(recoFiles, [x,y,z])
valueTimes = np.arange(sweepTime / 2, sweepTime / 2 + totalTime*10, totalTime)
valueTimes = np.maximum(startInterval, valueTimes)
valueTimes = np.minimum(endInterval, valueTimes)
p = L(cfs, domain=[startInterval, endInterval])
xx, yy = p.linspace()
plt.plot(xx, yy/2.7e-5-1000, lw=2, label="Legendre reconstructions, degree 5")
plt.plot(valueTimes, np.add(np.divide(val, 2.7e-5), -1000), lw=2, label="Static reconstructions")
plt.legend()
plt.xlabel(legend_x)
plt.ylabel(legend_y)
plt.title(legend)
if outputFile is not None:
plt.savefig(outputFile)
def truncate_colormap(cmap, minval=0.0, maxval=1.0, n=100):
new_cmap = colors.LinearSegmentedColormap.from_list(
'trunc({n},{a:.2f},{b:.2f})'.format(n=cmap.name, a=minval, b=maxval),
cmap(np.linspace(minval, maxval, n)))
return new_cmap
#Get color map that is usually used to visualize perfusion
def getPerfusionMap():
cmap = plt.get_cmap('nipy_spectral')
new_cmap = truncate_colormap(cmap, 0, 0.93)
return new_cmap;
def visualizeSlice(file, z, legend_x=None, legend_y=None, legend=None, xrange=None, yrange=None, outputFile=None, vmin=None, vmax=None, qmin=None, qmax=None):
f = DEN.getFrame(file, z)
if xrange is None:
xrange = range(f.shape[1])
if yrange is None:
yrange = range(f.shape[0])
f = f[yrange, :]
f = f[:, xrange]
if vmin is None:
if qmin is None:
qmin = 0.0
vmin = np.percentile(f, qmin)
if vmax is None:
if qmax is None:
qmax = 100.0
vmax = np.percentile(f, qmax)
cm=getPerfusionMap();
plt.imshow(f, vmin=vmin, cmap=cm, vmax=vmax)
plt.colorbar()
if legend_x is not None:
plt.xlabel(legend_x)
if legend_y is not None:
plt.ylabel(legend_y)
if legend is not None:
plt.title(legend)
if outputFile is not None:
plt.savefig(outputFile, dpi=300)
This diff is collapsed.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Sep 12 18:23:46 2018
DEN and DICOM IO manipulation
@author: Vojtech Kulvait
@author: Enno Schieferecke
"""
import numpy as np
import os
import cv2
# Get frame from DEN file
# Can get a subframe, where row_from is the first row index
# row_to is not included
# col_from is the first col index
# col_to is not included
def getFrame(fileName, i, row_from=None, row_to=None, col_from=None, col_to=None):
# i is the slice index
header = np.fromfile(file, np.dtype('<i2'), 3)
rows = np.uint32(header[0])
columns = np.uint32(header[1])
if row_from is None:
row_from = 0
if row_to is None:
row_to = rows
if col_from is None:
col_from = 0
if col_to is None:
col_to = columns
f = open(fileName, "rb")
f.seek(6 + rows * columns * 4 * i, os.SEEK_SET)
data = np.fromfile(f, np.dtype('<f4'), rows * columns)
newdata = data.reshape((rows, columns))
newdata = newdata[row_from:row_to, col_from:col_to]
return(newdata)
def storeNdarrayAsDEN(fileName, dataFrame, force=False):
if not force and os.path.exists(fileName):
raise IOError('File already exists, no data written')
if not isinstance(dataFrame, np.ndarray):
raise TypeError('Object dataFrame has to be of type numpy.array')
if len(dataFrame.shape) == 1:
print('Dimension = 1, expected >= 2')
return False
elif len(dataFrame.shape) == 2:
dataFrame = np.expand_dims(dataFrame, axis=2)
elif len(dataFrame.shape) > 3:
raise ValueError(
'Dimension of dataFrame should be 2 or 3 but is %d.' % len(dataFrame.shape))
shape = dataFrame.shape#Now len is for sure 3
writeEmptyDEN(fileName, dimx=shape[0], dimy=shape[1], dimz=shape[2], force=force)#No effect
header = np.fromfile(fileName, np.dtype('<i2'), 3)
rows = np.uint32(header[0])
columns = np.uint32(header[1])
f = open(file, "wb")
header.tofile(f)
for frame in range(i):
newdata = np.array(data[:, :, frame], np.dtype('<f4'))
newdata = newdata.reshape((rows * columns, ))
# put header in front of image data
f.seek(6 + rows * columns * frame * 4, os.SEEK_SET)
newdata.tofile(f)
f.close()
return True
def writeFrame(fileName, k, data, force=False):
if not force and os.path.exists(fileName):
raise IOError('File %s already exists, no header written' % fileName)
shape = data.shape
if len(shape) != 2:
raise ValueError('Dimension of data should be 2 %d.' % len(shape))
header = np.fromfile(file, np.dtype('<i2'), 3)
rows = np.uint32(header[0])
columns = np.uint32(header[1])
if shape[0] != rows or shape[1] != columns:
raise ValueError('There is dimension mismatch between frame (%d, %d) and expected (rows, cols) = (%d, %d) according to header.' % (rows, columns, shape[0], shape[1]))
f = open(file, "wb+")
data = np.array(data, np.dtype('<f4'))
data = data.reshape((rows * columns, ))
f.seek(6 + rows * columns * k * 4, os.SEEK_SET)
data.tofile(f)
f.close()
def writeEmptyDEN(fileName, dimx, dimy, dimz, force=False):
if not force and os.path.exists(fileName):
raise IOError('File %s already exists, no header written' % fileName)
outfile = open(file, "w")
header = np.array([dimx, dimy, dimz])
header = np.array(header, dtype='<i2')
header.tofile(outfile)
fileSize = dimx*dimy*dimz*4+6
outfile.seek(fileSize-1)
outfile.write('\0')
outfile.close()
def readHeader(file):
header = np.fromfile(file, np.dtype('<i2'), 3)
par = {}
par["rows"] = np.uint32(header[0])
par["cols"] = np.uint32(header[1])
par["zdim"] = np.uint32(header[2])
return(par)
# Trim frame to the specified dimensions
# Can get a subframe, where row_from is the first row index
# row_to is not included
# col_from is the first col index
# col_to is not included
def trimFrame(frame, row_from, row_to, col_from, col_to):
newdata = frame[row_from:row_to, col_from:col_to]
return(newdata)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created 04 2019
Functions for ECC based image registration of DEN files
@author: Vojtech Kulvait
"""
import numpy as np
import os
# Estimate rigid transformation
# Most naive approach
def getRigidTransformation(frame1, frame2):
minvalue = min(np.percentile(frame1, 10), np.percentile(frame2, 10))
maxvalue = min(np.percentile(frame1, 90), np.percentile(frame2, 90))
frame1 = (frame1 - minvalue) / (maxvalue - minvalue)
frame2 = (frame2 - minvalue) / (maxvalue - minvalue)
frame1 = np.maximum(0, np.minimum(1, frame1))
frame2 = np.maximum(0, np.minimum(1, frame2))
warp_matrix = cv2.estimateRigidTransform((frame1 * 255 + 0.5).astype('uint8'), (frame2 * 255 + 0.5).astype('uint8'), False)
return warp_matrix
# Estimate of corelation coefficient
# Gets result after first step
# There is no function for direct computation
def estimateCC(frame1, frame2):
warp_mode = cv2.MOTION_EUCLIDEAN
warp_matrix = np.eye(2, 3, dtype=np.float32)
max_iterations = 1
termination_eps = 1e-15
criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, max_iterations, termination_eps)
(cc, warp_matrix) = cv2.findTransformECC(frame1, frame2, warp_matrix, warp_mode, criteria)
return (cc, warp_matrix)
# Get optimal transformation
def getTransformECC(frame1, frame2, warp_mode=None, max_iterations=None, termination_eps=None, init_warp_matrix=None):
if warp_mode is None:
warp_mode = cv2.MOTION_EUCLIDEAN
if max_iterations is None:
max_iterations = 5000 # Default of findTransformECC is 50
if termination_eps is None:
termination_eps = 1e-10
# Default of findTransformECC is 0.001 if init_warp_matrix is None
if warp_mode == cv2.MOTION_HOMOGRAPHY:
init_warp_matrix = np.eye(3, 3, dtype=np.float32)
else:
init_warp_matrix = np.eye(2, 3, dtype=np.float32)
criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, max_iterations, termination_eps)
(cc, warp_matrix) = cv2.findTransformECC(frame1, frame2, init_warp_matrix, warp_mode, criteria)
return (cc, warp_matrix)
# Calculate the x and y gradients using Sobel operator
# Then using Euclidean norm of both
def getGradient(frame):
grad_x = cv2.Sobel(frame, cv2.CV_32F, 1, 0, ksize=3)
grad_y = cv2.Sobel(frame, cv2.CV_32F, 0, 1, ksize=3)
return np.sqrt(np.add(np.square(grad_x), np.square(grad_y)))
# Calculate Laplacian
def getLaplacian(frame):
return (cv2.Laplacian(frame, cv2.CV_32F, ksize=23))
# Apply the same algorithm on gradient data
def gradientTransformECC(frame1, frame2, warp_mode=None, max_iterations=None, termination_eps=None, init_warp_matrix=None):
g1 = getGradient(frame1)
g2 = getGradient(frame2)
return getTransformECC(g1, g2, warp_mode, max_iterations, termination_eps,init_warp_matrix)
# Apply the same algorithm on laplacian data
def laplacianTransformECC(frame1, frame2, warp_mode=None, max_iterations=None, termination_eps=None, init_warp_matrix=None):
l1 = getLaplacian(frame1)
l2 = getLaplacian(frame2)
return getTransformECC(l1, l2, warp_mode, max_iterations, termination_eps, init_warp_matrix)
# For solving the problem that we have warp_matrix in the coordinates(j, i)
# here(x, y) = (col, row)
# We want the warp_matrix to operate on the data relative to(j ',i')
#(i, j) =(i '-offset_row, j' - offset_col)
# We have that(Ax) ' = Ax' - Aoffset + offset
# Here offset is(offset_row, offset_col, 0)
# So the operator that acts on(i ',j') space has modified third column.
def shiftWarpMatrix(warp_matrix, offset_row, offset_col):
output_matrix = np.copy(warp_matrix)
offsetVector = np.array([offset_col, offset_row, 0], dtype=np.float32)
thirdRowAdd = -np.matmul(warp_matrix, offsetVector) + offsetVector[0:warp_matrix.shape[0]]
output_matrix[:, 2] = output_matrix[:, 2] + thirdRowAdd
return output_matrix
# Perform the 'backtransformation' of the frame to the template space
# It seems that this function takes(x, y) indexing(cols, rows)
# Interpolation mode see
#
#https://docs.opencv.org/3.1.0/da/d54/group__imgproc__transform.html#gga5bb5a1fea74ea38e1a5445ca803ff121aa5521d8e080972c762467c45f3b70e6c
def warpFrame(frame, warp_matrix, warp_mode, interpolation_mode=None):
if interpolation_mode is None:
interpolation_mode = cv2.INTER_LANCZOS4
(rows, cols) = frame.shape
if warp_mode == cv2.MOTION_HOMOGRAPHY:
frameAligned = cv2.warpPerspective(frame, warp_matrix, (cols, rows), flags=interpolation_mode + cv2.WARP_INVERSE_MAP)
else:
frameAligned = cv2.warpAffine(frame, warp_matrix, (cols, rows), flags=interpolation_mode + cv2.WARP_INVERSE_MAP)
return frameAligned
from setuptools import setup
#Following https://uoftcoders.github.io/studyGroup/lessons/python/packages/lesson/
setup(
# Needed to silence warnings (and to be a worthwhile package)
name='denpy',
url='https://gitlab.stimulate.ovgu.de/vojtech.kulvait/denpy',
author='Vojtech Kulvait',
author_email='vojtech.kulvait@ovgu.de',
# Needed to actually package something
packages=['denpy'],
# Needed for dependencies
install_requires=['numpy','pydicom'],
# *strongly* suggested for sharing
version='1.0',
# The license can be anything you like
license='GPL3',
description='Python package for DEN and DICOM processing for perfusion vizualization.',
# We will also need a readme eventually (there will be a warning)
# long_description=open('README.txt').read(),
)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment