import numpy as np
from ctypes import *
from ..acquisition_base import BaseAcquisition
try:
import PySpin
except:
pass
class IRFormatType:
LINEAR_10MK = 1
LINEAR_100MK = 2
RADIOMETRIC = 3
[docs]
class FLIRThermalCamera(BaseAcquisition):
"""
Acquisition class for FLIR thermal camera (A50).
This class is adapted from examples for thermal A50 camera provided by FLIR, found on their website (login required):
https://www.flir.com/support-center/iis/machine-vision/downloads/spinnaker-sdk-download/spinnaker-sdk--download-files/
Installation steps:
- Install Spinnaker SDK (i.e. SpinnakerSDK_FULL_3.1.0.79_x64.exe, found on provided link)
- Install PySpin (python wrapper for Spinnaker SDK). On the website listed above, there are multiple build wheels listed under "Lastest Windows Python Spinnaker SDK". Choose the one that matches your python version and architecture (i.e. spinnaker_python-3.1.0.79-cp310-cp310-win_amd64.zip for python 3.10 64-bit - this is also the version used for development of this class)
"""
def __init__(self, acquisition_name=None, channel_name_IR_cam="temperature_field", IRtype='LINEAR_10MK', subsampling_multiplier=1):
"""
Args:
acquisition_name (str, optional): Name of the class. Defaults to None.
channel_name_IR_cam (str, optional): Name of the temperature field channel. Defaults to "temperature_field".
IRtype (str, optional): format type of camera mode. Defaults to 'LINEAR_10MK'. Possible modes include:
- LINEAR_10MK: 10mK temperature resolution
- LINEAR_100MK: 100mK temperature resolution
- RADIOMETRIC: capture radiometric data and manually convert to temperature
(this requires calibration coefficients, currently some
calibration values are read from the camera)
subsampling_multiplier (int, optional) : take only each n-th frame. From the thermal camera. This effectively reduces sample rate (base sample rate is 30 Hz). This is useful if application cannot keep up with the camera. Defaults to 1.
"""
try:
PySpin # check if PySpin is imported
except:
raise Exception("PySpin is not installed. Please install it from the link provided in the class documentation.")
super().__init__()
self.acquisition_name = 'FLIR' if acquisition_name is None else acquisition_name
self.buffer_dtype = np.float16 # this is used when CustomPyTrigger instance is created
self._channel_names_video_init = [channel_name_IR_cam]
self._channel_shapes_video_init = [] # is set in set_data_source()
self.set_IRtype(IRtype)
self.camera_acq_started = False
self.set_data_source()
self.subsampling_multiplier = subsampling_multiplier
self.sample_rate = 30./subsampling_multiplier
# TODO: this can probably be set in thermal camera and read from it
# default camera fps is 30.
self.n_frame = 0
# channel in set trigger is actually pixel in flatten array:
self.set_trigger(1e20, 0, duration=1.0)
# TODO:
# - set sample rate (either subsample and only acquire every n-th frame or set camera fps)
# - adjust picture resolution
# - add regular camera source
[docs]
def set_IRtype(self, IRtype):
'''This function sest the IR type to be used by the camera.
Sets the IR type to either:
- LINEAR_10MK: 10mK temperature resolution
- LINEAR_100MK: 100mK temperature resolution
- RADIOMETRIC: capture radiometric data and manually convert to temperature (currently not recommended)
Args:
IRtype (str): IR type to be used by the camera (either LINEAR_10MK, LINEAR_100MK or RADIOMETRIC)
'''
avaliable_types = [
i for i in IRFormatType.__dict__.keys() if i[:1] != '_'
]
if IRtype not in avaliable_types:
raise ValueError(
f'IRtype must be one of the following: {avaliable_types}')
self.CHOSEN_IR_TYPE = getattr(IRFormatType, IRtype)
[docs]
def set_data_source(self):
"""
Properly sets acquisition source before measurement is started.
Should be set up in a way that it is able to be called multiple times in a row without issues.
"""
if hasattr(self, 'cam'):
pass
else:
image_shape_temperature = self._init_thermal_camera()
self._channel_shapes_video_init = [image_shape_temperature]
if not self.camera_acq_started:
self.cam.BeginAcquisition()
self.camera_acq_started = True
self.n_frame = 0
super().set_data_source()
[docs]
def read_data(self):
"""
This method only reads data from the source and transforms data into standard format used by other methods.
This method is called within self.acquire() method which properly handles acquiring data and saves it into
pyTrigger ring buffer.
Must return a 2D numpy array of shape (n_samples, n_columns).
"""
# TODO: add reading of multiple samples with one call
# read thermal camera data:
shape = self.channel_shapes[ 0 ] # NOTE: channel 0 is always thermal camera
data_thermal_camera = self._read_data_thermal_camera()
if not data_thermal_camera.shape[0] > 0:
return np.empty((0, shape[0]*shape[1]))
return data_thermal_camera.reshape(-1, shape[0]*shape[1]) # NOTE: change this when regular camera is added
[docs]
def terminate_data_source(self):
"""
Properly closes acquisition source after the measurement.
Returns None.
"""
if self.camera_acq_started:
# Image acquisition must be ended when no more images are needed.
self.cam.EndAcquisition()
self.camera_acq_started = False
[docs]
def get_sample_rate(self):
"""
Returns sample rate of acquisition class.
This function is also useful to compute sample_rate estimation if no sample rate is given
Returns self.sample_rate
"""
return self.sample_rate
[docs]
def clear_buffer(self):
"""
The source buffer should be cleared with this method. Either actually clears the buffer, or
just reads the data with self.read_data() and does not add/save data anywhere.
Returns None.
"""
self.read_data()
def _read_data_thermal_camera(self):
"""Reads and retrieves data from the thermal camera.
Returns:
np.ndarray: flattened array of pixel values
"""
image_result = self.cam.GetNextImage()
# Ensure image completion
if image_result.IsIncomplete():
return np.empty((0, self.n_channels_trigger))
self.n_frame += 1
if self.n_frame != self.subsampling_multiplier:
return np.empty((0, self.n_channels_trigger))
else:
self.n_frame = 0
# Getting the image data as a np array
image_data = image_result.GetNDArray()
if self.CHOSEN_IR_TYPE == IRFormatType.LINEAR_10MK:
# Transforming the data array into a temperature array, if streaming mode is set to TemperatueLinear10mK
image_Temp_Celsius_high = (image_data * 0.01) - 273.15
image_temp = image_Temp_Celsius_high
elif self.CHOSEN_IR_TYPE == IRFormatType.LINEAR_100MK:
# Transforming the data array into a temperature array, if streaming mode is set to TemperatureLinear100mK
image_Temp_Celsius_low = (image_data * 0.1) - 273.15
image_temp = image_Temp_Celsius_low
elif self.CHOSEN_IR_TYPE == IRFormatType.RADIOMETRIC:
# Transforming the data array into a pseudo radiance array, if streaming mode is set to Radiometric.
# and then calculating the temperature array (degrees Celsius) with the full thermography formula
J0, J1, B, R, Emiss, Tau, K2, F = (self.calib_dict["J0"], self.calib_dict["J1"], self.calib_dict["B"],
self.calib_dict["R"], self.calib_dict["Emiss"], self.calib_dict["Tau"],
self.calib_dict["K2"], self.calib_dict["F"])
image_Radiance = (image_data - J0) / J1
image_temp = (B / np.log(R / ( (image_Radiance / Emiss / Tau) - K2) + F) ) - 273.15
else:
raise Exception('Unknown IRFormatType')
if image_temp.shape[0] > 0:
image_result.Release()
return image_temp
def _init_thermal_camera(self):
"""Initializes the thermal camera.
NOTE: majority of code here was adapted from FLIR's example code.
"""
# Retrieve singleton reference to system object
self.system = PySpin.System.GetInstance()
# Get current library version
self.cam_list = self.system.GetCameras()
num_cameras = self.cam_list.GetSize()
# Finish if there are no cameras
if num_cameras == 0:
# Clear camera list before releasing system
self.cam_list.Clear()
# Release system instance
self.system.ReleaseInstance()
raise ValueError('No cameras detected!')
elif num_cameras > 1:
# Clear camera list before releasing system
self.cam_list.Clear()
# Release system instance
self.system.ReleaseInstance()
raise ValueError('More than one camera detected!')
elif num_cameras < 0:
raise ValueError('Something went wrong with camera detection!')
# we have exactly one camera:
self.cam = self.cam_list.GetByIndex(0)
# Initialize camera
self.cam.Init()
self.nodemap_tldevice = self.cam.GetTLDeviceNodeMap()
# Retrieve GenICam nodemap
self.nodemap = self.cam.GetNodeMap()
sNodemap = self.cam.GetTLStreamNodeMap()
# Se buffer handling mode:
node_bufferhandling_mode = PySpin.CEnumerationPtr(sNodemap.GetNode('StreamBufferHandlingMode'))
node_pixel_format = PySpin.CEnumerationPtr(self.nodemap.GetNode('PixelFormat'))
node_pixel_format_mono16 = PySpin.CEnumEntryPtr(node_pixel_format.GetEntryByName('Mono16'))
# set pixel format:
pixel_format_mono16 = node_pixel_format_mono16.GetValue()
node_pixel_format.SetIntValue(pixel_format_mono16)
if self.CHOSEN_IR_TYPE == IRFormatType.LINEAR_10MK: # LINEAR_10MK
# This section is to be activated only to set the streaming mode to TemperatureLinear10mK
node_IRFormat = PySpin.CEnumerationPtr(self.nodemap.GetNode('IRFormat'))
node_temp_linear_high = PySpin.CEnumEntryPtr(node_IRFormat.GetEntryByName('TemperatureLinear10mK'))
node_temp_high = node_temp_linear_high.GetValue()
node_IRFormat.SetIntValue(node_temp_high)
elif self.CHOSEN_IR_TYPE == IRFormatType.LINEAR_100MK: # LINEAR_100MK
# This section is to be activated only to set the streaming mode to TemperatureLinear100mK
node_IRFormat = PySpin.CEnumerationPtr(self.nodemap.GetNode('IRFormat'))
node_temp_linear_low = PySpin.CEnumEntryPtr(node_IRFormat.GetEntryByName('TemperatureLinear100mK'))
node_temp_low = node_temp_linear_low.GetValue()
node_IRFormat.SetIntValue(node_temp_low)
elif self.CHOSEN_IR_TYPE == IRFormatType.RADIOMETRIC: # RADIOMETRIC
# This section is to be activated only to set the streaming mode to Radiometric
node_IRFormat = PySpin.CEnumerationPtr(self.nodemap.GetNode('IRFormat'))
node_temp_radiometric = PySpin.CEnumEntryPtr(node_IRFormat.GetEntryByName('Radiometric'))
node_radiometric = node_temp_radiometric.GetValue()
node_IRFormat.SetIntValue(node_radiometric)
if not PySpin.IsAvailable(node_bufferhandling_mode) or not PySpin.IsWritable(node_bufferhandling_mode):
self.terminate_data_source()
raise ValueError('Unable to set stream buffer handling mode.')
# Retrieve entry node from enumeration node
node_newestonly = node_bufferhandling_mode.GetEntryByName('NewestOnly')
if not PySpin.IsAvailable(node_newestonly) or not PySpin.IsReadable(node_newestonly):
self.terminate_data_source()
raise ValueError('Unable to set stream buffer handling mode.')
# Retrieve integer value from entry node
node_newestonly_mode = node_newestonly.GetValue()
# Set integer value from entry node as new value of enumeration node
node_bufferhandling_mode.SetIntValue(node_newestonly_mode)
try:
node_acquisition_mode = PySpin.CEnumerationPtr(self.nodemap.GetNode('AcquisitionMode'))
if not PySpin.IsAvailable(node_acquisition_mode) or not PySpin.IsWritable(node_acquisition_mode):
self.terminate_data_source()
raise ValueError('Unable to set acquisition mode to continuous.')
# Retrieve entry node from enumeration node
node_acquisition_mode_continuous = node_acquisition_mode.GetEntryByName('Continuous')
if not PySpin.IsAvailable(node_acquisition_mode_continuous) or not PySpin.IsReadable(node_acquisition_mode_continuous):
self.terminate_data_source()
raise ValueError('Unable to set acquisition mode to continuous (entry retrieval).')
# Retrieve integer value from entry node
acquisition_mode_continuous = node_acquisition_mode_continuous.GetValue()
# Set integer value from entry node as new value of enumeration node
node_acquisition_mode.SetIntValue(acquisition_mode_continuous)
# Retrieve device serial number for filename
#
# *** NOTES ***
# The device serial number is retrieved in order to keep cameras from
# overwriting one another. Grabbing image IDs could also accomplish
# this.
device_serial_number = ''
node_device_serial_number = PySpin.CStringPtr(self.nodemap_tldevice.GetNode('DeviceSerialNumber'))
if PySpin.IsAvailable(node_device_serial_number) and PySpin.IsReadable(node_device_serial_number):
device_serial_number = node_device_serial_number.GetValue()
# Retrieve Calibration details
self.calib_dict = {}
CalibrationQueryR_node = PySpin.CFloatPtr(self.nodemap.GetNode('R'))
self.calib_dict['R'] = CalibrationQueryR_node.GetValue()
CalibrationQueryB_node = PySpin.CFloatPtr(self.nodemap.GetNode('B'))
self.calib_dict['B'] = CalibrationQueryB_node.GetValue()
CalibrationQueryF_node = PySpin.CFloatPtr(self.nodemap.GetNode('F'))
self.calib_dict['F'] = CalibrationQueryF_node.GetValue()
CalibrationQueryX_node = PySpin.CFloatPtr(self.nodemap.GetNode('X'))
self.calib_dict['X'] = CalibrationQueryX_node.GetValue()
CalibrationQueryA1_node = PySpin.CFloatPtr(self.nodemap.GetNode('alpha1'))
self.calib_dict['A1'] = CalibrationQueryA1_node.GetValue()
CalibrationQueryA2_node = PySpin.CFloatPtr(self.nodemap.GetNode('alpha2'))
self.calib_dict['A2'] = CalibrationQueryA2_node.GetValue()
CalibrationQueryB1_node = PySpin.CFloatPtr(self.nodemap.GetNode('beta1'))
self.calib_dict['B1'] = CalibrationQueryB1_node.GetValue()
CalibrationQueryB2_node = PySpin.CFloatPtr(self.nodemap.GetNode('beta2'))
self.calib_dict['B2'] = CalibrationQueryB2_node.GetValue()
CalibrationQueryJ1_node = PySpin.CFloatPtr(self.nodemap.GetNode('J1')) # Gain
self.calib_dict['J1'] = CalibrationQueryJ1_node.GetValue()
CalibrationQueryJ0_node = PySpin.CIntegerPtr(self.nodemap.GetNode('J0')) # Offset
self.calib_dict['J0'] = CalibrationQueryJ0_node.GetValue()
if self.CHOSEN_IR_TYPE == IRFormatType.RADIOMETRIC:
# Object Parameters. For this demo, they are imposed!
# This section is important when the streaming is set to radiometric and not TempLinear
# Image of temperature is calculated computer-side and not camera-side
# Parameters can be set to the whole image, or for a particular ROI (not done here)
Emiss = 0.97
TRefl = 293.15
TAtm = 293.15
TAtmC = TAtm - 273.15
Humidity = 0.55
Dist = 2
ExtOpticsTransmission = 1
ExtOpticsTemp = TAtm
R, B, F, X, A1, A2, B1, B2, J1, J0 = (self.calib_dict['R'], self.calib_dict['B'], self.calib_dict['F'], self.calib_dict['X'],
self.calib_dict['A1'], self.calib_dict['A2'], self.calib_dict['B1'], self.calib_dict['B2'],
self.calib_dict['J1'], self.calib_dict['J0'])
H2O = Humidity * np.exp(1.5587 + 0.06939 * TAtmC -
0.00027816 * TAtmC * TAtmC +
0.00000068455 * TAtmC * TAtmC * TAtmC)
Tau = X * np.exp(-np.sqrt(Dist) *
(A1 + B1 * np.sqrt(H2O))) + (1 - X) * np.exp(
-np.sqrt(Dist) * (A2 + B2 * np.sqrt(H2O)))
# Pseudo radiance of the reflected environment
r1 = ((1 - Emiss) / Emiss) * (R / (np.exp(B / TRefl) - F))
# Pseudo radiance of the atmosphere
r2 = ((1 - Tau) / (Emiss * Tau)) * (R / (np.exp(B / TAtm) - F))
# Pseudo radiance of the external optics
r3 = ((1 - ExtOpticsTransmission) /
(Emiss * Tau * ExtOpticsTransmission)) * (
R / (np.exp(B / ExtOpticsTemp) - F))
K2 = r1 + r2 + r3
self.calib_dict['K2'] = K2
self.calib_dict['Emiss'] = Emiss
self.calib_dict['Tau'] = Tau
self.node_width = PySpin.CIntegerPtr(self.nodemap.GetNode('Width'))
self.node_height = PySpin.CIntegerPtr(self.nodemap.GetNode('Height'))
self.offsetX = PySpin.CIntegerPtr(self.nodemap.GetNode('OffsetX'))
self.offsetY = PySpin.CIntegerPtr(self.nodemap.GetNode('OffsetY'))
image_shape = (self.node_height.GetMax() - self.offsetY.GetMax(), self.node_width.GetValue()-self.offsetX.GetMax())
return image_shape
except PySpin.SpinnakerException as ex:
raise Exception('Error: %s' % ex)
def _exit_thermal_camera(self):
self.cam.DeInit()
del self.cam
# Clear camera list before releasing system
self.cam_list.Clear()
# Release system instance
self.system.ReleaseInstance()