Adding New Device Support
This guide explains how to add support for a new DAQ device to XClock. Following this guide, you can extend XClock to work with any data acquisition hardware that can generate clock signals.
Overview
Adding a new device involves:
Creating a new device class that implements the ClockDaqDevice interface
Implementing all required abstract methods
Writing tests for your device
Documenting device-specific features
Registering the device in the CLI (optional)
Prerequisites
Before you start:
Familiarity with Python object-oriented programming
Understanding of your DAQ device’s API/SDK
The device’s driver/SDK installed and working
XClock development environment set up
Step 1: Understand the Interface
All clock devices must inherit from the ClockDaqDevice abstract base class and implement its interface.
Required Interface
from abc import ABC, abstractmethod
from pathlib import Path

# ClockDaqDevice itself is defined below, so only its companion types are imported.
from xclock.devices.daq_device import ClockChannel, EdgeType


class ClockDaqDevice(ABC):
    """Abstract base class for all clock DAQ devices.

    A concrete device must implement every method marked ``@abstractmethod``
    and set the two attributes declared below.
    """

    # Device connection handle; None while no connection is open.
    handle: int | None
    # Frequency of the device's base clock, in Hz.
    base_clock_frequency_hz: int | float

    @staticmethod
    @abstractmethod
    def get_available_input_start_trigger_channels() -> tuple[str, ...]:
        """Return tuple of channel names that can be used as trigger inputs."""

    @staticmethod
    @abstractmethod
    def get_available_output_clock_channels() -> tuple[str, ...]:
        """Return tuple of channel names that can output clock signals."""

    @abstractmethod
    def get_added_clock_channels(self) -> list[ClockChannel]:
        """Return list of currently configured clock channels."""

    @abstractmethod
    def get_unused_clock_channel_names(self) -> list[str]:
        """Return list of available (not yet used) channel names."""

    @abstractmethod
    def add_clock_channel(
        self,
        clock_tick_rate_hz: int | float,
        channel_name: str | None = None,
        number_of_pulses: int | None = None,
        duration_s: float | None = None,
        enable_clock_now: bool = False,
    ) -> ClockChannel:
        """Configure a new clock channel.

        Args:
            clock_tick_rate_hz: Requested clock frequency in Hz.
            channel_name: Output channel to use.
            number_of_pulses: Number of pulses to emit.
            duration_s: Duration of the clock output in seconds.
            enable_clock_now: Whether to enable the clock immediately.

        Returns:
            The ClockChannel describing the configured clock.
        """

    @abstractmethod
    def wait_for_trigger_edge(
        self,
        channel_name: str,
        timeout_s: float = 5.0,
        edge_type: EdgeType = EdgeType.RISING,
    ) -> bool:
        """Wait for trigger signal. Returns True if triggered, False if timeout."""

    @abstractmethod
    def start_clocks(
        self,
        wait_for_pulsed_clocks_to_finish: bool = False,
    ):
        """Start all configured clocks."""

    @abstractmethod
    def start_clocks_and_record_edge_timestamps(
        self,
        wait_for_pulsed_clocks_to_finish: bool = True,
        # NOTE: this default list is shared between calls; implementations
        # must treat it as read-only (copy it before modifying).
        extra_channels: list[str] = [],
        filename: Path | str | None = None,
    ):
        """Start clocks and record edge timestamps to CSV file."""

    @abstractmethod
    def stop_clocks(self):
        """Stop all running clocks."""

    @abstractmethod
    def clear_clocks(self):
        """Remove all configured clocks."""

    @abstractmethod
    def close(self):
        """Clean up resources and close device connection."""
