Source code for LDAQ.national_instruments.generation
import os
import numpy as np
import time
import copy
from typing import Optional, Union
try:
from .daqtask import DAQTask
from .ni_task import NITaskOutput
except:
pass
from ..generation_base import BaseGeneration
[docs]
class NIGeneration(BaseGeneration):
def __init__(self, task_name, signal=None, generation_name=None):
"""NI Generation class used for generating signals.
Args:
task_name (str, class object): Name of the task from NI Max or class object created with NITaskOutput() class using nidaqmx library.
signal (numpy.ndarray): Signal to be generated. Shape is ``(n_samples, n_channels)`` or ``(n_samples,)``.
generation_name (str, optional): Name of the generation class. Defaults to None, in which case the task name is used.
"""
super().__init__()
self.task_name = task_name
if signal is not None:
self.set_generation_signal(signal)
self.task_terminated = True
self.task_base = task_name
if isinstance(task_name, str):
self.NITask_used = False
elif isinstance(task_name, NITaskOutput):
self.NITask_used = True
else:
raise TypeError("task_name has to be a string or NITaskOutput object.")
self.set_data_source(initiate=False)
self.generation_name = task_name if generation_name is None else generation_name
[docs]
def set_generation_signal(self, signal):
"""sets signal that will be generated, and repeated in a loop.
Args:
signal (np.ndarray): numpy array with shape ``(n_samples, n_channels)`` or ``(n_samples,)``.
"""
self.signal = signal
if self.signal.ndim > 1:
self.signal = self.signal.T
[docs]
def set_data_source(self, initiate=True):
"""Sets the data source for the generation.
Args:
initiate (bool, optional): intitiate NI task. Defaults to True.
"""
if self.task_terminated:
if self.NITask_used:
channels_base = copy.deepcopy(self.task_base.channels)
self.Task = NITaskOutput(self.task_base.task_name, self.task_base.sample_rate)
self.task_name = self.task_base.task_name
self.Task.channels = channels_base
else:
self.Task = DAQTask(self.task_base)
self.task_terminated = False
if self.NITask_used and initiate:
if not hasattr(self.Task, 'task'):
self.Task.initiate()
[docs]
def terminate_data_source(self):
"""Terminates the data source for the generation.
"""
self.task_terminated = True
self.clear_task()
[docs]
def generate(self):
"""Generates the signal.
"""
if self.signal is None:
raise ValueError("No signal set for generation.")
self.Task.generate(self.signal, clear_task=False)
[docs]
def clear_task(self):
"""Clears NI output task.
"""
if hasattr(self, 'Task'):
self.Task.clear_task(wait_until_done=False)
# generate zeros
self.set_data_source()
if self.signal.ndim == 1:
zero_signal = np.zeros(self.signal.shape[0])
else:
zero_signal = np.zeros((self.signal.shape[0], 10))
self.Task.generate(zero_signal, clear_task=False)
self.Task.clear_task(wait_until_done=False)
self.task_terminated = True
del self.Task
# def run_generation(self):
# """Runs the signal generation.
# """
# self.is_running = True
# self.set_data_source()
# self.generate()