Source code for LDAQ.national_instruments.acquisition
import numpy as np
import time
import copy
try:
from PyDAQmx.DAQmxFunctions import *
from PyDAQmx.Task import Task
from nidaqmx._lib import lib_importer
from .daqtask import DAQTask
except:
pass
import typing
from ctypes import *
from .ni_task import NITask
from ..acquisition_base import BaseAcquisition
#TODO: remove pyDAQmx completely and use only nidaqmx
[docs]
class NIAcquisition(BaseAcquisition):
    """National Instruments Acquisition class, compatible with any NI acquisition device that is supported by NI-DAQmx library.

    To use this class, you need to install NI-DAQmx library found on this link:

    https://www.ni.com/en/support/downloads/drivers/download.ni-daq-mx.html#494676

    Installation instructions:

    - Download NI-DAQmx from the link listed above.
    - Install NI-DAQmx.
    """

    def __init__(self, task_name: typing.Union[str, object], acquisition_name: typing.Optional[str] = None) -> None:
        """Initialize the acquisition task.

        The data source is opened once to read the sample rate and the
        channel list, and is then closed again; it is re-opened when the
        acquisition is started.

        Args:
            task_name (str, class object): Name of the task from NI Max or class object created with NITask() class using nidaqmx library.
            acquisition_name (str, optional): Name of the acquisition. Defaults to None, in which case the task name is used.

        Raises:
            TypeError: If ``task_name`` is neither a string nor an NITask object.
        """
        super().__init__()

        # Best-effort cleanup of a task handle that an earlier run stored in
        # the module globals (see run_acquisition). The handle, or the driver
        # binding itself, may be undefined because the optional driver imports
        # at the top of the module are allowed to fail, so errors are ignored.
        # Only ``Exception`` is caught, so KeyboardInterrupt and SystemExit
        # still propagate.
        try:
            DAQmxClearTask(taskHandle_acquisition)
        except Exception:
            pass

        try:
            lib_importer.windll.DAQmxClearTask(taskHandle_acquisition)
        except Exception:
            pass

        # True while no ``self.Task`` object is open; set_data_source() only
        # builds a new task when this flag is set.
        self.task_terminated = True

        self.task_base = task_name
        if isinstance(task_name, str):
            self.NITask_used = False
            self.task_name = task_name
        elif isinstance(task_name, NITask):
            self.NITask_used = True
            self.task_name = self.task_base.task_name
        else:
            raise TypeError("task_name has to be a string or NITask object.")

        # The data source must be set to read the number of channels and the
        # sample rate.
        self.set_data_source()
        self.acquisition_name = self.task_name if acquisition_name is None else acquisition_name

        self.sample_rate = self.Task.sample_rate
        self._channel_names_init = self.Task.channel_list

        # Clear the data source; it will be set up again later.
        self.terminate_data_source()

        # Set a default trigger, so the signal will not be triggered.
        self.set_trigger(1e20, 0, duration=1.0)

    def clear_task(self):
        """Clear the task, if one is currently open.

        Does nothing when no ``Task`` attribute exists on the instance.
        """
        if hasattr(self, "Task"):
            self.Task.clear_task(wait_until_done=False)
            time.sleep(0.1)
            del self.Task

    def terminate_data_source(self):
        """Properly closes the data source.

        Marks the task as terminated and clears it, so that the next call to
        set_data_source() creates a new task.
        """
        self.task_terminated = True
        self.clear_task()

    def read_data(self):
        """Reads data from device buffer and returns it.

        Returns:
            np.ndarray: numpy array with shape (n_samples, n_channels)
        """
        self.Task.acquire(wait_4_all_samples=False)
        return self.Task.data.T

    def clear_buffer(self):
        """Clears the buffer of the device."""
        self.Task.acquire_base()

    def set_data_source(self):
        """Sets the acquisition device to properly start the acquisition. This function is called before the acquisition is started.

        It is used to properly initialize the device and set the data source channels and virtual channels.
        A new task object is only created when the previous one was
        terminated.
        """
        if self.task_terminated:
            if self.NITask_used:
                # Rebuild a new NITask from the user-supplied template object,
                # working on a copy of its channel definitions.
                channels_base = copy.deepcopy(self.task_base.channels)
                self.Task = NITask(
                    self.task_base.task_name,
                    self.task_base.sample_rate,
                    self.task_base.settings_file,
                )
                self.task_name = self.task_base.task_name

                for channel_name, channel in channels_base.items():
                    self.Task.add_channel(
                        channel_name,
                        channel['device_ind'],
                        channel['channel_ind'],
                        channel['sensitivity'],
                        channel['sensitivity_units'],
                        channel['units'],
                        channel['serial_nr'],
                        channel['scale'],
                        channel['min_val'],
                        channel['max_val'],
                    )
            else:
                # ``task_base`` is the task name string in this case.
                self.Task = DAQTask(self.task_base)

            self.task_terminated = False

        # An NITask without a ``task`` attribute has not been initiated yet.
        if self.NITask_used and not hasattr(self.Task, 'task'):
            self.Task.initiate()

        super().set_data_source()

    def run_acquisition(self, run_time=None, run_in_background=False):
        """Runs acquisition. This is the method one should call to start the acquisition.

        Args:
            run_time (float): number of seconds for which the acquisition will run.
            run_in_background (bool): if True, acquisition will run in a separate thread.

        Returns:
            None
        """
        # NOTE(review): the extent of this branch was reconstructed from a
        # listing that had lost its indentation - confirm against the
        # repository that all of the preparation below belongs to it.
        if self.NITask_used:
            BaseAcquisition.all_acquisitions_ready = False
            self.is_ready = False
            self.is_running = True

            if run_time is None:
                self._set_trigger_instance()
            else:
                self.update_trigger_parameters(duration=run_time, duration_unit='seconds')

            self.set_data_source()
            # Store the task handle in the module globals, where __init__
            # looks for a stale handle to clear.
            globals()['taskHandle_acquisition'] = self.Task.taskHandle

        super().run_acquisition(run_time, run_in_background=run_in_background)