Step 2: Create Your Device Class
Create a new file in src/xclock/devices/ for your device. For example, my_daq_device.py:
from pathlib import Path
import logging
from xclock.devices.daq_device import ClockDaqDevice, ClockChannel, EdgeType
from xclock.errors import XClockException
logger = logging.getLogger(__name__)
class MyDAQDevice(ClockDaqDevice):
    """
    XClock driver for MyDAQ Device.

    This device supports:
    - Base clock frequency: 100 MHz
    - 4 output channels (CH0-CH3)
    - 1 trigger input (TRIG0)
    - Hardware-synchronized multi-channel output

    Example:
        >>> device = MyDAQDevice()
        >>> device.add_clock_channel(clock_tick_rate_hz=100, channel_name="CH0")
        >>> device.start_clocks()
        >>> device.close()
    """

    def __init__(self):
        """Initialize MyDAQ device.

        Raises:
            XClockException: If the device connection cannot be opened.
        """
        self.handle = None
        self.base_clock_frequency_hz = 100_000_000  # 100 MHz
        self._clock_channels = []
        # Initialize your device here
        try:
            # Example: Open device connection
            # self.handle = mydaq_sdk.open_device()
            logger.info("MyDAQ device initialized successfully")
        except Exception as e:
            # Chain the SDK error so the original traceback is preserved.
            raise XClockException(f"Failed to initialize MyDAQ: {e}") from e

    @staticmethod
    def get_available_output_clock_channels() -> tuple[str, ...]:
        """Return available output channels."""
        return ("CH0", "CH1", "CH2", "CH3")

    @staticmethod
    def get_available_input_start_trigger_channels() -> tuple[str, ...]:
        """Return available trigger input channels."""
        return ("TRIG0",)

    def get_added_clock_channels(self) -> list[ClockChannel]:
        """Return a copy of the list of configured clock channels."""
        return self._clock_channels.copy()

    def get_unused_clock_channel_names(self) -> list[str]:
        """Return unused channel names, in the device's declared channel order."""
        used_names = {ch.channel_name for ch in self._clock_channels}
        # Keep the declared order (not a set difference) so that automatic
        # channel selection is deterministic: the lowest free channel wins.
        return [
            name
            for name in self.get_available_output_clock_channels()
            if name not in used_names
        ]

    def add_clock_channel(
        self,
        clock_tick_rate_hz: int | float,
        channel_name: str | None = None,
        number_of_pulses: int | None = None,
        duration_s: float | None = None,
        enable_clock_now: bool = False,
    ) -> ClockChannel:
        """
        Add a new clock channel.

        Args:
            clock_tick_rate_hz: Desired clock frequency in Hz (must be positive)
            channel_name: Output channel name (or None for auto-select)
            number_of_pulses: Number of pulses (None = continuous)
            duration_s: Duration in seconds (auto-calculates pulses);
                ignored when number_of_pulses is given
            enable_clock_now: Start immediately if True

        Returns:
            ClockChannel object with actual configuration

        Raises:
            XClockException: If the rate is not positive, the channel name is
                invalid or already in use, or no channel is free.
        """
        # Reject rates the divisor calculation cannot handle (division by zero)
        if clock_tick_rate_hz <= 0:
            raise XClockException(
                f"Invalid clock rate: {clock_tick_rate_hz} Hz (must be positive)"
            )

        # Auto-select channel if not specified
        if channel_name is None:
            unused = self.get_unused_clock_channel_names()
            if not unused:
                raise XClockException("No available channels")
            channel_name = unused[0]

        # Validate channel name
        if channel_name not in self.get_available_output_clock_channels():
            raise XClockException(f"Invalid channel: {channel_name}")

        # Check if channel already in use
        if any(ch.channel_name == channel_name for ch in self._clock_channels):
            raise XClockException(f"Channel {channel_name} already in use")

        # Calculate pulses from duration if needed
        if duration_s is not None and number_of_pulses is None:
            # Round away float noise before truncating: 0.29 * 100 evaluates
            # to 28.999999999999996 and would otherwise lose a pulse.
            number_of_pulses = int(round(duration_s * clock_tick_rate_hz, 9))

        # Calculate actual achievable frequency
        # This is device-specific - adjust for your hardware
        actual_frequency = self._calculate_actual_frequency(clock_tick_rate_hz)

        # Create clock channel object
        clock_id = len(self._clock_channels) + 1
        channel = ClockChannel(
            channel_name=channel_name,
            clock_id=clock_id,
            clock_enabled=enable_clock_now,
            actual_sample_rate_hz=actual_frequency,
            number_of_pulses=number_of_pulses,
        )

        # Configure hardware first; register the channel only on success so a
        # failed configuration does not leave a phantom channel marked in use.
        self._configure_hardware_clock(channel)
        self._clock_channels.append(channel)

        logger.info(f"Added clock: {actual_frequency} Hz on {channel_name}")
        return channel

    def _calculate_actual_frequency(self, requested_hz: float) -> int:
        """
        Calculate actual achievable frequency given device constraints.

        This is device-specific. Implement based on your hardware's
        clock generation mechanism (divisors, PLLs, etc.).

        The caller must pass a positive requested_hz. The result is
        truncated to a whole number of Hz.
        """
        # Example: Simple divisor-based calculation
        divisor = round(self.base_clock_frequency_hz / requested_hz)
        divisor = max(1, divisor)  # Ensure at least 1
        actual_hz = self.base_clock_frequency_hz / divisor
        return int(actual_hz)

    def _configure_hardware_clock(self, channel: ClockChannel):
        """Configure hardware registers/settings for this clock."""
        # Implement device-specific configuration here
        # Example:
        # mydaq_sdk.configure_clock(
        #     self.handle,
        #     channel.channel_name,
        #     channel.actual_sample_rate_hz,
        #     channel.number_of_pulses
        # )
        pass

    def start_clocks(self, wait_for_pulsed_clocks_to_finish: bool = False):
        """Start all configured clocks.

        Raises:
            XClockException: If no clocks have been configured.
        """
        if not self._clock_channels:
            raise XClockException("No clocks configured")

        logger.info(f"Starting {len(self._clock_channels)} clocks")

        # Start clocks on hardware
        # Example:
        # mydaq_sdk.start_all_clocks(self.handle)

        # Mark all as enabled
        for channel in self._clock_channels:
            channel.clock_enabled = True

        # Wait if requested
        if wait_for_pulsed_clocks_to_finish:
            self._wait_for_completion()

    def _wait_for_completion(self):
        """Wait for pulsed clocks to finish."""
        # Implement waiting logic
        # Example:
        # while mydaq_sdk.is_running(self.handle):
        #     time.sleep(0.1)
        pass

    def stop_clocks(self):
        """Stop all running clocks."""
        logger.info("Stopping all clocks")

        # Stop hardware
        # Example:
        # mydaq_sdk.stop_all_clocks(self.handle)

        # Mark all as disabled
        for channel in self._clock_channels:
            channel.clock_enabled = False

    def clear_clocks(self):
        """Stop, then remove, all configured clocks."""
        self.stop_clocks()
        self._clock_channels.clear()
        logger.info("Cleared all clocks")

    def wait_for_trigger_edge(
        self,
        channel_name: str,
        timeout_s: float = 5.0,
        edge_type: EdgeType = EdgeType.RISING,
    ) -> bool:
        """
        Wait for trigger signal on specified channel.

        Returns:
            True if triggered, False if timeout

        Raises:
            XClockException: If channel_name is not a trigger input channel.
        """
        if channel_name not in self.get_available_input_start_trigger_channels():
            raise XClockException(f"Invalid trigger channel: {channel_name}")

        logger.info(f"Waiting for {edge_type.value} edge on {channel_name}")

        # Implement trigger waiting
        # Example:
        # return mydaq_sdk.wait_for_edge(
        #     self.handle,
        #     channel_name,
        #     edge_type.value,
        #     timeout_s
        # )
        return True  # Placeholder

    def start_clocks_and_record_edge_timestamps(
        self,
        wait_for_pulsed_clocks_to_finish: bool = True,
        extra_channels: list[str] = [],
        filename: Path | str | None = None,
    ):
        """Start clocks and record edge timestamps to CSV file.

        Raises:
            NotImplementedError: Always, until recording is implemented.
        """
        # This is complex - see LabJackT4 implementation for reference
        # You may need to implement a separate edge streamer class

        # Work on a copy: the default list object is shared between calls
        # and must never be mutated.
        extra_channels = list(extra_channels)

        # Generate default filename if not provided
        if filename is None:
            import time

            output_dir = Path.home() / "Documents" / "XClock"
            output_dir.mkdir(parents=True, exist_ok=True)
            timestamp_str = time.strftime("%Y-%m-%d_%H-%M-%S")
            filename = output_dir / f"mydaq_timestamps_{timestamp_str}.csv"

        # Start recording and clocks
        # This typically requires:
        # 1. Start clocks
        # 2. Record timestamps
        # 3. Save to CSV
        raise NotImplementedError("Timestamp recording not yet implemented")

    def close(self):
        """Clean up and close device. Safe to call more than once."""
        if self.handle is None:
            return
        try:
            self.stop_clocks()
            # Close device connection
            # Example:
            # mydaq_sdk.close_device(self.handle)
            logger.info("MyDAQ device closed")
        finally:
            # Drop the handle even if stopping fails, so a later close()
            # or __del__ does not retry with a stale handle.
            self.handle = None

    def __del__(self):
        """Destructor to ensure cleanup."""
        try:
            self.close()
        except Exception:
            # Best effort only: errors cannot be reported usefully while the
            # object is being finalized. KeyboardInterrupt/SystemExit still
            # propagate because only Exception is caught.
            pass
