Adding New Device Support

This guide explains how to add support for a new DAQ device to XClock. Following this guide, you can extend XClock to work with any data acquisition hardware that can generate clock signals.

Overview

Adding a new device involves:

  1. Creating a new device class that implements the ClockDaqDevice interface

  2. Implementing all required abstract methods

  3. Writing tests for your device

  4. Documenting device-specific features

  5. Registering the device in the CLI (optional)

Prerequisites

Before you start:

  • Familiarity with Python object-oriented programming

  • Understanding of your DAQ device’s API/SDK

  • The device’s driver/SDK installed and working

  • XClock development environment set up

Step 1: Understand the Interface

All clock devices must inherit from the ClockDaqDevice abstract base class and implement its interface.

Required Interface

from abc import ABC, abstractmethod
from xclock.devices.daq_device import ClockDaqDevice, ClockChannel, EdgeType

class ClockDaqDevice(ABC):
    """Abstract base class for all clock DAQ devices."""

    handle: int | None
    base_clock_frequency_hz: int | float

    @staticmethod
    @abstractmethod
    def get_available_input_start_trigger_channels() -> tuple[str, ...]:
        """Return tuple of channel names that can be used as trigger inputs."""
        pass

    @staticmethod
    @abstractmethod
    def get_available_output_clock_channels() -> tuple[str, ...]:
        """Return tuple of channel names that can output clock signals."""
        pass

    @abstractmethod
    def get_added_clock_channels(self) -> list[ClockChannel]:
        """Return list of currently configured clock channels."""
        pass

    @abstractmethod
    def get_unused_clock_channel_names(self) -> list[str]:
        """Return list of available (not yet used) channel names."""
        pass

    @abstractmethod
    def add_clock_channel(
        self,
        clock_tick_rate_hz: int | float,
        channel_name: str | None = None,
        number_of_pulses: int | None = None,
        duration_s: float | None = None,
        enable_clock_now: bool = False,
    ) -> ClockChannel:
        """Configure a new clock channel."""
        pass

    @abstractmethod
    def wait_for_trigger_edge(
        self,
        channel_name: str,
        timeout_s: float = 5.0,
        edge_type: EdgeType = EdgeType.RISING,
    ) -> bool:
        """Wait for trigger signal. Returns True if triggered, False if timeout."""
        pass

    @abstractmethod
    def start_clocks(
        self,
        wait_for_pulsed_clocks_to_finish: bool = False,
    ):
        """Start all configured clocks."""
        pass

    @abstractmethod
    def start_clocks_and_record_edge_timestamps(
        self,
        wait_for_pulsed_clocks_to_finish: bool = True,
        extra_channels: list[str] = [],
        filename: Path | str | None = None,
    ):
        """Start clocks and record edge timestamps to CSV file."""
        pass

    @abstractmethod
    def stop_clocks(self):
        """Stop all running clocks."""
        pass

    @abstractmethod
    def clear_clocks(self):
        """Remove all configured clocks."""
        pass

    @abstractmethod
    def close(self):
        """Clean up resources and close device connection."""
        pass

Step 2: Create Your Device Class

Create a new file in src/xclock/devices/ for your device. For example, my_daq_device.py:

from pathlib import Path
import logging
from xclock.devices.daq_device import ClockDaqDevice, ClockChannel, EdgeType
from xclock.errors import XClockException

logger = logging.getLogger(__name__)


