Source code for LDAQ.national_instruments.generation

from __future__ import annotations

import numpy as np

try:
    import nidaqwrapper
    from nidaqwrapper import AOTask
    _NIDAQWRAPPER_AVAILABLE = True
except ImportError:
    _NIDAQWRAPPER_AVAILABLE = False

from ..generation_base import BaseGeneration


[docs] class NIGeneration(BaseGeneration): """ Signal generation for NI analog output devices via nidaqwrapper.AOTask. This is a thin wrapper: channel configuration and timing are owned by the AOTask object passed in. NIGeneration is responsible only for lifecycle management (start, generate, safe-stop) and signal storage. Parameters ---------- task : AOTask | str Either a configured ``nidaqwrapper.AOTask`` instance, or the name of an NI MAX task (str) which is resolved at construction time via ``nidaqwrapper.get_task_by_name()``. signal : numpy.ndarray | None, optional Signal to output. Shape ``(n_samples, n_channels)`` or ``(n_samples,)`` for a single channel. Can also be set later with ``set_generation_signal()``. Default is None. generation_name : str | None, optional Human-readable name for this generation source. Defaults to the task name reported by the AOTask. Raises ------ ImportError If ``nidaqwrapper`` is not installed. TypeError If ``task`` is neither an ``AOTask`` instance nor a ``str``. ValueError If a string task name cannot be resolved to an NI MAX task. Examples -------- >>> ao = AOTask("my_ao_task", sample_rate=10_000) >>> ao.add_channel(...) >>> gen = NIGeneration(ao, signal=my_signal) >>> gen = NIGeneration("MyNIMaxTask", signal=my_signal) """ def __init__( self, task: AOTask | str, signal: np.ndarray | None = None, generation_name: str | None = None, ) -> None: if not _NIDAQWRAPPER_AVAILABLE: raise ImportError( "nidaqwrapper is not installed. " "Install it before using NIGeneration." ) super().__init__() if isinstance(task, AOTask): self._ao_task: AOTask = task elif isinstance(task, str): ni_task = nidaqwrapper.get_task_by_name(task) if ni_task is None: raise ValueError( f"NI MAX task '{task}' not found. " "Verify the task name in NI MAX." ) self._ao_task = AOTask.from_task(ni_task) else: raise TypeError( f"task must be an AOTask instance or a str task name, " f"got {type(task).__name__!r}." ) self.generation_name = ( generation_name if generation_name is not None else self._ao_task.task_name ) # Internal flag so terminate_data_source() is idempotent. self._task_active: bool = False # Cache channel count at construction so terminate_data_source() does # not rely on the attribute name being accessible at shutdown time. self._n_channels: int = self._ao_task.number_of_ch # Populated by set_generation_signal(); used to size the zeros burst # in terminate_data_source() even if self.signal is later cleared. self._n_samples_signal: int = 10 # safe fallback before any signal is set self.signal: np.ndarray | None = None if signal is not None: self.set_generation_signal(signal) # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------
[docs] def set_generation_signal(self, signal: np.ndarray) -> None: """ Store the signal that will be written on each ``generate()`` call. nidaqmx expects shape ``(n_channels, n_samples)`` for multi-channel output. If a 2-D array with shape ``(n_samples, n_channels)`` is provided it is transposed automatically. 1-D arrays are kept as-is. Parameters ---------- signal : numpy.ndarray Output signal. Shape ``(n_samples, n_channels)`` or ``(n_samples,)`` for a single channel. """ if signal.ndim == 2: # Convert from (n_samples, n_channels) → (n_channels, n_samples). self.signal = signal.T else: self.signal = signal # Record sample count from the user-facing shape (first dim is always # n_samples in the input convention) so terminate can replicate it. self._n_samples_signal = signal.shape[0]
[docs] def set_data_source(self) -> None: """ Start the AOTask so it is ready to accept ``generate()`` calls. Safe to call multiple times; subsequent calls are no-ops while the task is already active. """ if not self._task_active: self._ao_task.start(start_task=True) self._task_active = True
[docs] def generate(self) -> None: """ Write the stored signal to the analog output hardware. Raises ------ ValueError If no signal has been set via the constructor or ``set_generation_signal()``. """ if self.signal is None: raise ValueError( "No signal set. Call set_generation_signal() before generate()." ) self._ao_task.generate(self.signal)
[docs] def terminate_data_source(self) -> None: """ Write zeros to all output channels, then stop the NI task. Writing zeros first ensures the physical outputs are left at a safe 0 V state rather than holding the last generated value. The task handle is kept alive (``stop()`` rather than ``clear_task()``) so that ``set_data_source()`` can restart it in a subsequent cycle. Idempotent: calling this method on an already-terminated task is a no-op. """ if not self._task_active: return # Drive outputs to 0 V before stopping. Use cached counts so this # does not depend on any mutable state being consistent at shutdown. if self._n_channels > 1: zeros = np.zeros((self._n_channels, self._n_samples_signal)) else: zeros = np.zeros(self._n_samples_signal) self._ao_task.generate(zeros) # Stop without destroying the handle so the task can be restarted. self._ao_task.task.stop() self._task_active = False