Step 3: Write Tests
Create a test file tests/xclock/test_my_daq.py:
import pytest
from xclock.devices.my_daq_device import MyDAQDevice
from xclock.devices.daq_device import EdgeType
# Set these for your device
DEVICE_NAME = "MyDAQ"
DEVICE_CLASS = MyDAQDevice
@pytest.fixture
def device():
    """Provide a freshly opened device and close it after the test.

    The code after ``yield`` runs whether the test passes or fails, so a
    failing assertion can no longer leave the hardware connection open.
    """
    dev = DEVICE_CLASS()
    yield dev
    dev.close()


def test_device_initialization(device):
    """Test that device initializes successfully."""
    assert device.handle is not None


def test_get_available_channels():
    """Test that device reports available channels."""
    output_channels = DEVICE_CLASS.get_available_output_clock_channels()
    assert len(output_channels) > 0
    assert all(isinstance(ch, str) for ch in output_channels)

    # May be empty if the device has no trigger support
    trigger_channels = DEVICE_CLASS.get_available_input_start_trigger_channels()
    assert all(isinstance(ch, str) for ch in trigger_channels)


def test_add_clock_channel(device):
    """Test adding a clock channel."""
    channels = device.get_available_output_clock_channels()
    clock = device.add_clock_channel(
        clock_tick_rate_hz=100,
        channel_name=channels[0],
        number_of_pulses=1000,
    )
    assert clock is not None
    assert clock.channel_name == channels[0]
    assert clock.actual_sample_rate_hz > 0
    assert clock.number_of_pulses == 1000


