Effective Realtime Data Processing using LabJack T-Series DAQs

Ben Montgomery, Paul Nakroshis

What Problem Do We Solve?

Streaming data from sensors is one of the most common tasks in experimental computational physics. Many solutions require proprietary interfaces or software, and the rest tend to be exceptionally complex to use.

We have attempted to solve this problem by writing a simplified interface on top of an existing solution that is performant but highly complex.

from datetime import datetime
import sys

from labjack import ljm


MAX_REQUESTS = 25  # The number of eStreamRead calls that will be performed.

# Open first found LabJack
# T7 device, Ethernet connection, Any identifier
handle = ljm.openS("T7", "ETHERNET", "ANY")

# Get device type, connection type, serial number, IP, port, max bytes / MB
info = ljm.getHandleInfo(handle)
deviceType = info[0]

# Stream Configuration
aScanListNames = ["AIN0", "AIN1"]  # Scan list names to stream
numAddresses = len(aScanListNames)
aScanList = ljm.namesToAddresses(numAddresses, aScanListNames)[0]
scanRate = 1000
scansPerRead = int(scanRate / 2)

try:
    # When streaming, negative channels and ranges can be configured for
    # individual analog inputs, but the stream has only one settling
    # time and resolution.

    # LabJack T7 and other devices configuration

    # Ensure triggered stream is disabled.
    ljm.eWriteName(handle, "STREAM_TRIGGER_INDEX", 0)

    # Enabling internally-clocked stream.
    ljm.eWriteName(handle, "STREAM_CLOCK_SOURCE", 0)

    # All negative channels are single-ended, AIN0 and AIN1 ranges are
    # +/-10 V, stream settling is 0 (default) and stream resolution index
    # is 0 (default).
    aNames = ["AIN_ALL_NEGATIVE_CH", "AIN0_RANGE", "AIN1_RANGE",
                "STREAM_SETTLING_US", "STREAM_RESOLUTION_INDEX"]
    aValues = [ljm.constants.GND, 10.0, 10.0, 0, 0]
    # Write the analog inputs' negative channels (when applicable), ranges,
    # stream settling time and stream resolution configuration.
    numFrames = len(aNames)
    ljm.eWriteNames(handle, numFrames, aNames, aValues)

    # Configure and start stream
    scanRate = ljm.eStreamStart(handle, scansPerRead, numAddresses,\
                                aScanList, scanRate)



    print("\nStream started with a scan rate of %0.0f Hz." % scanRate)

    start = datetime.now()
    totScans = 0
    totSkip = 0  # Total skipped samples

    i = 1
    while i <= MAX_REQUESTS:
        ret = ljm.eStreamRead(handle)

        aData = ret[0]
        scans = len(aData) / numAddresses
        totScans += scans

        # Count the skipped samples which are indicated by -9999 values.
        # Missed samples occur after a device's stream buffer overflows
        # and are reported after auto-recover mode ends.
        curSkip = aData.count(-9999.0)
        totSkip += curSkip

        i += 1

    end = datetime.now()

    print("\nTotal scans = %i" % (totScans))
    tt = (end - start).seconds \
         + float((end - start).microseconds) / 1000000
    print("Time taken = %f seconds" % (tt))
    print("LJM Scan Rate = %f scans/second" % (scanRate))
    print("Timed Scan Rate = %f scans/second" % (totScans / tt))
    print("Timed Sample Rate = %f samples/second" \
          % (totScans * numAddresses / tt))
    print("Skipped scans = %0.0f" % (totSkip / numAddresses))
except ljm.LJMError:
    ljme = sys.exc_info()[1]
    print(ljme)
except Exception:
    e = sys.exc_info()[1]
    print(e)

try:
    print("\nStop Stream")
    ljm.eStreamStop(handle)
except ljm.LJMError:
    ljme = sys.exc_info()[1]
    print(ljme)
except Exception:
    e = sys.exc_info()[1]
    print(e)

# Close handle
ljm.close(handle)

from labjackcontroller.labtools import LabjackReader

# Record data for 30 seconds, because why not.
duration = 30

# Record two channels, as labeled on the device.
channels = ["AIN0", "AIN1"]

# Use the +/-10 V range on both channels.
voltages = [10.0, 10.0]


# Instantiate a LabjackReader
my_lj = LabjackReader("T7", connection="ETHERNET")

# Actually collect data at 1000 Hz
my_lj.collect_data(channels, voltages, duration, 1000)

# Bonus, print out our collected data.
print(my_lj.to_dataframe())

In any given streaming task, we can group the individual operations into meta-steps:

  • Set up device
    • Convert names (“AIN0”) to addresses
    • Disable stream triggering and external clock sources, if desired and applicable
    • Write all this information to the device
  • Run the stream
    • Start stream
    • Loop through the stream scans, handling them fast enough that no buffers overflow
    • Handle any skips or other errors.
    • Close the stream when done
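
The "handle any skips" step can be sketched without hardware. In the LJM example above, each read returns one flat, interleaved list (one value per channel per scan), and skipped samples are marked with -9999.0. The helper below is only a minimal illustration of that bookkeeping; the names `split_scans` and `SKIP_VALUE` are hypothetical and belong to neither library, and treating a scan as skipped when any of its values is a placeholder is an assumption.

```python
SKIP_VALUE = -9999.0  # Placeholder for a skipped sample, as in the example above


def split_scans(a_data, num_channels):
    """Group a flat, interleaved stream read into scans and count skipped scans.

    Returns (scans, skipped), where scans is a list of per-scan tuples
    containing only the scans without any placeholder value.
    """
    if num_channels <= 0 or len(a_data) % num_channels != 0:
        raise ValueError("data length must be a multiple of num_channels")

    scans = []
    skipped = 0
    for start in range(0, len(a_data), num_channels):
        scan = tuple(a_data[start:start + num_channels])
        if SKIP_VALUE in scan:
            skipped += 1  # Assumption: one placeholder invalidates the scan
        else:
            scans.append(scan)
    return scans, skipped


# Two channels, three scans; the second scan was skipped.
scans, skipped = split_scans([0.1, 0.2, -9999.0, -9999.0, 0.3, 0.4], 2)
print(scans, skipped)
```

Keeping this step separate from the read loop makes it easy to test the skip handling with recorded or synthetic data.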

Our Approach

Advantages

  • Simpler and more readable.
  • Our API implementation calls LabJack's C bindings directly and integrates some performance improvements.
  • Automatically configures devices, no matter the model.
  • Can experimentally determine the rates at which the device samples reliably, given your computer, your connection, and your LabJack's version.
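
One way such a rate could be found experimentally is to run short trial streams and search over the scan rate. The sketch below is an assumption about how a search like this might look, not a description of how the library's `find_max_freq` works. Here `trial` is a hypothetical callback that streams briefly at the given rate and returns True if no samples were skipped, and the search assumes that if one rate fails, every higher rate fails too.

```python
def find_max_reliable_rate(trial, low, high, tolerance=1.0):
    """Binary-search the highest rate in [low, high] for which trial(rate) is True.

    Returns None if even `low` fails.
    """
    if not trial(low):
        return None
    if trial(high):
        return high

    # Invariant: trial(low) is True and trial(high) is False.
    while high - low > tolerance:
        mid = (low + high) / 2
        if trial(mid):
            low = mid
        else:
            high = mid
    return low


# Stand-in for real hardware: pretend anything up to 42 kHz streams cleanly.
def fake_trial(rate_hz):
    return rate_hz <= 42_000


best = find_max_reliable_rate(fake_trial, 100, 100_000, tolerance=10)
print(best)
```

On real hardware each trial costs a few seconds of streaming, so a binary search keeps the number of trials small compared with stepping through every candidate rate.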

Disadvantages

  • Lower income if paid to develop by the hour

A Sample Use Case

Setup Overview

  • Magnetically damped torsion pendulum
    • Computational damping in 3D
  • Temperature Sensor
  • Quadrant cell photodiode for movement detection
  • Data is collected by a T7-Pro

Experiment Needs

  • Monitor torsion behavior for later analysis
  • Back up data in real-time
  • Use data on the pendulum's movement to adjust magnetic damping

from labjackcontroller.labtools import LabjackReader

from multiprocessing.managers import BaseManager
from multiprocessing import Process

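import time


# Assumption: the slide does not show backup(). This is a minimal,
# hypothetical version that pickles the data collected so far once
# per second until the run ends.
def backup(labjack, filename, num_seconds):
    start = time.time()
    while time.time() - start <= num_seconds:
        time.sleep(1)
        labjack.to_dataframe().to_pickle(filename)
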
device_type = "T7"
connection_type = "ETHERNET"
duration = 18
channels = ["AIN0", "AIN1", "AIN2", "AIN3"]
voltages = [10.0, 10.0, 10.0, 10.0]


BaseManager.register('LabjackReader', LabjackReader)
manager = BaseManager()
manager.start()

# Instantiate a shared LabjackReader
my_lj = manager.LabjackReader(device_type, connection=connection_type)

# Find the maximum frequency that we can safely run at.
freq, packet_size = my_lj.find_max_freq(channels, voltages, num_seconds=5)

# Declare a data-gathering process
data_proc = Process(target=my_lj.collect_data,
                    args=(channels, voltages, duration, freq),
                    kwargs={'resolution': 0,
                            'sample_rate': packet_size})


# Declare a data backup process
backup_proc = Process(target=backup, args=(my_lj, "backup.pkl",
                                           duration))

# Start both processes, and join when finished.
data_proc.start()
backup_proc.start()

data_proc.join()
backup_proc.join()
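
Because a real-time backup can be interrupted mid-write, one common precaution is to write to a temporary file and then rename it over the previous backup. The sketch below is a generic illustration of that pattern and is not taken from the library; `atomic_pickle` is a hypothetical helper, and the dictionary stands in for collected data.

```python
import os
import pickle
import tempfile


def atomic_pickle(obj, path):
    """Pickle obj to path so that readers never see a half-written file."""
    directory = os.path.dirname(os.path.abspath(path))
    # Write to a temporary file in the same directory, so the final rename
    # stays on one filesystem.
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as tmp_file:
            pickle.dump(obj, tmp_file)
            tmp_file.flush()
            os.fsync(tmp_file.fileno())
        # Replace the old backup in one step (atomic on POSIX systems).
        os.replace(tmp_path, path)
    except BaseException:
        # Clean up the temporary file if anything went wrong.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


# Stand-in for collected data.
with tempfile.TemporaryDirectory() as backup_dir:
    backup_path = os.path.join(backup_dir, "backup.pkl")
    atomic_pickle({"AIN0": [0.1, 0.2]}, backup_path)
    with open(backup_path, "rb") as f:
        restored = pickle.load(f)
    print(restored)
```

With this pattern, the previous backup stays intact until the new one has been fully written.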

Thank you!