class MyDAQDevice(ClockDaqDevice):
    """
    XClock driver for MyDAQ Device.

    This device supports:
    - Base clock frequency: 100 MHz
    - 4 output channels (CH0-CH3)
    - 1 trigger input (TRIG0)
    - Hardware-synchronized multi-channel output

    Example:
        >>> device = MyDAQDevice()
        >>> device.add_clock_channel(clock_tick_rate_hz=100, channel_name="CH0")
        >>> device.start_clocks()
        >>> device.close()
    """

    def __init__(self):
        """Initialize MyDAQ device."""
        self.handle = None
        self.base_clock_frequency_hz = 100_000_000  # 100 MHz
        self._clock_channels = []

        # Initialize your device here
        try:
            # Example: Open device connection
            # self.handle = mydaq_sdk.open_device()
            logger.info("MyDAQ device initialized successfully")
        except Exception as e:
            raise XClockException(f"Failed to initialize MyDAQ: {e}")

    @staticmethod
    def get_available_output_clock_channels() -> tuple[str, ...]:
        """Return available output channels."""
        return ("CH0", "CH1", "CH2", "CH3")

    @staticmethod
    def get_available_input_start_trigger_channels() -> tuple[str, ...]:
        """Return available trigger input channels."""
        return ("TRIG0",)

    def get_added_clock_channels(self) -> list[ClockChannel]:
        """Return list of configured clock channels."""
        return self._clock_channels.copy()

    def get_unused_clock_channel_names(self) -> list[str]:
        """Return list of unused channel names."""
        used_names = {ch.channel_name for ch in self._clock_channels}
        all_names = set(self.get_available_output_clock_channels())
        return list(all_names - used_names)

    def add_clock_channel(
        self,
        clock_tick_rate_hz: int | float,
        channel_name: str | None = None,
        number_of_pulses: int | None = None,
        duration_s: float | None = None,
        enable_clock_now: bool = False,
    ) -> ClockChannel:
        """
        Add a new clock channel.

        Args:
            clock_tick_rate_hz: Desired clock frequency in Hz
            channel_name: Output channel name (or None for auto-select)
            number_of_pulses: Number of pulses (None = continuous)
            duration_s: Duration in seconds (auto-calculates pulses)
            enable_clock_now: Start immediately if True

        Returns:
            ClockChannel object with actual configuration
        """
        # Auto-select channel if not specified
        if channel_name is None:
            unused = self.get_unused_clock_channel_names()
            if not unused:
                raise XClockException("No available channels")
            channel_name = unused[0]

        # Validate channel name
        if channel_name not in self.get_available_output_clock_channels():
            raise XClockException(f"Invalid channel: {channel_name}")

        # Check if channel already in use
        if channel_name in [ch.channel_name for ch in self._clock_channels]:
            raise XClockException(f"Channel {channel_name} already in use")

        # Calculate pulses from duration if needed
        if duration_s is not None and number_of_pulses is None:
            number_of_pulses = int(duration_s * clock_tick_rate_hz)

        # Calculate actual achievable frequency
        # This is device-specific - adjust for your hardware
        actual_frequency = self._calculate_actual_frequency(clock_tick_rate_hz)

        # Create clock channel object
        clock_id = len(self._clock_channels) + 1
        channel = ClockChannel(
            channel_name=channel_name,
            clock_id=clock_id,
            clock_enabled=enable_clock_now,
            actual_sample_rate_hz=actual_frequency,
            number_of_pulses=number_of_pulses,
        )

        self._clock_channels.append(channel)

        # Configure hardware
        self._configure_hardware_clock(channel)

        logger.info(f"Added clock: {actual_frequency} Hz on {channel_name}")
        return channel

    def _calculate_actual_frequency(self, requested_hz: float) -> int:
        """
        Calculate actual achievable frequency given device constraints.

        This is device-specific. Implement based on your hardware's
        clock generation mechanism (divisors, PLLs, etc.).
        """
        # Example: Simple divisor-based calculation
        divisor = round(self.base_clock_frequency_hz / requested_hz)
        divisor = max(1, divisor)  # Ensure at least 1
        actual_hz = self.base_clock_frequency_hz / divisor
        return int(actual_hz)

    def _configure_hardware_clock(self, channel: ClockChannel):
        """Configure hardware registers/settings for this clock."""
        # Implement device-specific configuration here
        # Example:
        # mydaq_sdk.configure_clock(
        #     self.handle,
        #     channel.channel_name,
        #     channel.actual_sample_rate_hz,
        #     channel.number_of_pulses
        # )
        pass

    def start_clocks(self, wait_for_pulsed_clocks_to_finish: bool = False):
        """Start all configured clocks."""
        if not self._clock_channels:
            raise XClockException("No clocks configured")

        logger.info(f"Starting {len(self._clock_channels)} clocks")

        # Start clocks on hardware
        # Example:
        # mydaq_sdk.start_all_clocks(self.handle)

        # Mark all as enabled
        for channel in self._clock_channels:
            channel.clock_enabled = True

        # Wait if requested
        if wait_for_pulsed_clocks_to_finish:
            self._wait_for_completion()

    def _wait_for_completion(self):
        """Wait for pulsed clocks to finish."""
        # Implement waiting logic
        # Example:
        # while mydaq_sdk.is_running(self.handle):
        #     time.sleep(0.1)
        pass

    def stop_clocks(self):
        """Stop all running clocks."""
        logger.info("Stopping all clocks")

        # Stop hardware
        # Example:
        # mydaq_sdk.stop_all_clocks(self.handle)

        # Mark all as disabled
        for channel in self._clock_channels:
            channel.clock_enabled = False

    def clear_clocks(self):
        """Remove all configured clocks."""
        self.stop_clocks()
        self._clock_channels.clear()
        logger.info("Cleared all clocks")

    def wait_for_trigger_edge(
        self,
        channel_name: str,
        timeout_s: float = 5.0,
        edge_type: EdgeType = EdgeType.RISING,
    ) -> bool:
        """
        Wait for trigger signal on specified channel.

        Returns:
            True if triggered, False if timeout
        """
        if channel_name not in self.get_available_input_start_trigger_channels():
            raise XClockException(f"Invalid trigger channel: {channel_name}")

        logger.info(f"Waiting for {edge_type.value} edge on {channel_name}")

        # Implement trigger waiting
        # Example:
        # return mydaq_sdk.wait_for_edge(
        #     self.handle,
        #     channel_name,
        #     edge_type.value,
        #     timeout_s
        # )

        return True  # Placeholder

    def start_clocks_and_record_edge_timestamps(
        self,
        wait_for_pulsed_clocks_to_finish: bool = True,
        extra_channels: list[str] = [],
        filename: Path | str | None = None,
    ):
        """Start clocks and record edge timestamps to CSV file."""
        # This is complex - see LabJackT4 implementation for reference
        # You may need to implement a separate edge streamer class

        # Generate default filename if not provided
        if filename is None:
            import time
            output_dir = Path.home() / "Documents" / "XClock"
            output_dir.mkdir(parents=True, exist_ok=True)
            timestamp_str = time.strftime("%Y-%m-%d_%H-%M-%S")
            filename = output_dir / f"mydaq_timestamps_{timestamp_str}.csv"

        # Start recording and clocks
        # This typically requires:
        # 1. Start clocks
        # 2. Record timestamps
        # 4. Save to CSV

        raise NotImplementedError("Timestamp recording not yet implemented")

    def close(self):
        """Clean up and close device."""
        if self.handle is not None:
            self.stop_clocks()

            # Close device connection
            # Example:
            # mydaq_sdk.close_device(self.handle)

            logger.info("MyDAQ device closed")
            self.handle = None

    def __del__(self):
        """Destructor to ensure cleanup."""
        try:
            self.close()
        except:
            pass