def test_multiple_clocks(device):
    """Test adding multiple synchronized clocks."""
    channels = device.get_available_output_clock_channels()
    if len(channels) < 2:
        pytest.skip("Device has fewer than 2 channels")

    clock1 = device.add_clock_channel(60, channels[0], number_of_pulses=100)
    clock2 = device.add_clock_channel(100, channels[1], number_of_pulses=100)
    assert clock1.channel_name != clock2.channel_name
    assert len(device.get_added_clock_channels()) == 2


def test_start_stop_clocks(device):
    """Test starting and stopping clocks."""
    channels = device.get_available_output_clock_channels()
    device.add_clock_channel(100, channels[0], number_of_pulses=10)
    # Should not raise
    device.start_clocks(wait_for_pulsed_clocks_to_finish=False)
    device.stop_clocks()


def test_clear_clocks(device):
    """Test clearing all clocks."""
    channels = device.get_available_output_clock_channels()
    device.add_clock_channel(100, channels[0])
    assert len(device.get_added_clock_channels()) == 1
    device.clear_clocks()
    assert len(device.get_added_clock_channels()) == 0


def test_auto_channel_selection(device):
    """Test automatic channel selection."""
    # Don't specify channel_name
    clock = device.add_clock_channel(clock_tick_rate_hz=100)
    assert clock.channel_name in device.get_available_output_clock_channels()


def test_duration_pulse_calculation(device):
    """Test that duration correctly calculates pulses."""
    clock = device.add_clock_channel(
        clock_tick_rate_hz=100,
        duration_s=5.0,  # 5 seconds at 100 Hz = 500 pulses
    )
    assert clock.number_of_pulses == 500


def test_unused_channels(device):
    """Test getting unused channel names."""
    channels = device.get_available_output_clock_channels()

    # Initially all unused
    unused = device.get_unused_clock_channel_names()
    assert len(unused) == len(channels)

    # Add one clock
    device.add_clock_channel(100, channels[0])
    unused = device.get_unused_clock_channel_names()
    assert len(unused) == len(channels) - 1
    assert channels[0] not in unused


def test_close_cleanup():
    """Test that close() properly cleans up."""
    # Uses its own instance (not the fixture) because close() is the
    # behavior under test and should be called exactly once here.
    device = DEVICE_CLASS()
    try:
        device.add_clock_channel(100)
    finally:
        device.close()
    # After close, handle should be None
    assert device.handle is None
