Ben Montgomery, Paul Nakroshis
Streaming data from sensors is one of the most common tasks in experimental computational physics. Many existing solutions require proprietary interfaces or software, and the rest tend to be exceptionally complex to use.
We have attempted to solve this problem by writing a simplified interface to an existing solution that is performant but highly complex.
from datetime import datetime
import sys
from labjack import ljm
# Example script: stream two analog inputs from a LabJack with the LJM
# library, read MAX_REQUESTS batches, and print throughput statistics.
MAX_REQUESTS = 25  # The number of eStreamRead calls that will be performed.

# Open first found LabJack
# T7 device, Ethernet connection, Any identifier
handle = ljm.openS("T7", "ETHERNET", "ANY")

# Everything after the handle is opened runs under try/finally so the handle
# is closed even if configuration fails or the run is interrupted.
try:
    # Get device type, connection type, serial number, IP, port, max bytes / MB
    info = ljm.getHandleInfo(handle)
    deviceType = info[0]

    # Stream Configuration
    aScanListNames = ["AIN0", "AIN1"]  # Scan list names to stream
    numAddresses = len(aScanListNames)
    aScanList = ljm.namesToAddresses(numAddresses, aScanListNames)[0]
    scanRate = 1000
    scansPerRead = int(scanRate / 2)

    try:
        # When streaming, negative channels and ranges can be configured for
        # individual analog inputs, but the stream has only one settling
        # time and resolution.

        # LabJack T7 and other devices configuration

        # Ensure triggered stream is disabled.
        ljm.eWriteName(handle, "STREAM_TRIGGER_INDEX", 0)

        # Enabling internally-clocked stream.
        ljm.eWriteName(handle, "STREAM_CLOCK_SOURCE", 0)

        # All negative channels are single-ended, AIN0 and AIN1 ranges are
        # +/-10 V, stream settling is 0 (default) and stream resolution index
        # is 0 (default).
        aNames = ["AIN_ALL_NEGATIVE_CH", "AIN0_RANGE", "AIN1_RANGE",
                  "STREAM_SETTLING_US", "STREAM_RESOLUTION_INDEX"]
        aValues = [ljm.constants.GND, 10.0, 10.0, 0, 0]

        # Write the analog inputs' negative channels (when applicable), ranges,
        # stream settling time and stream resolution configuration.
        numFrames = len(aNames)
        ljm.eWriteNames(handle, numFrames, aNames, aValues)

        # Configure and start stream. The value returned by eStreamStart
        # replaces the requested scan rate and is the one reported below.
        scanRate = ljm.eStreamStart(handle, scansPerRead, numAddresses,
                                    aScanList, scanRate)
        print("\nStream started with a scan rate of %0.0f Hz." % scanRate)

        start = datetime.now()
        totScans = 0
        totSkip = 0  # Total skipped samples

        for _ in range(MAX_REQUESTS):
            ret = ljm.eStreamRead(handle)

            # The first element of the result holds the sample values.
            aData = ret[0]
            # Integer division: a scan count is a whole number.
            scans = len(aData) // numAddresses
            totScans += scans

            # Count the skipped samples which are indicated by -9999 values.
            # Missed samples occur after a device's stream buffer overflows
            # and are reported after auto-recover mode ends.
            curSkip = aData.count(-9999.0)
            totSkip += curSkip

        end = datetime.now()

        print("\nTotal scans = %i" % (totScans))
        # total_seconds() covers the whole interval (days, seconds and
        # microseconds) instead of rebuilding it from individual fields.
        tt = (end - start).total_seconds()
        print("Time taken = %f seconds" % (tt))
        print("LJM Scan Rate = %f scans/second" % (scanRate))
        print("Timed Scan Rate = %f scans/second" % (totScans / tt))
        print("Timed Sample Rate = %f samples/second"
              % (totScans * numAddresses / tt))
        print("Skipped scans = %0.0f" % (totSkip / numAddresses))
    except ljm.LJMError as ljme:
        print(ljme)
    except Exception as e:
        print(e)
    finally:
        # Always attempt to stop the stream, including when the run is
        # interrupted; a failure to stop is reported but not re-raised.
        try:
            print("\nStop Stream")
            ljm.eStreamStop(handle)
        except ljm.LJMError as ljme:
            print(ljme)
        except Exception as e:
            print(e)
finally:
    # Close handle
    ljm.close(handle)
from labjackcontroller.labtools import LabjackReader
# Acquisition plan: two analog inputs, named as labeled on the device, each
# recorded in 10 V mode.
channels = ["AIN0", "AIN1"]
voltages = [10.0, 10.0]

# Length of the recording, in seconds.
duration = 30

# Create the reader for a T7 reached over Ethernet.
my_lj = LabjackReader("T7", connection="ETHERNET")

# Gather the data at 1000 Hz ...
my_lj.collect_data(channels, voltages, duration, 1000)

# ... and, as a bonus, print what was collected.
collected = my_lj.to_dataframe()
print(collected)
In any given task, we can group individual tasks into meta-steps:
from labjackcontroller.labtools import LabjackReader
from multiprocessing.managers import BaseManager
from multiprocessing import Process
import time
# Example script: share one LabjackReader between two processes through a
# multiprocessing manager, so that one process can collect data while a
# second one backs it up.

# Device and acquisition settings.
device_type = "T7"
connection_type = "ETHERNET"
duration = 18
channels = ["AIN0", "AIN1", "AIN2", "AIN3"]
voltages = [10.0, 10.0, 10.0, 10.0]

# Register LabjackReader with the manager class so that a started manager can
# create instances of it and hand out proxies to them.
BaseManager.register('LabjackReader', LabjackReader)

# Process creation is kept behind the main-module guard: start methods that
# re-import the main module in each child (e.g. "spawn") would otherwise
# re-run the manager/process start-up code in every child process.
if __name__ == "__main__":
    manager = BaseManager()
    manager.start()

    # Instantiate a shared LabjackReader
    my_lj = manager.LabjackReader(device_type, connection=connection_type)

    # Find the maximum frequency that we can safely run at.
    freq, packet_size = my_lj.find_max_freq(channels, voltages, num_seconds=5)

    # Declare a data-gathering process
    data_proc = Process(target=my_lj.collect_data,
                        args=(channels, voltages, duration, freq),
                        kwargs={'resolution': 0,
                                'sample_rate': packet_size})

    # Declare a data backup process
    # NOTE(review): `backup` is not defined in this section; it must be
    # defined or imported before this point -- confirm.
    backup_proc = Process(target=backup, args=(my_lj, "backup.pkl",
                                               duration))

    # Start both processes, and join when finished.
    data_proc.start()
    backup_proc.start()

    data_proc.join()
    backup_proc.join()