Step 3: Write Tests

Create a test file tests/xclock/test_my_daq.py:

import pytest
from xclock.devices.my_daq_device import MyDAQDevice
from xclock.devices.daq_device import EdgeType


# Set these for your device
DEVICE_NAME = "MyDAQ"
DEVICE_CLASS = MyDAQDevice


def test_device_initialization():
    """Test that device initializes successfully."""
    device = DEVICE_CLASS()
    assert device is not None
    assert device.handle is not None
    device.close()


def test_get_available_channels():
    """Test that device reports available channels."""
    output_channels = DEVICE_CLASS.get_available_output_clock_channels()
    assert len(output_channels) > 0
    assert all(isinstance(ch, str) for ch in output_channels)

    trigger_channels = DEVICE_CLASS.get_available_input_start_trigger_channels()
    assert len(trigger_channels) >= 0  # May be 0 if no trigger support


def test_add_clock_channel():
    """Test adding a clock channel."""
    device = DEVICE_CLASS()
    channels = device.get_available_output_clock_channels()

    clock = device.add_clock_channel(
        clock_tick_rate_hz=100,
        channel_name=channels[0],
        number_of_pulses=1000,
    )

    assert clock is not None
    assert clock.channel_name == channels[0]
    assert clock.actual_sample_rate_hz > 0
    assert clock.number_of_pulses == 1000

    device.close()


def test_multiple_clocks():
    """Test adding multiple synchronized clocks."""
    device = DEVICE_CLASS()
    channels = device.get_available_output_clock_channels()

    if len(channels) < 2:
        pytest.skip("Device has fewer than 2 channels")

    clock1 = device.add_clock_channel(60, channels[0], number_of_pulses=100)
    clock2 = device.add_clock_channel(100, channels[1], number_of_pulses=100)

    assert clock1.channel_name != clock2.channel_name
    assert len(device.get_added_clock_channels()) == 2

    device.close()