Step 4: Register in CLI (Optional)
If you want your device to be available in the CLI, add it to src/cli/main.py:
from xclock.devices import ClockDaqDevice, DummyDaqDevice, LabJackT4
from xclock.devices.my_daq_device import MyDAQDevice # Add import
# Device mapping
DEVICE_MAP = {
    # Lowercase device name -> driver class
    "labjackt4": LabJackT4,
    "dummydaqdevice": DummyDaqDevice,
    "mydaqdevice": MyDAQDevice, # Add your device
}
Step 5: Update Package Exports
Add your device to src/xclock/devices/__init__.py:
from xclock.devices.daq_device import ClockDaqDevice
from xclock.devices.labjack_devices import LabJackT4
from xclock.devices.dummy_daq_device import DummyDaqDevice
from xclock.devices.my_daq_device import MyDAQDevice # Add import
# Public names exported by the xclock.devices package
__all__ = [
    "ClockDaqDevice",
    "LabJackT4",
    "DummyDaqDevice",
    "MyDAQDevice", # Add to exports
]
Step 6: Document Your Device
Add documentation in docs/source/user/devices.md:
### MyDAQ Device
Brief description of your device.
**Specifications:**
- Base clock frequency: 100 MHz
- Available output channels: 4 (CH0-CH3)
- Trigger input: TRIG0
- ...
**Example:**
\```python
from xclock.devices import MyDAQDevice
device = MyDAQDevice()
device.add_clock_channel(clock_tick_rate_hz=100, channel_name="CH0")
device.start_clocks()
device.close()
\```
Best Practices
Error Handling
Always use XClockException for device-specific errors:
from xclock.errors import XClockException
# Compare against None explicitly: the handle is an int, and a truthiness
# test would wrongly treat a valid handle of 0 as "not initialized".
if self.handle is None:
    raise XClockException("Device not initialized")
Logging
Use Python’s logging module:
import logging
# Module-level logger named after the current module
logger = logging.getLogger(__name__)
# One example per severity level
logger.info("Device initialized")
# assumes channel_name and frequency are defined by the surrounding code
logger.debug(f"Clock configured: {channel_name} at {frequency} Hz")
logger.warning("Frequency adjusted to nearest achievable value")
logger.error("Failed to start clock")
Resource Cleanup
Always implement cleanup in close() and __del__():
def close(self):
    """Clean up resources. Safe to call more than once."""
    if self.handle is None:
        return
    try:
        self.stop_clocks()
        # Close device connection
    finally:
        # Reset the handle even if stopping fails, so a later close()
        # or __del__ does not retry with a stale handle.
        self.handle = None


def __del__(self):
    """Ensure cleanup on deletion."""
    try:
        self.close()
    except Exception:
        # Best effort only: errors cannot be reported usefully during
        # finalization. Catching Exception (not a bare except) lets
        # KeyboardInterrupt and SystemExit propagate.
        pass
Type Hints
Use type hints for better IDE support and documentation:
def add_clock_channel(
    self,
    clock_tick_rate_hz: int | float,
    channel_name: str | None = None,
    number_of_pulses: int | None = None,
) -> ClockChannel:
    """Signature-only stub: every parameter and the return value carry a type hint."""
    ...
Advanced Features
Timestamp Recording
For precise timestamp recording, you may need to implement a streaming class similar to LabJackEdgeStreamer. This typically involves:
Starting continuous data acquisition
Processing incoming data in real-time
Detecting edges
Recording timestamps
Saving to CSV format
See LabJackEdgeStreamer in labjack_devices.py for a complete example.
Hardware Synchronization
If your device supports hardware-synchronized multi-channel output:
Configure all channels before starting
Use hardware trigger to start all simultaneously
Ensure all channels reference the same base clock
Frequency Calculation
Different devices have different clock generation methods:
Divisor-based: Frequency = BaseClk / Divisor
PLL-based: More complex calculations
DDS-based: Direct digital synthesis
Implement _calculate_actual_frequency() based on your hardware.
Testing Checklist
Before submitting your device implementation:
All abstract methods implemented
Unit tests written and passing
Device can be initialized
Clocks can be added, started, and stopped
Multiple clocks work simultaneously
Trigger functionality works (if applicable)
Resources cleaned up properly
Documentation added
CLI integration (optional)
Example code provided
Common Pitfalls
1. Not Handling Resource Cleanup
Always close device connections in close() and __del__().
2. Ignoring Frequency Limitations
Not all frequencies are achievable. Calculate and return the actual frequency.
3. Thread Safety
If using threading (e.g., for timestamp recording), ensure thread-safe operations.
4. Hardware State
Track hardware state (started/stopped) to avoid conflicts.
5. Channel Validation
Always validate channel names against available channels.
Example: Complete Minimal Device
Here’s a minimal but complete device implementation:
from pathlib import Path
from xclock.devices.daq_device import ClockDaqDevice, ClockChannel, EdgeType
from xclock.errors import XClockException
class MinimalDevice(ClockDaqDevice):
    """Minimal device implementation for reference.

    Keeps all state in memory and talks to no hardware: clocks are only
    tracked as ClockChannel objects whose ``clock_enabled`` flag is toggled.
    """

    def __init__(self):
        self.handle = 1  # Dummy handle
        self.base_clock_frequency_hz = 1_000_000
        self._clocks = []

    @staticmethod
    def get_available_output_clock_channels() -> tuple[str, ...]:
        """Return the output channel names."""
        return ("CH0", "CH1")

    @staticmethod
    def get_available_input_start_trigger_channels() -> tuple[str, ...]:
        """Return the trigger input channel names."""
        return ("TRIG0",)

    def get_added_clock_channels(self) -> list[ClockChannel]:
        """Return a copy of the configured clock channels."""
        return self._clocks.copy()

    def get_unused_clock_channel_names(self) -> list[str]:
        """Return the channel names not yet in use, in declared order."""
        used = {c.channel_name for c in self._clocks}
        return [ch for ch in self.get_available_output_clock_channels() if ch not in used]

    def add_clock_channel(self, clock_tick_rate_hz, channel_name=None,
                          number_of_pulses=None, duration_s=None, enable_clock_now=False):
        """Register a clock channel and return it.

        Raises:
            XClockException: If no channel is free, or channel_name is
                invalid or already in use.
        """
        if channel_name is None:
            unused = self.get_unused_clock_channel_names()
            if not unused:
                # Raise the package's own error instead of an IndexError.
                raise XClockException("No available channels")
            channel_name = unused[0]
        if channel_name not in self.get_available_output_clock_channels():
            raise XClockException(f"Invalid channel: {channel_name}")
        if any(c.channel_name == channel_name for c in self._clocks):
            raise XClockException(f"Channel {channel_name} already in use")
        if duration_s is not None and number_of_pulses is None:
            # Round away float noise before truncating (0.29 * 100 evaluates
            # to 28.999999999999996 and would otherwise lose a pulse).
            number_of_pulses = int(round(duration_s * clock_tick_rate_hz, 9))
        channel = ClockChannel(
            channel_name=channel_name,
            clock_id=len(self._clocks) + 1,
            clock_enabled=enable_clock_now,
            actual_sample_rate_hz=int(clock_tick_rate_hz),
            number_of_pulses=number_of_pulses,
        )
        self._clocks.append(channel)
        return channel

    def start_clocks(self, wait_for_pulsed_clocks_to_finish=False):
        """Mark every configured clock as enabled."""
        for c in self._clocks:
            c.clock_enabled = True

    def stop_clocks(self):
        """Mark every configured clock as disabled."""
        for c in self._clocks:
            c.clock_enabled = False

    def clear_clocks(self):
        """Remove all configured clocks."""
        self._clocks.clear()

    def wait_for_trigger_edge(self, channel_name, timeout_s=5.0, edge_type=EdgeType.RISING):
        """Report a trigger immediately; there is no hardware to wait on."""
        return True

    def start_clocks_and_record_edge_timestamps(self, wait_for_pulsed_clocks_to_finish=True,
                                                extra_channels=[], filename=None):
        """Start the clocks; this minimal device records no timestamps."""
        # extra_channels is a shared default list: never mutate it.
        self.start_clocks(wait_for_pulsed_clocks_to_finish)

    def close(self):
        """Stop all clocks and drop the dummy handle."""
        self.stop_clocks()
        self.handle = None
Getting Help
If you need help adding a new device:
Check the LabJackT4 implementation as a reference
Review the DummyDaqDevice for a simpler example
Open an issue on GitHub with the device details
Join the discussion in the developer community
See Also
../api/devices - API reference
Example: src/xclock/devices/labjack_devices.py
Example: src/xclock/devices/dummy_daq_device.py