Devices API Reference
This page provides detailed API documentation for XClock device classes and related types.
Overview
The devices module provides the core abstractions and implementations for controlling DAQ devices as clock generators.
from xclock.devices import ClockDaqDevice, LabJackT4, DummyDaqDevice
from xclock.devices import ClockChannel, EdgeType
Abstract Base Class
ClockDaqDevice
Abstract base class that all device implementations must inherit from.
class ClockDaqDevice(ABC):
"""
Abstract base class for all clock DAQ devices.
Attributes:
handle: Device handle or connection identifier
base_clock_frequency_hz: Base clock frequency of the device
"""
Class Methods
get_available_output_clock_channels()
@staticmethod
@abstractmethod
def get_available_output_clock_channels() -> tuple[str, ...]
Returns a tuple of channel names that can be used for clock output.
Returns:
tuple[str, ...]: Channel names (e.g.,("FIO0", "FIO1", "FIO2"))
Example:
channels = LabJackT4.get_available_output_clock_channels()
print(channels) # ("FIO0", "FIO1", ..., "EIO3")
get_available_input_start_trigger_channels()
@staticmethod
@abstractmethod
def get_available_input_start_trigger_channels() -> tuple[str, ...]
Returns a tuple of channel names that can be used as trigger inputs.
Returns:
tuple[str, ...]: Trigger channel names (e.g.,("DIO4",))
Example:
triggers = LabJackT4.get_available_input_start_trigger_channels()
print(triggers) # ("DIO4",)
Instance Methods
__init__()
def __init__(self)
Initialize the device and establish connection.
Raises:
XClockException: If device initialization fails
Example:
try:
device = LabJackT4()
except XClockException as e:
print(f"Failed to initialize: {e}")
get_added_clock_channels()
@abstractmethod
def get_added_clock_channels(self) -> list[ClockChannel]
Returns a list of currently configured clock channels.
Returns:
list[ClockChannel]: List of configured clocks
Example:
device = LabJackT4()
device.add_clock_channel(100, "FIO0")
device.add_clock_channel(60, "FIO1")
clocks = device.get_added_clock_channels()
print(len(clocks)) # 2
get_unused_clock_channel_names()
@abstractmethod
def get_unused_clock_channel_names(self) -> list[str]
Returns a list of channel names that are available (not yet configured).
Returns:
list[str]: Unused channel names
Example:
device = LabJackT4()
device.add_clock_channel(100, "FIO0")
unused = device.get_unused_clock_channel_names()
print("FIO0" in unused) # False
print("FIO1" in unused) # True
add_clock_channel()
@abstractmethod
def add_clock_channel(
self,
clock_tick_rate_hz: int | float,
channel_name: str | None = None,
number_of_pulses: int | None = None,
duration_s: float | None = None,
enable_clock_now: bool = False,
) -> ClockChannel
Configure a new clock channel.
Parameters:
clock_tick_rate_hz(int | float): Desired clock frequency in Hzchannel_name(str | None): Output channel name, or None for auto-selectnumber_of_pulses(int | None): Number of pulses to generate, or None for continuousduration_s(float | None): Duration in seconds (auto-calculates pulses)enable_clock_now(bool): If True, start this clock immediately
Returns:
ClockChannel: Configured clock channel object
Raises:
XClockException: If channel is invalid or already in useXClockValueError: If parameters are invalid
Notes:
duration_sandnumber_of_pulsesare mutually exclusiveIf both are None, clock runs continuously
Actual frequency may differ from requested (see
ClockChannel.actual_sample_rate_hz)
Example:
device = LabJackT4()
# Continuous clock
clock1 = device.add_clock_channel(
clock_tick_rate_hz=100,
channel_name="FIO0",
)
# Pulsed clock with exact count
clock2 = device.add_clock_channel(
clock_tick_rate_hz=60,
channel_name="FIO1",
number_of_pulses=1000,
)
# Pulsed clock with duration
clock3 = device.add_clock_channel(
clock_tick_rate_hz=30,
duration_s=10.0, # 30 Hz * 10s = 300 pulses
)
start_clocks()
@abstractmethod
def start_clocks(
self,
wait_for_pulsed_clocks_to_finish: bool = False,
)
Start all configured clocks simultaneously.
Parameters:
wait_for_pulsed_clocks_to_finish(bool): If True, block until pulsed clocks complete
Raises:
XClockException: If no clocks configured or start fails
Example:
device = LabJackT4()
device.add_clock_channel(100, "FIO0", number_of_pulses=500)
# Start and wait for completion
device.start_clocks(wait_for_pulsed_clocks_to_finish=True)
print("All pulses generated!")
# Or start without waiting
device.start_clocks(wait_for_pulsed_clocks_to_finish=False)
# Do other work...
device.stop_clocks()
stop_clocks()
@abstractmethod
def stop_clocks(self)
Stop all running clocks immediately.
Example:
device = LabJackT4()
device.add_clock_channel(100, "FIO0")
device.start_clocks()
import time
time.sleep(5) # Run for 5 seconds
device.stop_clocks()
clear_clocks()
@abstractmethod
def clear_clocks(self)
Remove all configured clocks and stop them if running.
Example:
device = LabJackT4()
device.add_clock_channel(100, "FIO0")
device.add_clock_channel(60, "FIO1")
device.clear_clocks()
print(len(device.get_added_clock_channels())) # 0
wait_for_trigger_edge()
@abstractmethod
def wait_for_trigger_edge(
self,
channel_name: str,
timeout_s: float = 5.0,
edge_type: EdgeType = EdgeType.RISING,
) -> bool
Wait for an edge on the specified trigger input channel.
Parameters:
channel_name(str): Trigger input channel nametimeout_s(float): Timeout in seconds (0 or negative = infinite)edge_type(EdgeType): Type of edge to wait for
Returns:
bool: True if triggered, False if timeout
Raises:
XClockException: If channel is invalid
Example:
device = LabJackT4()
device.add_clock_channel(100, "FIO0", duration_s=10)
print("Waiting for trigger...")
triggered = device.wait_for_trigger_edge(
channel_name="DIO4",
timeout_s=30.0,
edge_type=EdgeType.RISING,
)
if triggered:
print("Trigger received! Starting clocks...")
device.start_clocks(wait_for_pulsed_clocks_to_finish=True)
else:
print("Timeout - no trigger received")
start_clocks_and_record_edge_timestamps()
@abstractmethod
def start_clocks_and_record_edge_timestamps(
self,
wait_for_pulsed_clocks_to_finish: bool = True,
extra_channels: list[str] = [],
filename: Path | str | None = None,
)
Start clocks and record edge timestamps to a CSV file.
Parameters:
wait_for_pulsed_clocks_to_finish(bool): Block until pulsed clocks finishextra_channels(list[str]): Additional channels to monitor for edgesfilename(Path | str | None): Output file path, or None for auto-generate
Output Format: CSV file with three columns:
Column 1: Timestamp in nanoseconds (int64) - Device-relative time since start
Column 2: Edge type (int8)
Positive: Rising edge on clock N
Negative: Falling edge on clock N
Column 3: Unix timestamp in nanoseconds (int64) - Host system time
Example:
from pathlib import Path
device = LabJackT4()
device.add_clock_channel(100, "FIO0", duration_s=10)
device.add_clock_channel(60, "FIO1", duration_s=10)
output = Path.home() / "Documents" / "XClock" / "sync.csv"
device.start_clocks_and_record_edge_timestamps(
wait_for_pulsed_clocks_to_finish=True,
extra_channels=["EIO4"], # Also monitor EIO4
filename=output,
)
# Load and analyze
import numpy as np
data = np.loadtxt(output, dtype=np.int64, delimiter=",")
timestamps_ns = data[:, 0] # Device-relative timestamps
edge_types = data[:, 1] # Edge type
unix_timestamps_ns = data[:, 2] # Unix timestamps (host time)
print(f"Recorded {len(data)} edges")
close()
@abstractmethod
def close(self)
Clean up resources and close device connection.
Always call this when done, or use context manager if supported.
Example:
device = LabJackT4()
try:
device.add_clock_channel(100, "FIO0")
device.start_clocks()
finally:
device.close() # Ensure cleanup
Device Implementations
LabJackT4
LabJack T4 USB DAQ device implementation.
class LabJackT4(ClockDaqDevice):
"""
XClock driver for LabJack T4 device.
Attributes:
base_clock_frequency_hz: 80,000,000 (80 MHz)
handle: LabJack device handle
"""
Specifications:
Base clock: 80 MHz
Output channels: FIO0-7, EIO0-3 (12 total)
Trigger input: DIO4
Frequency range: ~1 Hz to ~40 MHz
Timing precision: < 1 µs jitter
Example:
from xclock.devices import LabJackT4
t4 = LabJackT4()
print(f"Base clock: {t4.base_clock_frequency_hz} Hz")
# Use all features
t4.add_clock_channel(100, "FIO0", duration_s=10)
t4.start_clocks_and_record_edge_timestamps(
wait_for_pulsed_clocks_to_finish=True,
filename="timestamps.csv"
)
t4.close()
DummyDaqDevice
Software-only device for testing without hardware.
class DummyDaqDevice(ClockDaqDevice):
"""
Dummy DAQ device for testing.
Simulates all functionality without requiring hardware.
Useful for development, testing, and demonstrations.
"""
Features:
No hardware required
Same API as real devices
Simulated timestamps
Configurable behavior
Example:
from xclock.devices import DummyDaqDevice
dummy = DummyDaqDevice()
dummy.add_clock_channel(100, duration_s=5)
dummy.start_clocks(wait_for_pulsed_clocks_to_finish=True)
dummy.close()
print("Test completed without hardware!")
Data Classes
ClockChannel
Represents a configured clock channel.
@dataclass
class ClockChannel:
"""
Configuration and state of a clock channel.
Attributes:
channel_name: Output channel name (e.g., "FIO0")
clock_id: Unique identifier (1, 2, 3, ...)
clock_enabled: Whether clock is currently running
actual_sample_rate_hz: Actual achieved frequency
number_of_pulses: Number of pulses (None = continuous)
"""
channel_name: str
clock_id: int
clock_enabled: bool
actual_sample_rate_hz: int
number_of_pulses: int | None = None
Example:
device = LabJackT4()
clock = device.add_clock_channel(100, "FIO0", number_of_pulses=1000)
print(f"Channel: {clock.channel_name}")
print(f"Requested: 100 Hz")
print(f"Actual: {clock.actual_sample_rate_hz} Hz")
print(f"Pulses: {clock.number_of_pulses}")
print(f"Enabled: {clock.clock_enabled}")
Enumerations
EdgeType
Enumeration for edge types in trigger detection.
class EdgeType(Enum):
"""
Type of edge for trigger detection.
Values:
RISING: Rising edge (low to high)
FALLING: Falling edge (high to low)
"""
RISING = "rising"
FALLING = "falling"
Example:
from xclock.devices import EdgeType
device = LabJackT4()
device.add_clock_channel(100, "FIO0", duration_s=10)
# Wait for rising edge
device.wait_for_trigger_edge("DIO4", edge_type=EdgeType.RISING)
device.start_clocks()
Complete Example
from pathlib import Path
import numpy as np
from xclock.devices import LabJackT4, EdgeType
# Initialize device
device = LabJackT4()
try:
# Check available resources
output_channels = device.get_available_output_clock_channels()
trigger_channels = device.get_available_input_start_trigger_channels()
print(f"Output channels: {output_channels}")
print(f"Trigger channels: {trigger_channels}")
# Configure multiple synchronized clocks
clock1 = device.add_clock_channel(
clock_tick_rate_hz=60,
channel_name="FIO0",
duration_s=30.0, # 30 seconds
)
clock2 = device.add_clock_channel(
clock_tick_rate_hz=100,
channel_name="FIO1",
duration_s=30.0,
)
print(f"Clock 1: {clock1.actual_sample_rate_hz} Hz on {clock1.channel_name}")
print(f"Clock 2: {clock2.actual_sample_rate_hz} Hz on {clock2.channel_name}")
# Wait for external trigger
print("Waiting for trigger on DIO4...")
triggered = device.wait_for_trigger_edge(
channel_name="DIO4",
timeout_s=60.0,
edge_type=EdgeType.RISING,
)
if not triggered:
print("Timeout - exiting")
exit(1)
print("Trigger received! Starting clocks and recording...")
# Start clocks and record timestamps
output_file = Path.home() / "Documents" / "XClock" / "experiment.csv"
device.start_clocks_and_record_edge_timestamps(
wait_for_pulsed_clocks_to_finish=True,
filename=output_file,
)
print(f"Recording complete: {output_file}")
# Load and analyze timestamps
data = np.loadtxt(output_file, dtype=np.int64, delimiter=",")
timestamps_ns = data[:, 0] # Device-relative timestamps
edge_types = data[:, 1] # Edge type
unix_timestamps_ns = data[:, 2] # Unix timestamps (host time)
# Count edges per clock
clock1_rising = np.sum(edge_types == 1)
clock1_falling = np.sum(edge_types == -1)
clock2_rising = np.sum(edge_types == 2)
clock2_falling = np.sum(edge_types == -2)
print(f"Clock 1: {clock1_rising} rising, {clock1_falling} falling")
print(f"Clock 2: {clock2_rising} rising, {clock2_falling} falling")
finally:
# Always clean up
device.close()
print("Device closed")
See Also
Quick Start Guide - Getting started guide
Device Support - Device-specific information
Adding New Device Support - Adding new devices
Edge Detection API Reference - Edge detection API
Errors API Reference - Error handling