Source code for LDAQ.serial_communication.acquisition

import numpy as np
import time
from ctypes import *

# Serial communication:
import serial
import struct

from ..acquisition_base import BaseAcquisition

[docs] class SerialAcquisition(BaseAcquisition): """ General Purpose Class for Serial Communication. """ def __init__(self, port, baudrate, byte_sequence, timeout=1, start_bytes=b"", end_bytes=b"\n", write_start_bytes=None, write_end_bytes=None, pretest_time=None, sample_rate=None, channel_names = None, acquisition_name=None ): """ Initializes serial communication. Args: port (str): serial port (i.e. "COM1") baudrate (int): baudrate for serial communication byte_sequence (tuple): data sequence in each recived line via serial communication example: (("int16", 2), ("int32", 2), ("uint16", 3)) explanations: line consists of 2 16bit signed intigers, followed by 2 signed 32bit intigers, followed by 3 unsigned 16bit intigers. supported types: int8, uint8, int16, uint16, int32, uint32, float32, float64 timeout (int, optional): timeout for serial communication. Defaults to 1 ms. start_bytes (bstr, optional): received bytes via serial communication indicating the start of each line. For examples: b"\x00\x01\x00". Defaults to b"". end_bytes (bstr, optional): recieved bytes via serial communication indicating the end of each line. Defaults to b"\n". write_start_bytes (bstr, optional): bytes to be written at the beggining of acquisition. Defaults to None. write_end_bytes (bstr, optional): bytes to be written at the beggining of acquisition. Defaults to None. pretest_time (float, optional): time for which sample rate test is run for when class is created. If None, 10 seconds pretest is performed. Defaults to None. sample_rate (float, optional): Sample rate at which data is acquired. If None, then sample_rate pretest will be performed for 'pretest_time' seconds. Defaults to None. channel_names (list, optional): list of strings of channel names. Defaults to None, in which case channel names will be set to ["channel 1", "channel 2", ...]. acquisition_name (str, optional): name of the acquisition. Defaults to None, in which case acquisition_name will be set to "SerialAcquisition". """ super().__init__() if acquisition_name is None: self.acquisition_name = "SerialAcquisition" else: self.acquisition_name = acquisition_name self._channel_names_init = channel_names # list of original channels names from source self.port = port self.baudrate = baudrate self.byte_sequence = byte_sequence self.start_bytes_write = write_start_bytes self.end_bytes_write = write_end_bytes self.start_bytes = start_bytes self.end_bytes = end_bytes self.timeout = timeout self.unpack_string = b"" self.expected_number_of_bytes = 0 self.write_delay_ms = 10 # delay between serial writes in ms self.set_unpack_data_settings() # sets unpack_string, expected_number_of_bytes, n_channels self.set_channel_names() # sets channel names if none were given to the class self.n_channels_trigger = self.n_channels # number of channels used for triggering if self.start_bytes_write is not None: self.set_data_source(write_start_bytes=True) # initializes serial connection else: self.set_data_source(write_start_bytes=False) self.buffer = b"" # buffer to which recieved data is added # Estimate sample_rate: self.pretest_time = pretest_time if pretest_time is not None else 10. self.sample_rate = sample_rate if sample_rate is not None else self.get_sample_rate(run_pretest=True) # set default trigger, so the signal will not be trigered: self.set_trigger(1e20, 0, duration=1.0)
[docs] def set_data_source(self, write_start_bytes=False): """ Initializes serial connection, sets channels and virtual channels. Args: write_start_bytes (bool, optional): If True, then writes write_start_bytes to serial port (passed in init method). Defaults to False. """ # open terminal: if not hasattr(self, 'ser'): try: self.ser = serial.Serial(port=self.port, baudrate=self.baudrate, timeout=self.timeout) except serial.SerialException: print("Serial port is in use or has not been found.") elif not self.ser.is_open: self.ser.open() time.sleep(1.0) else: pass # Send commands over serial: if write_start_bytes: self.write_to_serial(self.start_bytes_write, self.write_delay_ms) time.sleep(0.5) self.ser.reset_input_buffer() # clears previous data self.buffer = b"" # reset class buffer super().set_data_source()
[docs] def terminate_data_source(self): """ Closes serial connection. """ self.buffer = b"" time.sleep(0.01) self.write_to_serial(self.end_bytes_write, self.write_delay_ms) time.sleep(0.1) self.ser.close()
[docs] def read_data(self): """reads data from serial port and returns it as numpy array with shape (n_samples, n_channels). Returns: np.ndarray: data from serial port with shape (n_samples, n_channels). """ # 1) read all data from serial self.buffer += self.ser.read_all() # 2) split data into lines parsed_lines = self.buffer.split(self.end_bytes + self.start_bytes) if len(parsed_lines) == 1 or len(parsed_lines) == 0: # not enough data return np.empty((0, self.n_channels)) # 3) decode full lines, convert data to numpy array data = [] for line in parsed_lines[:-1]: # last element probably does not contain all data if len(line) == self.expected_number_of_bytes - len(self.end_bytes+self.start_bytes): line_decoded = struct.unpack(self.unpack_string, line) data.append(line_decoded) else: #print(f"Expected nr. of bytes {self.expected_number_of_bytes}, line contains {len(line)}") pass data = np.array(data) if len(data) == 0: data = data.reshape(-1, self.n_channels) # 4) reset buffer with remaninig bytes: self.buffer = self.end_bytes + self.start_bytes + parsed_lines[-1] return data
[docs] def clear_buffer(self): """ Clears serial buffer. """ self.ser.read_all()
[docs] def set_unpack_data_settings(self): """ Converts byte_sequence to string passed to struct unpack method. """ self.convert_dict = { "uint8": ("B", 1), # (struct format, number of bytes) "int8": ("b", 1), "uint16": ("H", 2), "int16": ("h", 2), "uint32": ("L", 4), "int32": ("l", 4), "float32": ("f", 4), "float64": ("d", 8), } self.unpack_string = "<" # order of several bytes for 1 variable (see struct library) self.n_channels = 0 self.expected_number_of_bytes = len(self.start_bytes) + len(self.end_bytes) for seq in self.byte_sequence: dtype, n = seq for i in range(n): self.unpack_string += self.convert_dict[dtype][0] self.expected_number_of_bytes += self.convert_dict[dtype][1] self.n_channels += 1 return self.unpack_string
[docs] def set_channel_names(self): """ Sets default channel names if none were passed to the class. """ if self._channel_names_init is None: self.channel_names = [f"channel {i+1}" for i in range(self.n_channels)] else: if len(self._channel_names_init) != self.n_channels: self.channel_names = [f"channel {i+1}" for i in range(self.n_channels)] else: self.channel_names = self.channel_names
[docs] def write_to_serial(self, write_bytes, delay_ms=10): """ Writes data to serial port. Args: write_bytes (list, tuple, bytes, bytearray): bytes to be written to serial port. If list/tuple, then elements have to be of type byte/bytearray. Writes each encoded bstring with 'delay_ms' delay. delay_ms (int, optional): Delay between writing bytes. Defaults to 10 ms. """ delay_ms = delay_ms/1000. if write_bytes is None: pass else: if isinstance(write_bytes, list): if all(isinstance(b, (bytes, bytearray)) for b in write_bytes): for byte in write_bytes: self.ser.write(byte) time.sleep(delay_ms) else: raise TypeError("write_bytes have to be bytes or bytearray type.") elif isinstance(write_bytes, (bytes, bytearray)): self.ser.write(write_bytes) time.sleep(delay_ms) else: raise TypeError("write_bytes have to be bytes or bytearray type.")
[docs] def get_sample_rate(self, run_pretest=False): """Returns acquisition sample rate or estimates sample rate if run_pretest is True. Args: run_pretest (bool, optional): If True, then runs pretest to estimate sample rate. Defaults to False. Returns: float: estimated sample rate """ if run_pretest: self.set_data_source() time.sleep(0.1) # Run pretest: self.is_running = True self.buffer = b"" # pretest: print(f"Running pretest to estimate sample rate for {self.pretest_time} seconds...") time_start = time.time() n_cycles = 0 while True: self.buffer += self.ser.read_all() n_cycles += 1 if time.time()-time_start >= self.pretest_time: break time_end = time.time() self.buffer += self.ser.read_all() self.buffer2 = self.buffer # parse data: parsed_lines = self.buffer.split(self.end_bytes + self.start_bytes) if len(parsed_lines) == 1 or len(parsed_lines) == 0: # not enough data print(ValueError(f"No data has been transmitted. Sample rate {self.sample_rate} Hz will be assumed.")) self.sample_rate = 100 else: data = [] for line in parsed_lines[:-1]: # last element probably does not contain all data if len(line) == self.expected_number_of_bytes - len(self.end_bytes+self.start_bytes): line_decoded = struct.unpack(self.unpack_string, line) data.append(line_decoded) else: pass self.Trigger.N_acquired_samples = len(data) # calculate sample_rate: self.sample_rate = int( self.Trigger.N_acquired_samples / (time_end - time_start ) ) # this is overcomplicating things: :) t_cycle = (time_end - time_start )/n_cycles self.sample_rate = int( (self.Trigger.N_acquired_samples + t_cycle*self.sample_rate) / (time_end - time_start ) ) if self.sample_rate == 0: print("Something went wrong. Please check 'byte_sequence' input parameter if recieved byte sequence is correct.") # end acquisition: self.stop() self.terminate_data_source() print("Completed.") else: pass return self.sample_rate
class SerialAcquisitionSimple(BaseAcquisition): """ General Purpose Class for Serial Communication, meant to read characters and numbers from serial port. """ def __init__(self, port, baudrate, delimiter, timeout=1, start_character=b"", end_character=b"\r\n", write_start_character=None, write_end_character=None, pretest_time=None, sample_rate=None, channel_names = None, acquisition_name=None, verbose=False ): """ Initializes serial communication. Args: port (str): serial port (i.e. "COM1") baudrate (int): baudrate for serial communication delimiter (str, bstr): delimiter for serial communication timeout (int, optional): timeout for serial communication. Defaults to 1 ms. start_character (str, bstr,optional): received characters via serial communication indicating the start of each line. Defaults to b"". end_character (str, bstr, optional): recieved characters via serial communication indicating the end of each line. Defaults to b"\r\n". write_start_character (str, bstr, list, optional): bytes to be written at the beggining of acquisition. Can be a list of bytes or list of strings. Defaults to None. write_end_character (bstr, optional): bytes to be written at the beggining of acquisition. Can be a list of bytes or list of strings. Defaults to None. pretest_time (float, optional): time for which sample rate test is run for when class is created. If None, 10 seconds pretest is performed. Defaults to None. sample_rate (float, optional): Sample rate at which data is acquired. If None, then sample_rate pretest will be performed for 'pretest_time' seconds. Defaults to None. channel_names (list, optional): list of strings of channel names. Defaults to None, in which case channel names will be set to ["channel 1", "channel 2", ...]. acquisition_name (str, optional): name of the acquisition. Defaults to None, in which case acquisition_name will be set to "SerialAcquisitionSimple". verbose (bool, optional): If True, then prints more information. Defaults to False. """ super().__init__() self.verbose = verbose if acquisition_name is None: self.acquisition_name = "SerialAcquisitionSimple" else: self.acquisition_name = acquisition_name self.pretest_time = pretest_time if pretest_time is not None else 10. self.expected_number_of_channels = 0 self.write_delay_ms = 10 # delay between serial writes in ms self._channel_names_init = channel_names # list of original channels names from source self.port = port self.timeout = timeout self.baudrate = baudrate self.delimiter = delimiter if type(delimiter) == bytes else delimiter.encode() self.start_character = start_character if type(start_character) == bytes else start_character.encode() self.end_character = end_character if type(end_character) == bytes else end_character.encode() self.start_character_write = self.process_end_characters(write_start_character) self.end_character_write = self.process_end_characters(write_end_character) self.set_channel_names() # sets channel names if none were given to the class self.n_channels_trigger = self.n_channels # number of channels used for triggering self.set_data_source() # initializes serial connection self.buffer = b"" # buffer to which recieved data is added # Estimate sample_rate: self.sample_rate = sample_rate if sample_rate is not None else self.get_sample_rate(run_pretest=True) # set default trigger, so the signal will not be trigered: self.set_trigger(1e20, 0, duration=1.0) @staticmethod def process_end_characters(write_end_character): """ Processes the end character(s) for writing. Converts strings to bytes or stores bytes as is. Parameters ---------- write_end_character : str, bytes, or list of str or bytes The character(s) to be processed. Returns ------- list_bytes : None or list The processed character(s) in bytes format, or None. Raises ------ ValueError Raised if write_end_character is not a string, bytes, or list of strings or bytes. """ list_bytes = None if type(write_end_character) == str: list_bytes = write_end_character.encode() elif type(write_end_character) == list: list_bytes = [] for char in write_end_character: if type(char) == str: list_bytes.append(char.encode()) elif type(char) == bytes: list_bytes.append(char) else: raise ValueError("write_end_character must be either string, bytes or list of bytes or strings.") return list_bytes def set_data_source(self): """ Initializes serial connection, sets channels and virtual channels. """ self.consecutive_wrong_nr_of_channels = 0 # number of consecutive wrong number of channels decoded # used to stop the measurment if acquisition is not set up properly # open terminal: if not hasattr(self, 'ser'): try: self.ser = serial.Serial(port=self.port, baudrate=self.baudrate, timeout=self.timeout) except serial.SerialException: print("Serial port is in use or has not been found.") elif not self.ser.is_open: self.ser.open() time.sleep(1.0) else: pass # Send commands over serial: self.write_to_serial(self.start_character_write, self.write_delay_ms) time.sleep(0.5) self.ser.reset_input_buffer() # clears previous data self.buffer = b"" # reset class buffer super().set_data_source() def terminate_data_source(self): """ Closes serial connection. """ self.buffer = b"" time.sleep(0.01) self.write_to_serial(self.end_character_write, self.write_delay_ms) time.sleep(0.1) self.ser.close() def read_data(self): """reads data from serial port and returns it as numpy array with shape (n_samples, n_channels). Returns: np.ndarray: data from serial port with shape (n_samples, n_channels). """ # 1) read all data from serial self.buffer += self.ser.read_all() # 2) split data into lines parsed_lines = self.buffer.split(self.end_character + self.start_character) if len(parsed_lines) == 1 or len(parsed_lines) == 0: # not enough data return np.empty((0, self.n_channels)) # 3) decode full lines, convert data to numpy array data = [] for line in parsed_lines[:-1]: # last element probably does not contain all data line_split = line.split(self.delimiter) if len(line_split) == self.expected_number_of_channels: self.consecutive_wrong_nr_of_channels = 0 try: line_decoded = [float(x.decode()) for x in line_split] # just assume everything is float except ValueError: if self.verbose: print(f"Could not convert line to float: {line_split}") continue data.append(line_decoded) elif line_split == [b'']: continue # ignore this line else: self.consecutive_wrong_nr_of_channels += 1 if self.verbose: print(f"Expected nr. of channels {self.expected_number_of_channels}, line contains {len(line_split)}") print(f"Line: {line_split}") pass if self.consecutive_wrong_nr_of_channels > 10: # after 10 wrong lines, stop the measurement self.stop() self.terminate_data_source() raise ValueError("Number of expected and decoded channels do not match. Check if the device is set up properly.") data = np.array(data) if len(data) == 0: data = data.reshape(-1, self.n_channels) # 4) reset buffer with remaining bytes: self.buffer = self.end_character + self.start_character + parsed_lines[-1] return data def clear_buffer(self): """ Clears serial buffer. """ self.ser.read_all() def set_channel_names(self): """ Sets default channel names if none were passed to the class. """ if self._channel_names_init is None: print("No channel names were given. Running pretest to determine number of channels and estimate sample rate...") self.n_channels, self.estimated_sample_rate = self.get_channels_and_sample_rate() self._channel_names_init = [f"channel {i+1}" for i in range(self.n_channels)] self.expected_number_of_channels = self.n_channels else: self.n_channels = len(self._channel_names_init) self.expected_number_of_channels = self.n_channels def write_to_serial(self, write_char, delay_ms=10): """ Writes data to serial port. Args: write_bytes (list, tuple, bytes, bytearray): bytes to be written to serial port. If list/tuple, then elements have to be of type byte/bytearray. Writes each encoded bstring with 'delay_ms' delay. delay_ms (int, optional): Delay between writing bytes. Defaults to 10 ms. """ delay_ms = delay_ms/1000. if write_char is None: pass else: if isinstance(write_char, list): if all(isinstance(b, (bytes, bytearray)) for b in write_char): for byte in write_char: self.ser.write(byte) time.sleep(delay_ms) else: raise TypeError("write_bytes have to be bytes or bytearray type.") elif isinstance(write_char, (bytes, bytearray)): self.ser.write(write_char) time.sleep(delay_ms) else: raise TypeError("write_bytes have to be bytes or bytearray type.") def get_sample_rate(self, run_pretest=False): """Returns acquisition sample rate or estimates sample rate if run_pretest is True. Args: run_pretest (bool, optional): If True, then runs pretest to estimate sample rate. Defaults to False. Returns: float: estimated sample rate """ if hasattr(self, "estimated_sample_rate"): sample_rate = self.estimated_sample_rate elif run_pretest: _, sample_rate = self.get_channels_and_sample_rate() else: sample_rate = self.sample_rate return sample_rate def get_channels_and_sample_rate(self): """Returns number of channels and estimated sample rate if run_pretest is True. Args: None Returns: tuple: number of channels (int), estimated sample rate (float) """ # open terminal: if not hasattr(self, 'ser'): try: self.ser = serial.Serial(port=self.port, baudrate=self.baudrate, timeout=self.timeout) except serial.SerialException: print("Serial port is in use.") elif not self.ser.is_open: self.ser.open() time.sleep(1.0) else: pass # Send commands over serial: self.write_to_serial(self.start_character_write, self.write_delay_ms) time.sleep(0.5) self.ser.reset_input_buffer() # clears previous data self.buffer = b"" # reset class buffer # Test: print(f"Running pretest for {self.pretest_time} seconds...") time_start = time.time() n_cycles = 0 while True: self.buffer += self.ser.read_all() n_cycles += 1 if time.time()-time_start >= self.pretest_time: break time_end = time.time() self.buffer += self.ser.read_all() self.buffer2 = self.buffer # parse data: parsed_lines = self.buffer.split(self.end_character + self.start_character) if len(parsed_lines) == 1 or len(parsed_lines) == 0: # not enough data print(ValueError(f"No data has been transmitted. Sample rate {self.sample_rate} Hz will be assumed.")) self.sample_rate = 100 else: data = [] for line in parsed_lines[1:-1]: # first and last element probably does not contain all data line_split = line.split(self.delimiter) line_decoded = [float(x.decode()) for x in line_split] # just assume everything is float data.append(line_decoded) N_acquired_samples = len(data) + 2 # first and last elements were avioded n_channels = len(data[len(data)//2]) # take length of the middle element # calculate sample_rate: sample_rate = N_acquired_samples / (time_end - time_start) t_cycle = (time_end - time_start )/n_cycles sample_rate = ( N_acquired_samples + t_cycle*sample_rate) / (time_end - time_start ) # this is overcomplicating things :) if sample_rate == 0: print("Something went wrong. Please check 'delimiter'.") # end acquisition: self.terminate_data_source() self.buffer = b"" return n_channels, sample_rate