Source code for LDAQ.national_instruments.acquisition

from __future__ import annotations

import numpy as np

from ..acquisition_base import BaseAcquisition

_NIDAQWRAPPER_AVAILABLE = False
try:
    from nidaqwrapper import AITask, get_task_by_name
    _NIDAQWRAPPER_AVAILABLE = True
except ImportError:
    pass


[docs] class NIAcquisition(BaseAcquisition): """Acquisition class for National Instruments devices via nidaqwrapper. A thin wrapper around ``nidaqwrapper.AITask`` that satisfies the ``BaseAcquisition`` contract. Supports both programmatic tasks (``AITask`` objects) and tasks defined in NI MAX (task name strings). Parameters ---------- task : nidaqwrapper.AITask or str Either a fully configured ``AITask`` instance or the name of a task saved in NI MAX. When a string is supplied the task is loaded via ``nidaqwrapper.get_task_by_name()``. acquisition_name : str or None, optional Human-readable name for this acquisition source. Defaults to the underlying NI task name when ``None``. Raises ------ ImportError If the ``nidaqwrapper`` package is not installed. TypeError If ``task`` is not an ``AITask`` instance or a string. Examples -------- Wrap a programmatically created task: >>> ai = AITask("my_task", sample_rate=10_000) >>> ai.add_channel(...) >>> acq = NIAcquisition(ai) Load a task saved in NI MAX: >>> acq = NIAcquisition("MyNIMaxTask") """ def __init__( self, task: AITask | str, acquisition_name: str | None = None, ) -> None: """ Initialize NIAcquisition. Parameters ---------- task : nidaqwrapper.AITask or str Source task: either an ``AITask`` object or an NI MAX task name. acquisition_name : str or None, optional Name for this acquisition source. Uses the task name when None. Raises ------ ImportError If ``nidaqwrapper`` is not installed. TypeError If ``task`` is neither an ``AITask`` nor a string. """ if not _NIDAQWRAPPER_AVAILABLE: raise ImportError( "nidaqwrapper is not installed. " "Install it before using NIAcquisition." ) super().__init__() if isinstance(task, AITask): self._ai_task: AITask = task elif isinstance(task, str): ni_task = get_task_by_name(task) if ni_task is None: raise ValueError(f"NI MAX task '{task}' not found.") self._ai_task = AITask.from_task(ni_task) else: raise TypeError( f"task must be an AITask instance or a string (NI MAX task name), " f"got {type(task).__name__!r}." ) self.acquisition_name = ( acquisition_name if acquisition_name is not None else self._ai_task.task_name ) self.sample_rate = self._ai_task.sample_rate self._channel_names_init = list(self._ai_task.channel_list) self._task_active = False self._set_all_channels() # populate channel_names_all before set_trigger self.set_trigger(1e20, 0, duration=1.0)
[docs] def set_data_source(self) -> None: """Start the underlying AITask in preparation for acquisition. Calls ``super().set_data_source()`` at the end to satisfy the ``BaseAcquisition`` contract. """ if self._task_active: return self._ai_task.start() self._task_active = True super().set_data_source()
[docs] def terminate_data_source(self) -> None: """Stop the NIDAQmx task. Safe to call multiple times; subsequent calls when the task is already stopped are silently ignored. """ if not self._task_active: return self._ai_task.task.stop() self._task_active = False
[docs] def read_data(self) -> np.ndarray: """Read all currently available samples from the device buffer. Returns ------- np.ndarray 2-D array of shape ``(n_samples, n_channels)``. Returns an empty array of shape ``(0, n_channels)`` when no data is available. """ n_channels = len(self._channel_names_init) raw = self._ai_task.acquire(n_samples=None) if raw is None or raw.size == 0: return np.empty((0, n_channels)) return raw
[docs] def clear_buffer(self) -> None: """Read and discard all data currently in the device buffer.""" self._ai_task.acquire(n_samples=None)