Source code for LDAQ.digilent.acquisition
import numpy as np
import time
import sys
# Analog Discovery 2:
from . import dwfconstants as dwfc
from ctypes import *
from ..acquisition_base import BaseAcquisition
[docs]
class WaveFormsAcquisition(BaseAcquisition):
"""
This is a class for acquiring data from Digilent Analog Discovery 2, using WaveForms SDK.
To use this class, you need to install WaveForms found on this link:
https://digilent.com/shop/software/digilent-waveforms/
Installation instructions:
- Download WaveForms from the link listed above.
- Install WaveForms.
"""
def __init__(self, acquisition_name=None, channels=[0, 1], sample_rate=10000,
channel_names=None, device_number=None):
"""Initiates WaveForms acquisition class.
Args:
acquisition_name (str, optional): name of the acquisition. Defaults to None, in which case 'AD2' is used.
channels (list, optional): list of channels used in acquisition. Defaults to [0, 1], in which case both available channels of Analog Discovery 2 will be used.
sample_rate (int, optional): sample rate used for acquisition. Defaults to 10000.
channel_names (list, optional): List of strings of channel names. Defaults to None, in which case channel names will be named 'CH0' and 'CH1'.
device_number (int, optional): device number to which this class should connect to (in case there are multiple Waveforms devices).
Defaults to None, in which case the first available device will be used.
"""
super().__init__()
self.acquisition_name = 'AD2' if acquisition_name is None else acquisition_name
self._channel_names_init = channel_names if channel_names is not None else [f'CH{i}' for i in channels]
self.channel_idx = channels
self.sample_rate = sample_rate
self.device_number = device_number if device_number is not None else -1
if sys.platform.startswith("win"):
self.dwf = cdll.dwf
elif sys.platform.startswith("darwin"):
self.dwf = cdll.LoadLibrary("/Library/Frameworks/dwf.framework/dwf")
else:
self.dwf = cdll.LoadLibrary("libdwf.so")
self.hdwf = c_int(0) # device handle
# tracking of lost data and corrupted data:
self.cLost = c_int()
self.cCorrupted = c_int()
self.fLost = 0
self.fCorrupted = 0
self.configure_channels() # configure channel range
self.set_data_source()
self.set_trigger(1e20, 0, duration=1.0)
[docs]
def configure_channels(self, input_range=None):
"""Specify min and max value range for each channel.
Args:
input_range (dict): dictionary with channel index as key and tuple of min and max values as value. channel indices
have to be the same as self.channel_idx (or channels input parameters in init)
For example: {0:(-10, 10), 1:(-5, 5)}
-> channel 0 has range -10 to 10 V and channel 1 has range -5 to 5 V.
By default, all channels have range -10 to 10 V.
"""
if input_range is None:
if not hasattr(self, 'input_range'):
input_range = {idx:(-10, 10) for idx in self.channel_idx}
self.input_range = input_range
# based on which channels are used:
for idx in self.channel_idx:
val_min, val_max = self.input_range[idx]
ch_range = val_max - val_min
ch_offset = (val_max + val_min)/2
# enable channel:
self.dwf.FDwfAnalogInChannelEnableSet(self.hdwf, c_int(idx), c_bool(True))
# set range:
self.dwf.FDwfAnalogInChannelRangeSet(self.hdwf, c_int(idx), c_double(ch_range))
# set offset:
self.dwf.FDwfAnalogInChannelOffsetSet(self.hdwf, c_int(idx), c_double(ch_offset))
[docs]
def read_data(self):
"""
This method only reads data from the source and transforms data into standard format used by other methods.
This method is called within self.acquire() method which properly handles acquiring data and saves it into
pyTrigger ring buffer.
Must return a 2D numpy array of shape (n_samples, n_columns).
"""
sts = c_byte() # acquisition status
cAvailable = c_int() # number of samples available
self.dwf.FDwfAnalogInStatus(self.hdwf, c_int(1), byref(sts))
if (sts == dwfc.DwfStateConfig or sts == dwfc.DwfStatePrefill or sts == dwfc.DwfStateArmed) :
# Acquisition not yet started.
return np.empty((0, self.n_channels))
self.dwf.FDwfAnalogInStatusRecord(self.hdwf, byref(cAvailable), byref(self.cLost), byref(self.cCorrupted))
if self.cLost.value :
self.fLost = 1
if self.cCorrupted.value :
self.fCorrupted = 1
if cAvailable.value==0: # no data available
return np.empty((0, self.n_channels))
arr = []
for i in self.channel_idx:
rgdSamples = (c_double*cAvailable.value)()
self.dwf.FDwfAnalogInStatusData(self.hdwf, c_int(i), byref(rgdSamples), cAvailable) # get channel 1 data
values = np.fromiter(rgdSamples, dtype =float)
arr.append(values)
arr = np.array(arr).T
return arr
[docs]
def set_data_source(self):
"""
Properly sets acquisition source before measurement is started.
Should be set up in a way that it is able to be called multiple times in a row without issues.
"""
if self.hdwf.value == dwfc.hdwfNone.value: # if device is not open
self.dwf.FDwfDeviceOpen(self.device_number, byref(self.hdwf))
self.dwf.FDwfAnalogInAcquisitionModeSet(self.hdwf, dwfc.acqmodeRecord)
self.dwf.FDwfAnalogInFrequencySet(self.hdwf, c_double(self.sample_rate))
self.dwf.FDwfAnalogInRecordLengthSet(self.hdwf, c_double(-1)) # -1 infinite record length
self.configure_channels()
#wait at least 2 seconds for the offset to stabilize
time.sleep(0.3)
# check if the device is running:
device_state = c_int()
self.dwf.FDwfAnalogInStatus(self.hdwf, c_bool(True), byref(device_state))
self.dwf.FDwfAnalogInConfigure(self.hdwf, c_int(0), c_int(1))
super().set_data_source()
[docs]
def terminate_data_source(self):
"""
Properly closes acquisition source after the measurement.
Returns None.
"""
self.dwf.FDwfDeviceCloseAll()
self.hdwf = c_int(0)
[docs]
def get_sample_rate(self):
"""
Returns sample rate of acquisition class.
This function is also useful to compute sample_rate estimation if no sample rate is given
Returns self.sample_rate
"""
return self.sample_rate
[docs]
def clear_buffer(self):
"""
The source buffer should be cleared with this method. Either actually clears the buffer, or
just reads the data with self.read_data() and does not add/save data anywhere.
Returns None.
"""
self.read_data()