def test_start_stop_clocks():
    """Test starting and stopping clocks."""
    device = DEVICE_CLASS()
    channels = device.get_available_output_clock_channels()

    device.add_clock_channel(100, channels[0], number_of_pulses=10)

    # Should not raise
    device.start_clocks(wait_for_pulsed_clocks_to_finish=False)
    device.stop_clocks()

    device.close()


def test_clear_clocks():
    """Test clearing all clocks."""
    device = DEVICE_CLASS()
    channels = device.get_available_output_clock_channels()

    device.add_clock_channel(100, channels[0])
    assert len(device.get_added_clock_channels()) == 1

    device.clear_clocks()
    assert len(device.get_added_clock_channels()) == 0

    device.close()


def test_auto_channel_selection():
    """Test automatic channel selection."""
    device = DEVICE_CLASS()

    # Don't specify channel_name
    clock = device.add_clock_channel(clock_tick_rate_hz=100)

    assert clock.channel_name in device.get_available_output_clock_channels()

    device.close()


def test_duration_pulse_calculation():
    """Test that duration correctly calculates pulses."""
    device = DEVICE_CLASS()

    clock = device.add_clock_channel(
        clock_tick_rate_hz=100,
        duration_s=5.0,  # 5 seconds at 100 Hz = 500 pulses
    )

    assert clock.number_of_pulses == 500

    device.close()


def test_unused_channels():
    """Test getting unused channel names."""
    device = DEVICE_CLASS()
    channels = device.get_available_output_clock_channels()

    # Initially all unused
    unused = device.get_unused_clock_channel_names()
    assert len(unused) == len(channels)

    # Add one clock
    device.add_clock_channel(100, channels[0])
    unused = device.get_unused_clock_channel_names()
    assert len(unused) == len(channels) - 1
    assert channels[0] not in unused

    device.close()


def test_close_cleanup():
    """Test that close() properly cleans up."""
    device = DEVICE_CLASS()
    device.add_clock_channel(100)
    device.close()

    # After close, handle should be None
    assert device.handle is None

Step 4: Register in CLI (Optional)

If you want your device to be available in the CLI, add it to src/cli/main.py:

from xclock.devices import ClockDaqDevice, DummyDaqDevice, LabJackT4
from xclock.devices.my_daq_device import MyDAQDevice  # Add import

# Device mapping
DEVICE_MAP = {
    "labjackt4": LabJackT4,
    "dummydaqdevice": DummyDaqDevice,
    "mydaqdevice": MyDAQDevice,  # Add your device
}

Step 5: Update Package Exports

Add your device to src/xclock/devices/__init__.py:

from xclock.devices.daq_device import ClockDaqDevice
from xclock.devices.labjack_devices import LabJackT4
from xclock.devices.dummy_daq_device import DummyDaqDevice
from xclock.devices.my_daq_device import MyDAQDevice  # Add import

__all__ = [
    "ClockDaqDevice",
    "LabJackT4",
    "DummyDaqDevice",
    "MyDAQDevice",  # Add to exports
]

Step 6: Document Your Device

Add documentation in docs/source/user/devices.md:

### MyDAQ Device

Brief description of your device.

**Specifications:**

- Base clock frequency: 100 MHz
- Available output channels: 4 (CH0-CH3)
- Trigger input: TRIG0
- ...

**Example:**

