Overview

The ClyptQ Data System integrates data across multiple asset classes and provides consistent data access from development to production through environment-based automatic routing.

Data Sources

Supported Asset Classes:
  • Crypto: Binance, Coinbase, Gate.io (spot & futures)
  • Stocks: Yahoo Finance, Tiingo
  • Macro: FRED (economic indicators)
  • On-Chain: CryptoQuant (on-chain data)
Data Types:
  • OHLCV: Price candles (1m, 5m, 1h, 1d, etc.)
  • Orderbook: L2 order book snapshots
  • Tick: Trade execution data
  • Fundamentals: Financial data
  • Alternative: News, social media

Observation Specs

Data requirements are declared with observation specs:
from clyptq.apps.trading.spec.observation import OHLCVSpec, OrderBookSpec

# OHLCV data
ohlcv_spec = OHLCVSpec(
    identifier="ohlcv_1m",
    venue="binance",
    market_type="futures",
    timeframe="1m",
    fields=["open", "high", "low", "close", "volume"]
)

# Orderbook data
orderbook_spec = OrderBookSpec(
    identifier="orderbook",
    venue="binance",
    market_type="futures",
    depth=20  # Top 20 levels
)

# Include in TradingSpec
spec = TradingSpec(
    observations=[ohlcv_spec, orderbook_spec],
    # ... other TradingSpec fields ...
)

Storage Architecture

Environment-based Automatic Routing:
Environment   Storage Backend   Path
dev           Local Parquet     ./data
prod          EFS Parquet       /efs/data
kernel        Backend API       HTTP
Configuration:
# Dev (default)
export CLYPTQ_ENV=dev
export CLYPTQ_DATA_ROOT=./data

# Prod
export CLYPTQ_ENV=prod
export CLYPTQ_STORAGE_ROOT=/efs/data

# Kernel (Jupyter, Web)
export CLYPTQ_ENV=kernel
export CLYPTQ_BACKEND_URL=http://backend:8000
Automatic Routing:
from clyptq.data.storage import get_storage

# Auto-select based on environment
storage = get_storage("binance/futures/ohlcv")
# dev → ParquetStore(./data)
# prod → ParquetStore(/efs/data)
# kernel → APIStore(backend_url)
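
For intuition, a minimal sketch of this dispatch is shown below. It is illustrative only: the actual get_storage() internals, the ParquetStore/APIStore constructor signatures, and the import path are assumptions, not the real API.

import os

from clyptq.data.storage import ParquetStore, APIStore  # hypothetical import path

def route_storage(category: str):
    # Mirrors the routing table above (illustrative signatures)
    env = os.environ.get("CLYPTQ_ENV", "dev")
    if env == "dev":
        return ParquetStore(os.environ.get("CLYPTQ_DATA_ROOT", "./data"), category)
    if env == "prod":
        return ParquetStore(os.environ.get("CLYPTQ_STORAGE_ROOT", "/efs/data"), category)
    if env == "kernel":
        return APIStore(os.environ["CLYPTQ_BACKEND_URL"], category)
    raise ValueError(f"Unknown CLYPTQ_ENV: {env}")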

Data Loading

Backtest Mode:
from datetime import datetime, timezone

driver = TradingDriver.from_spec(spec)

driver.load_data(
    items=["BTC/USDT:USDT", "ETH/USDT:USDT"],
    start=datetime(2024, 1, 1, tzinfo=timezone.utc),
    end=datetime(2024, 12, 31, tzinfo=timezone.utc)
)

# Pre-load and validate all data
# → ValidationReport auto-generated
Live Mode:
driver.load_data(
    items=["BTC/USDT:USDT"],
    mode="live",
    warmup_ticks=100  # Historical data for initialization
)

# Historical warmup + Live streaming
# → Gaps auto-filled

Data Validation

ValidationReport: every data load is validated automatically:
# Coverage check
if driver._provider._validation_report.has_errors:
    driver._provider._validation_report.print_summary()
    # ERROR: No data for BTC/USDT:USDT
    # WARNING: Low coverage 50% (5/10 symbols)
Validation Items:
  • Coverage: all requested symbols were loaded
  • NaN Ratio: share of missing or invalid values
  • Time Range: the requested period is covered
  • Timestamps: enough data points are present
Quality Criteria:
report = ValidationReport(
    fail_on_no_data=True,           # Error if no data
    fail_on_zero_coverage=True,     # Error if all symbols missing
    warn_on_low_coverage=0.5,       # Warning if less than 50% coverage
    warn_on_high_nan_ratio=0.9      # Warning if more than 90% NaN
)

Data Preprocessing

Forward-Fill (FFill): missing data is automatically forward-filled:
# Tracked with TaggedArray
TaggedArray(
    value=42000.0,
    exists=True,
    valid=True,
    updated=False  # FFilled value (updated=False)
)
FFill Behavior:
  1. First missing: valid=False (cannot FFill)
  2. Subsequent missing: FFill with last valid value → valid=True, updated=False
  3. New value: valid=True, updated=True
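The three rules above can be expressed as a small state function. This is a minimal sketch with a hypothetical Tagged stand-in; the real TaggedArray class may differ:

from dataclasses import dataclass
from typing import Optional

@dataclass
class Tagged:  # hypothetical stand-in for TaggedArray
    value: Optional[float]
    exists: bool
    valid: bool
    updated: bool

def ffill_step(raw: Optional[float], last: Optional[Tagged]) -> Tagged:
    if raw is not None:
        return Tagged(raw, exists=True, valid=True, updated=True)      # rule 3: new value
    if last is None or not last.valid:
        return Tagged(None, exists=False, valid=False, updated=False)  # rule 1: first missing
    return Tagged(last.value, exists=True, valid=True, updated=False)  # rule 2: FFill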
Timestamp Synchronization: Align data from multiple sources to common timestamps:
# Source 1: OHLCV (1-minute intervals)
# Source 2: Orderbook (real-time)

# → Synchronize to common timestamp set
# → Create TaggedArray for each timestamp
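
To make the alignment concrete, here is an illustrative pandas example (not ClyptQ's internal code): take the union of both sources' timestamps, then forward-fill so every common timestamp carries the latest known value from each source.

import pandas as pd

# OHLCV closes on a 1-minute grid
ohlcv = pd.Series(
    [42000.0, 42010.0],
    index=pd.to_datetime(["2024-01-01 00:01:00", "2024-01-01 00:02:00"]),
)
# Orderbook mid-prices at irregular times
book_mid = pd.Series(
    [41995.0, 42005.0, 42012.5],
    index=pd.to_datetime(["2024-01-01 00:00:30", "2024-01-01 00:01:30", "2024-01-01 00:02:10"]),
)

# Common timestamp set, forward-filled per source
common = ohlcv.index.union(book_mid.index)
aligned = pd.DataFrame({
    "close": ohlcv.reindex(common).ffill(),
    "mid": book_mid.reindex(common).ffill(),
})
# Rows where a source published nothing new correspond to
# FFilled TaggedArray values (updated=False)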

Live Data Handling

Warmup + Gap Filling:
# Warmup request: 100 ticks
driver.load_data(
    items=["BTC/USDT:USDT"],
    mode="live",
    warmup_ticks=100
)

# 1. Historical warmup attempt (50 ticks available)
# 2. Fill remaining 50 ticks with live stream
# 3. Store in gap buffer
# 4. Switch to realtime after warmup complete
Source Type Tracking: the driver records where each data point came from:
# driver._provider._last_source_type
"history"   # Historical warmup
"gap"       # Gap filling (warmup shortfall)
"realtime"  # Live stream
Warmup Skip: warmup and gap data can bypass the full computation:
def compute(self, data, timestamp, context):
    if context.is_warmup:
        # Gap/warmup data: return a lightweight warmup output
        return self._get_warmup_output(data)

Symbol-Source Mapping

SymbolSourceMap maps each symbol to the observation specs that feed it:
from clyptq.apps.trading.spec.unified import SymbolSourceMap

symbol_map = SymbolSourceMap(
    mappings={
        "BTC/USDT:USDT": ["ohlcv_1m", "orderbook"],
        "ETH/USDT:USDT": ["ohlcv_1m"],
        "AAPL": ["stock_1d"]
    }
)

spec = TradingSpec(
    observations=[ohlcv_spec, orderbook_spec, stock_spec],
    symbol_source_map=symbol_map
)
Behavior:
  • BTC loads from both ohlcv_1m + orderbook
  • ETH loads from ohlcv_1m only
  • AAPL loads from stock_1d only

Redis Integration

Multi-Consumer Optimization: When multiple strategies use the same data:
import json

import redis

# Collector → Redis Pub/Sub (CollectorRegistry import path per your installation)
collector = CollectorRegistry.create(
    domain="binance.futures.ohlcv",
    mode="live"
)

redis_client = redis.Redis.from_url("redis://localhost:6379/0")

symbols = ["BTC/USDT:USDT", "ETH/USDT:USDT"]

def publish_to_redis(data):
    channel = f"live:ohlcv_1m:{data['symbol']}"
    redis_client.publish(channel, json.dumps(data))

collector.subscribe(items=symbols, callback=publish_to_redis)

# Strategies → Redis Subscribe
spec = TradingSpec(
    data=TradingDataSpec(
        redis_mode=True,
        redis_url="redis://localhost:6379/0"
    )
)
Benefits:
  • Single WebSocket connection
  • Multiple strategies consume same data
  • Reduced collector load
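
With redis_mode=True the subscription is handled internally; for reference, a bare-bones consumer written directly against redis-py might look like this (channel naming follows the publisher above):

import json

import redis

r = redis.Redis.from_url("redis://localhost:6379/0")
pubsub = r.pubsub()
pubsub.subscribe("live:ohlcv_1m:BTC/USDT:USDT")

for message in pubsub.listen():
    if message["type"] != "message":
        continue  # skip subscribe confirmations
    bar = json.loads(message["data"])
    # hand the bar to your strategy here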

Metadata Catalog

Pre-Execution Validation: Check data availability before execution:
from datetime import datetime

from clyptq.data.storage import get_storage

storage = get_storage("binance/futures/ohlcv")

# Check metadata
availability = storage.check_data_availability(
    items=["BTC/USDT:USDT", "ETH/USDT:USDT"],
    category="binance/futures/ohlcv",
    timeframe="1m",
    start=datetime(2024, 1, 1),
    end=datetime(2024, 12, 31)
)

for symbol, date_range in availability.items():
    if date_range is None:
        print(f"{symbol}: No data available")
    else:
        start, end = date_range
        print(f"{symbol}: {start} to {end}")
Gap Detection: Find missing periods:
from datetime import datetime

required_start = datetime(2024, 1, 1)
required_end = datetime(2024, 12, 31)

all_meta = storage.list_metadata(
    category="binance/futures/ohlcv",
    timeframe="1m"
)

for meta in all_meta:
    if meta.start > required_start:
        print(f"{meta.item}: Gap before {meta.start}")
    if meta.end < required_end:
        print(f"{meta.item}: Gap after {meta.end}")

Best Practices

1. Pre-Execution Validation
# Check data before execution
availability = storage.check_data_availability(...)
if any(v is None for v in availability.values()):
    raise ValueError("Missing data for some symbols")
2. Environment Awareness
# Use appropriate storage per environment
# get_storage() handles this automatically
storage = get_storage(category)
3. Check Validation Report
driver.load_data(...)
if driver._provider._validation_report.has_errors:
    driver._provider._validation_report.print_summary()
    # Handle errors
4. Set Warmup Ticks Appropriately
# Size warmup to cover your longest indicator lookback
# e.g., SMA(50) → warmup_ticks=50 or more
driver.load_data(mode="live", warmup_ticks=100)
5. Use Redis
# When running multiple strategies
# Share data via Redis
spec = TradingSpec(
    data=TradingDataSpec(redis_mode=True)
)

Performance

Typical Sizes:
  • 1 month 1m OHLCV, 100 symbols: ~200 MB
  • 1 year 1d stock, 500 symbols: ~50 MB
  • Live orderbook, 20 symbols: ~5 MB cache
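As a rough sanity check on the first figure (assuming raw float64 storage before compression): a month has about 43,200 one-minute bars, and 43,200 bars × 100 symbols × 5 fields × 8 bytes ≈ 173 MB, consistent with the ~200 MB estimate once timestamps and overhead are added.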
Loading Time:
  • Backtest (1 month, 100 symbols): 2-5 seconds
  • Live warmup (100 ticks): 1-3 seconds
  • Redis subscribe: less than 100ms

Ecosystem Value

For Builders:
  • Immediate access to diverse data sources
  • Production infrastructure provided out of the box
  • Development and production environments behave identically
For Buyers:
  • Data quality guaranteed
  • Transparent validation process
  • Reproducible results