Overview

The Operator Protocol is ClyptQ's core interface. It standardizes every operation behind a single compute() signature, so strategies can be composed from interchangeable, verifiable parts.

Core Interface

class Operator(ABC):
    """Standard interface for all operations"""

    def compute(
        self,
        data: Dict[str, TaggedArray],
        timestamp: datetime,
        context: ComputeContext
    ) -> TaggedArray:
        """
        Execute operation

        Args:
            data: Input data {input_id: TaggedArray}
            timestamp: Current timestamp
            context: Execution context (warmup status, strategy state, etc.)

        Returns:
            TaggedArray[N] where N = number of symbols
        """
Core Principles:
  • Pure Functions: Same input → same output; compute() has no side effects outside the operator
  • Type Safety: TaggedArray for all inputs/outputs
  • State Management: Any state (lookback, warmup, etc.) is stored on self, never in globals

TaggedArray System

All data is represented with 4 fields:
@dataclass
class TaggedArray:
    value: np.ndarray[float]    # Actual values
    exists: np.ndarray[bool]    # Data existence
    valid: np.ndarray[bool]     # Value validity (not NaN)
    updated: np.ndarray[bool]   # Updated in current tick
Usage Example:
import numpy as np

# OHLCV data (NumPy arrays, so boolean masks can index the values)
close = TaggedArray(
    value=np.array([42000.0, 43000.0, np.nan]),
    exists=np.array([True, True, True]),
    valid=np.array([True, True, False]),      # 3rd value is NaN
    updated=np.array([True, True, False])     # 3rd received no update this tick
)

# Filter valid values only
valid_prices = close.value[close.valid]  # array([42000., 43000.])
Benefits:
  • Explicit Missing Data Handling: Distinguish NaN vs missing data
  • FFill Tracking: updated=False indicates FFilled value
  • Consistent Error Handling: All Operators handle errors the same way
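
The three flags can be combined into masks that separate fresh, forward-filled, and unusable entries. The following is a minimal sketch, not ClyptQ's implementation: the `TaggedArray` stand-in only mirrors the four fields listed above, and the helper names `usable_mask` and `stale_mask` are hypothetical.

```python
from dataclasses import dataclass

import numpy as np


@dataclass
class TaggedArray:
    """Stand-in with the four fields described above (not the library class)."""
    value: np.ndarray
    exists: np.ndarray
    valid: np.ndarray
    updated: np.ndarray


def usable_mask(arr: TaggedArray) -> np.ndarray:
    """True where a symbol has data and the value is not NaN."""
    return arr.exists & arr.valid


def stale_mask(arr: TaggedArray) -> np.ndarray:
    """True where a usable value was carried forward instead of updated."""
    return usable_mask(arr) & ~arr.updated


# Four symbols: fresh, forward-filled, NaN, and missing entirely.
close = TaggedArray(
    value=np.array([42000.0, 43000.0, np.nan, 0.0]),
    exists=np.array([True, True, True, False]),
    valid=np.array([True, True, False, False]),
    updated=np.array([True, False, False, False]),
)

# Keep only values that are usable and were updated on this tick.
fresh = close.value[usable_mask(close) & close.updated]
```

Keeping the masks as separate boolean arrays means each operator can decide for itself whether a forward-filled value is acceptable input.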

Operator Roles

Operators are classified into 9 roles:
1. ALPHA: Generate predictive signals
class MomentumAlpha(Operator):
    role = OperatorRole.ALPHA

    def compute(self, data, timestamp, context):
        close = data["close"]
        # Assumes close.value has shape [lookback, N]: return from the oldest to the latest tick
        returns = (close.value[-1] / close.value[0]) - 1.0
        return TaggedArray(value=returns, ...)
2. FACTOR: Systematic exposure (beta, sector, etc.)
class MarketBetaFactor(Operator):
    role = OperatorRole.FACTOR
3. FILTER: Universe selection (boolean output)
class LiquidityFilter(Operator):
    role = OperatorRole.FILTER

    def compute(self, data, timestamp, context):
        volume = data["volume"]
        meets_threshold = volume.value > self.min_volume
        return TaggedArray(value=meets_threshold.astype(float), ...)
4. INDICATOR: Technical indicators
class RSI(Operator):
    role = OperatorRole.INDICATOR
5. METRIC: Performance measurement
class SharpeRatio(Operator):
    role = OperatorRole.METRIC
6. TRANSFORM: Data transformation
class Returns(Operator):
    role = OperatorRole.TRANSFORM
7. OPTIMIZER: Portfolio optimization
class MeanVarianceOptimizer(Operator):
    role = OperatorRole.OPTIMIZER
8. SCORE: Ranking
class ZScore(Operator):
    role = OperatorRole.SCORE
9. SEMANTIC: LLM integration
class LLMScorer(Operator):
    role = OperatorRole.SEMANTIC

ComputeGraph

Operators are connected in a DAG to compose strategies:
from clyptq.system.graph import ComputeGraph, Input

graph = ComputeGraph()

# Add nodes
close = graph.add_node("close", FieldExtractor("close"))

sma_fast = graph.add_node("sma_fast",
    SMA(Input("close", lookback=10), span=10)
)

sma_slow = graph.add_node("sma_slow",
    SMA(Input("close", lookback=20), span=20)
)

signal = graph.add_node("signal",
    CrossoverAlpha(
        Input("sma_fast", lookback=1),
        Input("sma_slow", lookback=1)
    )
)

# Execute (automatic dependency resolution)
results = graph.compute(timestamp, data_provider, context)
alpha = results["signal"]
Graph Features:
  • Automatic Dependency Resolution: Topological sort determines execution order
  • Parallel Execution: Independent nodes execute in parallel
  • Lookback Management: Automatically provides historical data

Input Specification

Operators declare dependencies using Input objects:
Input(
    node_id="close",      # Dependent node
    lookback=20,          # Required historical data count
    field=None            # Specific field (for multi-output)
)
Usage Examples:
# Simple input (current value only)
current_close = Input("close", lookback=1)

# Historical data required (20 ticks)
historical_close = Input("close", lookback=20)

# Specific field from multi-output
portfolio_weights = Input("optimizer", lookback=1, field="weights")
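
One way to picture what a lookback of 20 means is a fixed-size window per node that always holds the most recent ticks. The sketch below is an illustration only: `LookbackBuffer` is a hypothetical class, and nothing here reflects how ClyptQ actually stores history.

```python
from collections import deque

import numpy as np


class LookbackBuffer:
    """Keeps the most recent `lookback` rows, one row per tick (hypothetical)."""

    def __init__(self, lookback: int):
        if lookback < 1:
            raise ValueError("lookback must be at least 1")
        self._rows = deque(maxlen=lookback)  # old rows are dropped automatically

    def push(self, row) -> None:
        """Append the values of one tick (one entry per symbol)."""
        self._rows.append(np.asarray(row, dtype=float))

    @property
    def is_full(self) -> bool:
        """True once `lookback` ticks have been observed."""
        return len(self._rows) == self._rows.maxlen

    def window(self) -> np.ndarray:
        """Return the stored ticks as an array of shape [ticks, symbols]."""
        return np.stack(list(self._rows))


# Feed five ticks for two symbols into a window of three.
history = LookbackBuffer(lookback=3)
for tick in range(5):
    history.push([float(tick), float(tick) * 10.0])
```

Because memory grows linearly with the window size, a buffer like this makes the cost of an oversized lookback easy to see.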

Warmup System

Operators automatically handle warmup periods:
class SMA(Operator):
    def __init__(self, input_close: Input, span: int):
        self.input_close = input_close
        self.span = span
        self._buffer = []  # Rolling window of the last `span` ticks

    def compute(self, data, timestamp, context):
        close = data["close"]

        # Record the current tick and keep only the last `span` ticks
        self._buffer.append(close.value)
        self._buffer = self._buffer[-self.span:]

        # During warmup: fewer than `span` ticks seen so far
        if len(self._buffer) < self.span:
            return self._get_warmup_output(close)

        # Normal computation
        sma = np.mean(self._buffer, axis=0)
        return TaggedArray(value=sma, exists=close.exists,
                          valid=~np.isnan(sma), updated=close.updated)
Warmup Handling:
  • context.is_warmup: Whether in warmup period
  • _get_warmup_output(): Default output during warmup (usually NaN)
  • Semantic Operators skip API calls during warmup
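
The warmup behaviour described above can be sketched end to end with stand-in types. Everything below is an assumption made for illustration: `warmup_output` is a guess at what a NaN default might look like, and `RollingMean` is a hypothetical operator that does not inherit from the real Operator class.

```python
from dataclasses import dataclass

import numpy as np


@dataclass
class TaggedArray:
    """Stand-in with the four fields described earlier (not the library class)."""
    value: np.ndarray
    exists: np.ndarray
    valid: np.ndarray
    updated: np.ndarray


def warmup_output(like: TaggedArray) -> TaggedArray:
    """All-NaN output flagged invalid, shaped like the input (assumed default)."""
    n = like.value.shape[0]
    return TaggedArray(
        value=np.full(n, np.nan),
        exists=like.exists.copy(),
        valid=np.zeros(n, dtype=bool),
        updated=np.zeros(n, dtype=bool),
    )


class RollingMean:
    """Hypothetical operator: mean of the last `span` ticks per symbol."""

    def __init__(self, span: int):
        self.span = span
        self._buffer = []

    def compute(self, close: TaggedArray) -> TaggedArray:
        # Record the tick first so the span-th tick already yields a result.
        self._buffer.append(close.value)
        self._buffer = self._buffer[-self.span:]
        if len(self._buffer) < self.span:
            return warmup_output(close)
        mean = np.mean(self._buffer, axis=0)
        return TaggedArray(
            value=mean,
            exists=close.exists,
            valid=~np.isnan(mean),  # a NaN anywhere in the window invalidates it
            updated=close.updated,
        )
```

With `span=3`, the first two calls return the NaN default and the third call returns the first real average.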

Best Practices

1. Maintain Pure Functions
# Good: no globals or external side effects; all state lives on self
class MyOperator(Operator):
    def __init__(self):
        self._state = []

    def compute(self, data, timestamp, context):
        # Pure computation
        result = self._process(data)
        self._state.append(result)  # Store state in self
        return result
2. TaggedArray Validation
def compute(self, data, timestamp, context):
    input_data = data["input"]

    # Process valid values only
    if not input_data.valid.any():
        return self._get_warmup_output(input_data)

    valid_values = input_data.value[input_data.valid]
    # Perform computation
3. Error Handling
import logging

logger = logging.getLogger(__name__)

def compute(self, data, timestamp, context):
    try:
        return self._compute_impl(data)
    except Exception:
        # Log with traceback, then return a safe default value
        logger.exception("Operator failed")
        return self._get_warmup_output(data["input"])
4. Efficient Lookback
# Good: Request only what's needed
Input("close", lookback=20)  # SMA(20) needs 20 ticks

# Bad: Excessive lookback
Input("close", lookback=1000)  # Wastes memory

Performance Considerations

Operator Execution Cost:
  • INDICATOR/TRANSFORM: O(N) where N = number of symbols
  • ALPHA/FACTOR: O(N × L) where L = lookback
  • OPTIMIZER: O(N²) or O(N³) (depends on optimization algorithm)
  • SEMANTIC: API latency (100-1000ms)
Optimization Tips:
  1. Minimize lookback
  2. Cache expensive computations (store in self)
  3. Skip Semantic Operators during warmup
  4. Use vectorized operations (NumPy)
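
Tip 4 can be made concrete with a cross-sectional z-score computed two ways. This is a sketch for comparison only: both functions are hypothetical, take a plain value array plus a validity mask rather than a TaggedArray, and use the population standard deviation.

```python
import numpy as np


def zscore_loop(values, valid):
    """Reference version: pure-Python loops over symbols."""
    picked = [v for v, ok in zip(values, valid) if ok]
    out = [float("nan")] * len(values)
    if len(picked) < 2:
        return np.array(out)
    mean = sum(picked) / len(picked)
    var = sum((v - mean) ** 2 for v in picked) / len(picked)
    if var == 0:
        return np.array(out)
    std = var ** 0.5
    for i, (v, ok) in enumerate(zip(values, valid)):
        if ok:
            out[i] = (v - mean) / std
    return np.array(out)


def zscore_vectorized(values, valid):
    """Same result using NumPy masks, with no Python-level loop."""
    out = np.full(values.shape, np.nan)
    if valid.sum() < 2:
        return out
    selected = values[valid]
    std = selected.std()  # population standard deviation (ddof=0)
    if std == 0:
        return out
    out[valid] = (selected - selected.mean()) / std
    return out


values = np.array([1.0, 2.0, 3.0, np.nan])
valid = np.array([True, True, True, False])
scores = zscore_vectorized(values, valid)
```

The vectorized version does the same arithmetic inside NumPy rather than in Python-level loops, which typically matters more as the number of symbols grows.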