import numpy as np
import time
import multiprocessing as mp
import threading as th
import dill as pickle
import psutil
import os
from ..acquisition_base import BaseAcquisition
[docs]
class SimulatedAcquisition(BaseAcquisition):
"""
Simulated acquisition class that can be used when no source is present.
"""
def __init__(self, acquisition_name=None, multi_processing=False):
"""
Args:
acquisition_name (str, optional): Name of the acquisition. Defaults to None, in which case the name "Simulator" is used.
multi_processing (bool, optional): If True, the data generation process is run in a separate process. Defaults to True.
If False, data generation is executed in separate thread.
"""
super().__init__()
self.acquisition_name = 'Simulator' if acquisition_name is None else acquisition_name
self._channel_names_init = [] # list of original data channels names from source
self._channel_names_video_init = [] # list of original video channels names from source
self._channel_shapes_video_init = [] # list of original video channels shapes from source
# Thread-safe flag for tracking child process/thread state
self._process_lock = th.Lock()
self.child_process_started = False
# TODO: fix multiprocessing feature in the future
if multi_processing == True:
raise ValueError("Multi-processing is currently not supported. Use multi_processing=False.")
self.multi_processing = multi_processing
def __del__(self):
"""If class is deleted, stop the data generation process.
"""
with self._process_lock:
if self.child_process_started:
self.stop_event.set()
self.process.join()
self.child_process_started = False
[docs]
def set_simulated_data(self, fun_or_array, channel_names=None, sample_rate=None, args=()):
"""sets simulated data to be returned by read_data() method.
This should also update self._channel_names_init list.
NOTE: The function 'fun' should also include all library imports needed for its execution if multiprocessing=True. This is due to serialization limitations of the function
of 'dill' library in order to be able to pass the function to the child process. For example, if the function 'fun' uses numpy, it should be imported.
Args:
fun_or_array (function, np.ndarray): function that returns numpy array with shape (n_samples, n_channels) or numpy array with shape (n_samples, n_channels) that
will be repeated in a loop.
channel_names (list, optional): list of channel names. Defaults to None, in which case the names "channel_0", "channel_1", ... are used.
sample_rate (int, optional): sample rate of the simulated data. Defaults to None, in which case the sample rate of 1000 Hz is used.
args (tuple, optional): arguments for the function. Defaults to ().
Example:
>>> def simulate_signal(t, f1, f2):
>>> import numpy as np
>>> sig1 = np.sin(2*np.pi*f1*t) + np.random.rand(len(t))*0.3
>>> sig2 = np.cos(2*np.pi*f2*t) + np.random.rand(len(t))*0.3
>>> return np.array([sig1, sig2]).T
>>>
>>> acq_simulated = LDAQ.simulator.SimulatedAcquisition(acquisition_name='sim')
>>> acq_simulated.set_simulated_data(simulate_signal, channel_names=["ch1", "ch2"], sample_rate=100000, args=(84, 120))
>>> acq_simulated.run_acquisition(5.0)
"""
self._channel_names_init = [] # list of original data channels names from source
self._channel_names_video_init = [] # list of original video channels names from source
self._channel_shapes_video_init = [] # list of original video channels shapes from source
self._channel_names_init = channel_names
self.sample_rate = 1000 if sample_rate is None else sample_rate
if isinstance(fun_or_array, np.ndarray):
data = fun_or_array
self.simulated_function = data
self._args = ()
elif callable(fun_or_array): # function
fun = fun_or_array
self.simulated_function = fun
self._args = args
time_array = np.arange(self.sample_rate)/self.sample_rate
data = fun(time_array, *self._args)
else:
raise ValueError("fun_or_array must be either function or numpy array.")
if data.ndim == 2:
if channel_names is None:
self._channel_names_init = [f"channel_{i}" for i in range(data.shape[1])]
if data.shape[1] != len(self._channel_names_init):
raise ValueError("Number of channels in data and channel_names does not match.")
else:
raise ValueError("Data must be 2D array.")
self.set_data_source(initiate_data_source=False)
self.set_trigger(1e20, 0)
[docs]
def set_simulated_video(self, fun_or_array, channel_name_video=None, sample_rate=None, args=()):
"""sets simulated video to be returned by read_data() method.
This should also update self._channel_names_video_init and self._channel_shapes_video_init lists.
NOTE: if simulator acqusition is running with multi_processing=True, The function 'fun' should also include all library imports needed for its execution.
This is due to serialization limitations of the function of 'dill' library in order to be able to pass the function to the child process.
For example, if the function 'fun' uses numpy, it should be imported.
Args:
fun_or_array (function, np.ndarray): function that returns numpy array with shape (n_samples, width, height) or numpy array with shape (n_samples, width, height) that
will be repeated in a loop.
channel_name_video (str, optional): name of the video channel. Defaults to None, in which case the name "video" is used.
sample_rate (int, optional): sample rate of the simulated data. Defaults to None, in which case the sample rate of 30 Hz is used.
args (tuple, optional): arguments for the function. Defaults to ().
"""
self._channel_names_init = [] # list of original data channels names from source
self._channel_names_video_init = [] # list of original video channels names from source
self._channel_shapes_video_init = [] # list of original video channels shapes from source
self.sample_rate = 30 if sample_rate is None else sample_rate
if isinstance(fun_or_array, np.ndarray):
data = fun_or_array
self.simulated_function = data
self._args = ()
elif callable(fun_or_array): # function
fun = fun_or_array
self.simulated_function = fun
self._args = args
time_array = np.arange(self.sample_rate)/self.sample_rate
data = fun(time_array, *self._args)
else:
raise ValueError("fun_or_array must be either function or numpy array.")
if data.ndim == 3:
if channel_name_video is None:
self._channel_names_video_init = ["video_channel"]
else:
self._channel_names_video_init = [channel_name_video]
self._channel_shapes_video_init = [data.shape[1:]]
else:
raise ValueError("Data must be 3D array.")
self.set_data_source(initiate_data_source=False)
self.set_trigger(1e20, 0)
[docs]
def set_data_source(self, initiate_data_source=True):
"""
Initializes simulated data source
"""
if initiate_data_source:
with self._process_lock:
if not self.child_process_started:
if self.multi_processing:
self._set_data_source_multiprocessing()
else:
self._set_data_source_threading()
super().set_data_source()
def _set_data_source_multiprocessing(self):
"""
Initialize multiprocessing-based data source.
Note: Called from set_data_source() which holds _process_lock.
"""
# Create a Pipe for communication between processes
self.parent_conn, self.child_conn = mp.Pipe()
# Event to signal stop of generation of simulated data:
self.stop_event = mp.Event()
# serialize function using pickle:
ser_simulated_fun = pickle.dumps(self.simulated_function)
self.child_process_started = True
self.process = mp.Process(target=self.data_generator_multiprocessing, args=(self.child_conn, self.stop_event, self.sample_rate, ser_simulated_fun, self._args))
self.process.start()
def _set_data_source_threading(self):
"""
Initialize threading-based data source.
Note: Called from set_data_source() which holds _process_lock.
"""
self.lock_retrieve_data = th.Lock()
self.stop_event = th.Event()
self.child_process_started = True
self.process = th.Thread(target=self.data_generator_threading, args=(self.stop_event, self.sample_rate, self.simulated_function, self._args))
self.process.start()
[docs]
def terminate_data_source(self):
"""
Terminates simulated data source
"""
with self._process_lock:
if self.child_process_started: # TODO: add logic to check if something has changed in the data source
# if yes, then reset the data source, otherwise do not terminate it
self.stop_event.set()
# Wait for the process to finish
self.process.join()
self.child_process_started = False
[docs]
def read_data(self):
"""Reads data from simulated data source.
Returns:
np.ndarray: data from serial port with shape (n_samples, n_channels).
"""
time.sleep(0.05)
if self.multi_processing:
data = self._read_data_multiprocessing()
else:
data = self._read_data_threading()
return data
def _read_data_threading(self):
"""Reads data from buffer when threading is used."""
with self.lock_retrieve_data:
if len(self.buffer) > 0:
data = np.vstack(self.buffer)
self.buffer.clear()
else:
data = np.empty((0, self.n_channels_trigger))
return data
def _read_data_multiprocessing(self):
"""Reads data from buffer when multiprocessing is used."""
# Send a request for data
self.parent_conn.send('get_data')
# Receive the data
data = self.parent_conn.recv()
if data.shape[0] > 0:
return data
else:
return np.empty((0, len(self.n_channels_trigger)))
[docs]
def clear_buffer(self):
"""
Clears serial buffer.
"""
self.read_data()
[docs]
def data_generator_threading(self, stop_event, sample_rate, fun_or_arr, fun_args):
"""
This function runs in a separate process and generates data (2D numpy arrays),
and maintains a buffer of generated data.
"""
time_start = time.time()
time_previous = time_start
time_add = 0
if callable(fun_or_arr):
function = fun_or_arr
is_fun = True
else:
data_loop = fun_or_arr
is_fun = False
N = 0 # samples generated so far
self.buffer = []
while not stop_event.is_set():
time.sleep(0.01)
time_now = time.time()
time_elapsed = time_now - time_previous
samples_to_read = int(time_elapsed * sample_rate)
time_array = np.arange(samples_to_read)/sample_rate + time_add
if len(time_array) == 0:
continue
if is_fun:
data = function(time_array, *fun_args)
else:
idx = np.arange(N, N + samples_to_read) % len(data_loop)
data = data_loop[idx]
if data.ndim == 3:
data = data.reshape((-1, data.shape[1]*data.shape[2]))
time_previous = time_now
time_add = time_array[-1] + 1/sample_rate
# Simulate data generation (using random numbers here)
with self.lock_retrieve_data:
self.buffer.append(data)
N += samples_to_read
# Sleep for a bit to simulate time it takes to generate data
[docs]
@staticmethod
def data_generator_multiprocessing(connection, stop_event, sample_rate, ser_function, fun_args):
"""
This function runs in a separate process and generates data (2D numpy arrays),
and maintains a buffer of generated data.
"""
import time
import numpy as np
#deserialize function using pickle:
function = pickle.loads(ser_function)
time_start = time.time()
time_previous = time_start
time_add = 0
buffer = []
while not stop_event.is_set():
time_now = time.time()
time_elapsed = time_now - time_previous
samples_to_read = int(time_elapsed * sample_rate)
time_array = np.arange(samples_to_read)/sample_rate + time_add
if len(time_array) == 0:
continue
data = function(time_array, *fun_args)
if data.ndim == 3:
data = data.reshape((-1, data.shape[1]*data.shape[2]))
time_previous = time_now
time_add = time_array[-1] + 1/sample_rate
# Simulate data generation (using random numbers here)
buffer.append(data)
# Sleep for a bit to simulate time it takes to generate data
time.sleep(0.01)
# Check if there is a request for data
if connection.poll():
request = connection.recv()
if request == 'get_data':
if len(buffer) > 0:
# Send the entire buffer as a numpy array
connection.send(np.vstack(buffer))
# Clear the buffer
buffer.clear()
else:
connection.send( np.array([]) )