Custom Acquisitions#
General Guidelines#
It is possible to create custom acquisition classes that can be used with the LDAQ library. This is done by subclassing the LDAQ.acquisition_base.BaseAcquisition
class and implementing
the abstract methods. The following is a list of guidelines that should be followed when creating custom acquisition classes:
- class LDAQ.acquisition_base.BaseAcquisition[source]
Parent acquisition class that should be used when creating new child acquisition source class. Child class should override methods the following methods:
self.__init__()
self.set_data_source()
self.terminate_data_source()
self.read_data()
self.clear_buffer() (optional)
self.get_sample_rate() (optional)
For further information on how to override these methods, see the listed methods docstrings.
Additionally, the __init__() or set_data_source() methods should override or be able to set the following attributes:
self._channel_names_init - list of original data channels names from source
self._channel_names_video_init - list of original video channels names from source
self._channel_shapes_video_init - list of original video channels shapes from source
self.sample_rate = 0 - sample rate of acquisition source
- clear_buffer() None [source]
EDIT in child class (Optional).
The source buffer should be cleared with this method. It can either clear the buffer, or just read the data with self.read_data() and does not add/save data anywhere. By default, this method will read the data from the source and not add/save data anywhere.
Returns None.
- get_sample_rate() float [source]
EDIT in child class (Optional).
Returns sample rate of acquisition class.
- read_data() ndarray [source]
EDIT in child class.
This method only reads data from the source and transforms data into standard format used by other methods. It is called within self.acquire() method which properly handles acquiring data and saves it into pyTrigger ring buffer.
Must ALWAYS return a 2D numpy array of shape (n_samples, n_columns).
IMPORTANT: If some of the channels are videos (2D array - so shape is (n_samples, n_pixels_width, n_pixels_height)), then data has to be reshaped to shape (n_samples, n_pixels_width*n_pixels_height). Then data from multiple sources have to be concatenated into one array of shape (n_samples, n_cols), where cols is the combined number of pixel of all video sources and number of channels in data sources.
For an example where data source has 2 video sources with resolution 300x200 and 2 data channels, the final shape returned by this methods should be (n_samples, 300*200*2+2).
For video sources, the shape of the video is automatically stored in self.channel_shapes_video_init when self.set_data_source() is called. When data is retrieved from the source, it is reshaped to (n_samples, n_pixels_width, n_pixels_height).
- Returns:
2D numpy array of shape (n_samples, n_columns)
- Return type:
data (np.ndarray)
- terminate_data_source() None [source]
EDIT in child class.
Properly closes/disconnects acquisition source after the measurement. The method should be able to handle mutliple calls in a row.
Example#
In the example below, the source code of National Instrument acquisition class is shown.
1import numpy as np
2import time
3import copy
4
5try:
6 from PyDAQmx.DAQmxFunctions import *
7 from PyDAQmx.Task import Task
8 from nidaqmx._lib import lib_importer
9 from .daqtask import DAQTask
10except:
11 pass
12
13import typing
14
15from ctypes import *
16
17from .ni_task import NITask
18from ..acquisition_base import BaseAcquisition
19
20#TODO: remove pyDAQmx completely and use only nidaqmx
21class NIAcquisition(BaseAcquisition):
22 """National Instruments Acquisition class, compatible with any NI acquisition device that is supported by NI-DAQmx library.
23
24 To use this class, you need to install NI-DAQmx library found on this link:
25 https://www.ni.com/en/support/downloads/drivers/download.ni-daq-mx.html#494676
26
27 Installation instructions:
28
29 - Download NI-DAQmx from the link listed above.
30
31 - Install NI-DAQmx.
32 """
33
34 def __init__(self, task_name: typing.Union[str, object], acquisition_name: typing.Optional[str] = None) -> None:
35 """Initialize the acquisition task.
36
37 Args:
38 task_name (str, class object): Name of the task from NI Max or class object created with NITask() class using nidaqmx library.
39 acquisition_name (str, optional): Name of the acquisition. Defaults to None, in which case the task name is used.
40 """
41 super().__init__()
42
43 try:
44 DAQmxClearTask(taskHandle_acquisition)
45 except:
46 pass
47
48 try:
49 lib_importer.windll.DAQmxClearTask(taskHandle_acquisition)
50 except:
51 pass
52
53 self.task_terminated = True
54
55 self.task_base = task_name
56 if isinstance(task_name, str):
57 self.NITask_used = False
58 self.task_name = task_name
59 elif isinstance(task_name, NITask):
60 self.NITask_used = True
61 self.task_name = self.task_base.task_name
62 else:
63 raise TypeError("task_name has to be a string or NITask object.")
64
65 self.set_data_source() # the data source must be set to red the number of channels and sample rate
66 self.acquisition_name = self.task_name if acquisition_name is None else acquisition_name
67
68 self.sample_rate = self.Task.sample_rate
69 self._channel_names_init = self.Task.channel_list
70
71 self.terminate_data_source() # clear the data source, will be set up later
72
73 # if not self.NITask_used:
74 # glob_vars = globals()
75 # glob_vars['taskHandle_acquisition'] = self.Task.taskHandle
76
77 # set default trigger, so the signal will not be trigered:
78 self.set_trigger(1e20, 0, duration=1.0)
79
80 def clear_task(self):
81 """Clear a task."""
82 if hasattr(self, "Task"):
83 self.Task.clear_task(wait_until_done=False)
84 time.sleep(0.1)
85 del self.Task
86 else:
87 pass
88
89 def terminate_data_source(self):
90 """Properly closes the data source.
91 """
92 self.task_terminated = True
93 self.clear_task()
94
95 def read_data(self):
96 """Reads data from device buffer and returns it.
97
98 Returns:
99 np.ndarray: numpy array with shape (n_samples, n_channels)
100 """
101 self.Task.acquire(wait_4_all_samples=False)
102 return self.Task.data.T
103
104 def clear_buffer(self):
105 """
106 Clears the buffer of the device.
107 """
108 self.Task.acquire_base()
109
110 def set_data_source(self):
111 """Sets the acquisition device to properly start the acquisition. This function is called before the acquisition is started.
112 It is used to properly initialize the device and set the data source channels and virtual channels.
113 """
114 if self.task_terminated:
115 if self.NITask_used:
116 channels_base = copy.deepcopy(self.task_base.channels)
117 self.Task = NITask(self.task_base.task_name, self.task_base.sample_rate, self.task_base.settings_file)
118 self.task_name = self.task_base.task_name
119
120 for channel_name, channel in channels_base.items():
121 self.Task.add_channel(
122 channel_name,
123 channel['device_ind'],
124 channel['channel_ind'],
125 channel['sensitivity'],
126 channel['sensitivity_units'],
127 channel['units'],
128 channel['serial_nr'],
129 channel['scale'],
130 channel['min_val'],
131 channel['max_val'])
132 else:
133 self.Task = DAQTask(self.task_base)
134
135 self.task_terminated = False
136
137 if self.NITask_used:
138 if not hasattr(self.Task, 'task'):
139 self.Task.initiate()
140
141 super().set_data_source()
142
143 def run_acquisition(self, run_time=None, run_in_background=False):
144 """
145 Runs acquisition. This is the method one should call to start the acquisition.
146
147 Args:
148 run_time (float): number of seconds for which the acquisition will run.
149 run_in_background (bool): if True, acquisition will run in a separate thread.
150
151 Returns:
152 None
153 """
154 if self.NITask_used:
155 BaseAcquisition.all_acquisitions_ready = False
156 self.is_ready = False
157 self.is_running = True
158
159 if run_time is None:
160 self._set_trigger_instance()
161 else:
162 self.update_trigger_parameters(duration=run_time, duration_unit='seconds')
163
164 self.set_data_source()
165 glob_vars = globals()
166 glob_vars['taskHandle_acquisition'] = self.Task.taskHandle
167
168 super().run_acquisition(run_time, run_in_background=run_in_background)