Edge Detection API Reference
This page documents the edge detection and timestamp recording functionality in XClock.
Overview
The edge detection module provides real-time monitoring and timestamp recording of digital signal edges. This is essential for synchronizing data acquisition across multiple devices by providing precise timing information.
from xclock.edge_detection import detect_edges, process_timestamps
Key Concepts
Timestamp Recording
When recording timestamps, XClock:
Continuously streams data from input channels
Detects rising and falling edges in real-time
Calculates precise nanosecond timestamps
Handles hardware timer rollover
Writes timestamps to CSV files
Edge Types
Edges are encoded as signed integers:
Positive integers (1, 2, 3, …): Rising edges on clock 1, 2, 3, …
Negative integers (-1, -2, -3, …): Falling edges on clock 1, 2, 3, …
Timestamp Format
Timestamps are 64-bit integers representing nanoseconds since device start:
Timestamp (ns) = (Counter Value × 10^9) / Base Clock Frequency
Unix Timestamp (ns) = Host system time in nanoseconds since Unix epoch
CSV Output Format
Timestamp files are saved as CSV with three columns:
Column |
Type |
Description |
|---|---|---|
0 |
int64 |
Timestamp in nanoseconds (relative to device start) |
1 |
int8 |
Edge type (positive=rising, negative=falling) |
2 |
int64 |
Unix timestamp in nanoseconds (host system time) |
Example CSV content:
1000000,1,1638360000000000000
1500000,-1,1638360000500000000
2000000,1,1638360001000000000
2500000,-1,1638360001500000000
2000000,2,1638360001000000000
2500000,-2,1638360001500000000
Interpretation:
Line 1: Rising edge on clock 1 at 1.0 ms (device time), Unix timestamp in column 3
Line 2: Falling edge on clock 1 at 1.5 ms (device time)
Line 3: Rising edge on clock 1 at 2.0 ms (device time)
Line 4: Falling edge on clock 1 at 2.5 ms (device time)
Line 5: Rising edge on clock 2 at 2.0 ms (device time, synchronized with clock 1)
Line 6: Falling edge on clock 2 at 2.5 ms (device time)
The Unix timestamp (column 2) provides the host system time when the edge was detected, useful for synchronizing with other systems or absolute time references.
Device-Specific Implementation
LabJack Edge Streaming
The LabJack T4 uses the LabJackEdgeStreamer class for continuous edge detection.
LabJackEdgeStreamer
class LabJackEdgeStreamer:
"""
Real-time edge detection and timestamp recording for LabJack devices.
This class manages continuous streaming from LabJack input channels,
detects edges, and records timestamps to a CSV file.
Attributes:
handle: LabJack device handle
channels: List of channel names to monitor
filename: Output CSV file path
base_clock_hz: Device base clock frequency
"""
Internal Operation:
Configures LabJack streaming mode
Reads data buffers continuously
Processes timer values with rollover detection
Detects state changes (edges)
Calculates nanosecond timestamps
Writes to CSV file
Threading:
Runs in background thread
Thread-safe operation
Graceful shutdown on stop
Usage Examples
Basic Timestamp Recording
from xclock.devices import LabJackT4
from pathlib import Path
device = LabJackT4()
# Add clocks
device.add_clock_channel(100, "FIO0", duration_s=10)
device.add_clock_channel(60, "FIO1", duration_s=10)
# Record timestamps
output = Path("timestamps.csv")
device.start_clocks_and_record_edge_timestamps(
wait_for_pulsed_clocks_to_finish=True,
filename=output,
)
device.close()
Monitoring Additional Channels
from xclock.devices import LabJackT4
device = LabJackT4()
# Add clock channels
device.add_clock_channel(100, "FIO0", duration_s=5)
# Also monitor external signals
device.start_clocks_and_record_edge_timestamps(
wait_for_pulsed_clocks_to_finish=True,
extra_channels=["EIO4", "EIO5"], # Monitor these too
filename="all_edges.csv",
)
device.close()
Analyzing Recorded Timestamps
import numpy as np
from pathlib import Path
# Load timestamp data
data = np.loadtxt("timestamps.csv", dtype=np.int64, delimiter=",")
timestamps_ns = data[:, 0] # Device-relative timestamps
edge_types = data[:, 1] # Edge type
unix_timestamps_ns = data[:, 2] # Unix timestamps (host time)
# Extract clock 1 rising edges
clock1_rising_times = timestamps_ns[edge_types == 1]
# Calculate inter-pulse intervals
intervals_ns = np.diff(clock1_rising_times)
intervals_us = intervals_ns / 1000 # Convert to microseconds
print(f"Mean interval: {np.mean(intervals_us):.2f} μs")
print(f"Std deviation: {np.std(intervals_us):.2f} μs")
# Expected interval for 100 Hz clock
expected_us = 1_000_000 / 100 # 10,000 μs
print(f"Expected: {expected_us:.2f} μs")
# Check timing accuracy
tolerance_us = 1.0 # 1 microsecond tolerance
accurate = np.all(np.abs(intervals_us - expected_us) < tolerance_us)
print(f"All intervals within tolerance: {accurate}")
Separating Clocks
import numpy as np
# Load data
data = np.loadtxt("timestamps.csv", dtype=np.int64, delimiter=",")
timestamps_ns = data[:, 0]
edge_types = data[:, 1]
# Separate by clock
clock1_mask = np.abs(edge_types) == 1
clock2_mask = np.abs(edge_types) == 2
clock1_times = timestamps_ns[clock1_mask]
clock1_edges = edge_types[clock1_mask]
clock2_times = timestamps_ns[clock2_mask]
clock2_edges = edge_types[clock2_mask]
print(f"Clock 1: {len(clock1_times)} edges")
print(f"Clock 2: {len(clock2_times)} edges")
# Find simultaneous events (within 1 μs)
tolerance_ns = 1000
for t1 in clock1_times:
nearby = clock2_times[np.abs(clock2_times - t1) < tolerance_ns]
if len(nearby) > 0:
print(f"Simultaneous event at {t1} ns")
Detecting Missed Pulses
import numpy as np
# Load data
data = np.loadtxt("timestamps.csv", dtype=np.int64, delimiter=",")
timestamps_ns = data[:, 0]
edge_types = data[:, 1]
# Extract rising edges for 100 Hz clock
rising_edges = timestamps_ns[edge_types == 1]
intervals_ns = np.diff(rising_edges)
# Expected interval (in ns)
expected_ns = 1_000_000_000 / 100 # 10,000,000 ns
# Find intervals significantly larger than expected (missed pulses)
tolerance = 0.1 # 10% tolerance
missed = intervals_ns > expected_ns * (1 + tolerance)
if np.any(missed):
print(f"Warning: {np.sum(missed)} potential missed pulses detected")
missed_indices = np.where(missed)[0]
for idx in missed_indices:
print(f" Gap between pulse {idx} and {idx+1}: {intervals_ns[idx]/1e6:.2f} ms")
else:
print("No missed pulses detected")
Rollover Handling
Hardware timers are typically 32-bit, causing rollover every ~53 seconds (for 80 MHz clock):
Rollover Period = 2^32 / 80,000,000 Hz ≈ 53.687 seconds
XClock automatically detects and handles rollover:
def handle_rollover(current_value, previous_value, max_value=2**32):
"""
Detect and compensate for timer rollover.
Args:
current_value: Current timer reading
previous_value: Previous timer reading
max_value: Maximum timer value before rollover
Returns:
Actual time difference accounting for rollover
"""
if current_value < previous_value:
# Rollover detected
return (max_value - previous_value) + current_value
else:
return current_value - previous_value
Performance Considerations
Streaming Rate
Maximum sustainable edge detection rate depends on:
USB bandwidth: ~1 MB/s for LabJack T4
CPU processing: Modern CPUs easily handle required rates
Disk I/O: CSV writing is buffered
Typical limits:
Up to ~100,000 edges per second
Limited by USB communication overhead
File size grows ~40 bytes per edge
Memory Usage
Memory usage is modest:
Streaming buffer: ~1-10 MB
Processing overhead: <100 MB
Output buffering: Configurable
File Size Estimation
File Size ≈ (Total Edges) × 60 bytes
Example:
- 2 clocks at 100 Hz for 60 seconds
- 2 × 100 Hz × 60 s × 2 edges/cycle = 24,000 edges
- File size ≈ 24,000 × 60 = 1.44 MB
Note: Each row contains three 64-bit integers plus delimiters and newline.
Error Handling
Common Issues
Dropped samples:
Occurs if streaming can’t keep up
XClock logs warnings
Check USB connection quality
Disk full:
Writing stops gracefully
Partial data saved
Error message logged
Device disconnection:
Streaming stops
Exception raised
Partial data may be recovered
Advanced Usage
Custom Processing Pipeline
import numpy as np
from pathlib import Path
def process_edge_stream(filename, callback):
"""
Process timestamp file with custom callback.
Args:
filename: Path to timestamp CSV
callback: Function called for each edge (timestamp_ns, edge_type)
"""
data = np.loadtxt(filename, dtype=np.int64, delimiter=",")
for timestamp_ns, edge_type in data:
callback(timestamp_ns, edge_type)
# Example: Real-time frequency measurement
class FrequencyMonitor:
def __init__(self, clock_id):
self.clock_id = clock_id
self.last_time = None
self.frequencies = []
def process_edge(self, timestamp_ns, edge_type):
# Only process rising edges of our clock
if edge_type != self.clock_id:
return
if self.last_time is not None:
interval_s = (timestamp_ns - self.last_time) / 1e9
freq_hz = 1.0 / interval_s
self.frequencies.append(freq_hz)
self.last_time = timestamp_ns
def get_stats(self):
if not self.frequencies:
return None
return {
'mean': np.mean(self.frequencies),
'std': np.std(self.frequencies),
'min': np.min(self.frequencies),
'max': np.max(self.frequencies),
}
# Use it
monitor = FrequencyMonitor(clock_id=1)
process_edge_stream("timestamps.csv", monitor.process_edge)
stats = monitor.get_stats()
print(f"Clock 1 frequency: {stats['mean']:.2f} ± {stats['std']:.2f} Hz")
See Also
Devices API Reference - Device API reference
Errors API Reference - Error handling
Quick Start Guide - Usage examples