\```python
from xclock.devices import MyDAQDevice

device = MyDAQDevice()
device.add_clock_channel(clock_tick_rate_hz=100, channel_name="CH0")
device.start_clocks()
device.close()
\```

Best Practices

Error Handling

Always use XClockException for device-specific errors:

from xclock.errors import XClockException

if not self.handle:
    raise XClockException("Device not initialized")

Logging

Use Python’s logging module:

import logging
logger = logging.getLogger(__name__)

logger.info("Device initialized")
logger.debug(f"Clock configured: {channel_name} at {frequency} Hz")
logger.warning("Frequency adjusted to nearest achievable value")
logger.error("Failed to start clock")

Resource Cleanup

Always implement cleanup in close() and __del__():

def close(self):
    """Clean up resources."""
    if self.handle is not None:
        self.stop_clocks()
        # Close device connection
        self.handle = None

def __del__(self):
    """Ensure cleanup on deletion."""
    try:
        self.close()
    except:
        pass

Type Hints

Use type hints for better IDE support and documentation:

def add_clock_channel(
    self,
    clock_tick_rate_hz: int | float,
    channel_name: str | None = None,
    number_of_pulses: int | None = None,
) -> ClockChannel:
    ...

Advanced Features

Timestamp Recording

For precise timestamp recording, you may need to implement a streaming class similar to LabJackEdgeStreamer. This typically involves:

  1. Starting continuous data acquisition

  2. Processing incoming data in real-time

  3. Detecting edges

  4. Recording timestamps

  5. Saving to CSV format

See LabJackEdgeStreamer in labjack_devices.py for a complete example.

Hardware Synchronization

If your device supports hardware-synchronized multi-channel output:

  1. Configure all channels before starting

  2. Use hardware trigger to start all simultaneously

  3. Ensure all channels reference the same base clock

Frequency Calculation

Different devices have different clock generation methods:

  • Divisor-based: Frequency = BaseClk / Divisor

  • PLL-based: More complex calculations

  • DDS-based: Direct digital synthesis

Implement _calculate_actual_frequency() based on your hardware.

Testing Checklist

Before submitting your device implementation:

  • All abstract methods implemented

  • Unit tests written and passing

  • Device can be initialized

  • Clocks can be added, started, and stopped

  • Multiple clocks work simultaneously

  • Trigger functionality works (if applicable)

  • Resources cleaned up properly

  • Documentation added

  • CLI integration (optional)

  • Example code provided

Common Pitfalls

1. Not Handling Resource Cleanup

Always close device connections in close() and __del__().

2. Ignoring Frequency Limitations

Not all frequencies are achievable. Calculate and return the actual frequency.

3. Thread Safety

If using threading (e.g., for timestamp recording), ensure thread-safe operations.

4. Hardware State

Track hardware state (started/stopped) to avoid conflicts.

5. Channel Validation

Always validate channel names against available channels.

Example: Complete Minimal Device

Here’s a minimal but complete device implementation:

from pathlib import Path
from xclock.devices.daq_device import ClockDaqDevice, ClockChannel, EdgeType
from xclock.errors import XClockException


class MinimalDevice(ClockDaqDevice):
    """Minimal device implementation for reference."""

    def __init__(self):
        self.handle = 1  # Dummy handle
        self.base_clock_frequency_hz = 1_000_000
        self._clocks = []

    @staticmethod
    def get_available_output_clock_channels() -> tuple[str, ...]:
        return ("CH0", "CH1")

    @staticmethod
    def get_available_input_start_trigger_channels() -> tuple[str, ...]:
        return ("TRIG0",)

    def get_added_clock_channels(self) -> list[ClockChannel]:
        return self._clocks.copy()

    def get_unused_clock_channel_names(self) -> list[str]:
        used = {c.channel_name for c in self._clocks}
        return [ch for ch in self.get_available_output_clock_channels() if ch not in used]

    def add_clock_channel(self, clock_tick_rate_hz, channel_name=None,
                         number_of_pulses=None, duration_s=None, enable_clock_now=False):
        if channel_name is None:
            channel_name = self.get_unused_clock_channel_names()[0]

        if duration_s and not number_of_pulses:
            number_of_pulses = int(duration_s * clock_tick_rate_hz)

        channel = ClockChannel(
            channel_name=channel_name,
            clock_id=len(self._clocks) + 1,
            clock_enabled=enable_clock_now,
            actual_sample_rate_hz=int(clock_tick_rate_hz),
            number_of_pulses=number_of_pulses,
        )
        self._clocks.append(channel)
        return channel

    def start_clocks(self, wait_for_pulsed_clocks_to_finish=False):
        for c in self._clocks:
            c.clock_enabled = True

    def stop_clocks(self):
        for c in self._clocks:
            c.clock_enabled = False

    def clear_clocks(self):
        self._clocks.clear()

    def wait_for_trigger_edge(self, channel_name, timeout_s=5.0, edge_type=EdgeType.RISING):
        return True

    def start_clocks_and_record_edge_timestamps(self, wait_for_pulsed_clocks_to_finish=True,
                                                extra_channels=[], filename=None):
        self.start_clocks(wait_for_pulsed_clocks_to_finish)

    def close(self):
        self.stop_clocks()
        self.handle = None

Getting Help

If you need help adding a new device:

  1. Check the LabJackT4 implementation as a reference

  2. Review the DummyDaqDevice for a simpler example

  3. Open an issue on GitHub with the device details

  4. Join the discussion in the developer community

See Also

  • ../api/devices - API reference

  • Example: src/xclock/devices/labjack_devices.py

  • Example: src/xclock/devices/dummy_daq_device.py