Real-time Data Guide¶
This guide covers comprehensive real-time data streaming using ProjectX Python SDK v3.5.7+. All real-time operations are fully asynchronous and provide high-performance WebSocket connectivity with automatic reconnection, memory management, and enhanced event forwarding for multi-instrument support.
Overview¶
The ProjectXRealtimeDataManager provides complete real-time market data streaming including OHLCV bars, tick data, price updates, and multi-timeframe synchronization. All operations are designed for high-frequency trading applications with minimal latency.
Key Features¶
- Multi-timeframe Streaming: Simultaneous data across multiple timeframes
- WebSocket Connectivity: High-performance async WebSocket connections
- Automatic Memory Management: Sliding windows with automatic cleanup
- Event-Driven Architecture: Real-time callbacks for all data updates
- Data Synchronization: Synchronized updates across timeframes
- Performance Optimization: DataFrame caching and lock optimization
- DST Handling: Automatic Daylight Saving Time transition management
- MMap Overflow: Disk-based overflow for large datasets
- Dynamic Resource Limits: Adaptive memory and CPU management
Getting Started¶
Basic Real-time Setup¶
import asyncio
from project_x_py import TradingSuite
async def basic_realtime_setup():
# Initialize with real-time capabilities
suite = await TradingSuite.create(
"MNQ",
timeframes=["1min", "5min", "15min"], # Multiple timeframes
initial_days=2 # Historical data for context
)
# Real-time data manager is automatically initialized and connected
data_manager = suite.data
print("Real-time connection established!")
# Get current price
current_price = await data_manager.get_current_price()
if current_price:
print(f"Current MNQ price: ${current_price:.2f}")
# Get recent data
recent_1min = await data_manager.get_data("1min", count=10)
if recent_1min is not None:
print(f"Last 10 1-minute bars: {len(recent_1min)} rows")
await suite.disconnect()
asyncio.run(basic_realtime_setup())
Health Monitoring¶
Monitor the health and status of the real-time data manager:
async def health_monitoring():
suite = await TradingSuite.create("MNQ")
# Get health score (0-100)
health_score = await suite.data.get_health_score()
print(f"Health Score: {health_score:.1f}/100")
# Get comprehensive statistics
stats = await suite.data.get_stats()
print(f"Component Status: {stats.status}")
print(f"Uptime: {stats.uptime_seconds}s")
print(f"Error Count: {stats.error_count}")
# Memory statistics
memory_stats = await suite.data.get_memory_stats()
print(f"Total Bars: {memory_stats.total_bars:,}")
print(f"Memory Usage: {memory_stats.memory_usage_mb:.2f} MB")
await suite.disconnect()
asyncio.run(health_monitoring())
Real-time Data Access¶
Getting Bar Data¶
Access OHLCV bar data across multiple timeframes:
async def accessing_bar_data():
suite = await TradingSuite.create("MNQ", timeframes=["1min", "5min"])
# Get all available data for a timeframe
all_bars = await suite.data.get_data("1min")
# Get specific number of bars
recent_bars = await suite.data.get_data("5min", count=20)
# Get data for a time range
from datetime import datetime, timedelta
end_time = datetime.now()
start_time = end_time - timedelta(hours=2)
range_bars = await suite.data.get_data(
timeframe="1min",
start_time=start_time,
end_time=end_time
)
# Always check for None returns
if all_bars is not None and not all_bars.is_empty():
latest = all_bars.tail(1)
print(f"Latest 1min close: ${latest['close'][0]:.2f}")
await suite.disconnect()
asyncio.run(accessing_bar_data())
Current Price and Volume¶
Get real-time price and volume information:
async def price_and_volume():
suite = await TradingSuite.create("MNQ", timeframes=["1min"])
# Current price from latest tick or bar
current_price = await suite.data.get_current_price()
if current_price:
print(f"Current price: ${current_price:.2f}")
# Latest price from specific timeframe
latest_price = await suite.data.get_latest_price()
if latest_price:
print(f"Latest price: ${latest_price:.2f}")
# Price range statistics
price_range = await suite.data.get_price_range(
timeframe="1min",
bars=100
)
if price_range:
print(f"100-bar range: ${price_range['low']:.2f} - ${price_range['high']:.2f}")
print(f"Range size: ${price_range['range']:.2f}")
# Volume statistics
vol_stats = await suite.data.get_volume_stats(timeframe="1min")
if vol_stats:
print(f"Total volume: {vol_stats['total_volume']:,}")
print(f"Average volume: {vol_stats['avg_volume']:.0f}")
await suite.disconnect()
asyncio.run(price_and_volume())
Multi-Timeframe Synchronization¶
Working with Multiple Timeframes¶
async def multi_timeframe_analysis():
suite = await TradingSuite.create(
"MNQ",
timeframes=["1min", "5min", "15min", "1hour"]
)
# Get data from all timeframes concurrently
timeframes = ["1min", "5min", "15min", "1hour"]
tasks = [suite.data.get_data(tf, count=10) for tf in timeframes]
results = await asyncio.gather(*tasks)
# Analyze alignment
for tf, data in zip(timeframes, results):
if data is not None and not data.is_empty():
latest = data.tail(1)
close = latest['close'][0]
print(f"{tf:>6}: ${close:.2f}")
await suite.disconnect()
asyncio.run(multi_timeframe_analysis())
Performance Optimization¶
Memory Management¶
Configure and monitor memory usage:
async def memory_optimization():
from project_x_py.realtime_data_manager.types import DataManagerConfig
# Configure memory limits
config = DataManagerConfig(
max_bars_per_timeframe=1000, # Limit bars in memory
enable_mmap_overflow=True, # Use disk for overflow
overflow_threshold=0.8, # Overflow at 80% capacity
enable_dynamic_limits=True # Adaptive limits
)
suite = await TradingSuite.create(
"MNQ",
timeframes=["1min"],
data_manager_config=config
)
# Monitor memory usage
memory_stats = await suite.data.get_memory_stats()
print(f"Memory Usage: {memory_stats.memory_usage_mb:.2f} MB")
print(f"Cache Efficiency: {memory_stats.cache_efficiency:.1%}")
# Optimize data access patterns
optimization = await suite.data.optimize_data_access_patterns()
print(f"Cache improvement: {optimization['cache_improvement']:.1%}")
await suite.disconnect()
asyncio.run(memory_optimization())
Lock Optimization¶
Monitor and optimize lock contention:
async def lock_optimization():
suite = await TradingSuite.create("MNQ", timeframes=["1min"])
# Get lock statistics
lock_stats = await suite.data.get_lock_optimization_stats()
print(f"Lock acquisitions: {lock_stats['total_acquisitions']}")
print(f"Average wait time: {lock_stats['avg_wait_time_ms']:.2f}ms")
print(f"Contention rate: {lock_stats['contention_rate']:.1%}")
await suite.disconnect()
asyncio.run(lock_optimization())
DST Handling¶
Automatic DST Transition Management¶
async def dst_aware_trading():
from project_x_py.realtime_data_manager.types import DataManagerConfig
# Configure with timezone awareness
config = DataManagerConfig(
session_type="RTH", # Regular Trading Hours
timezone="America/New_York" # Exchange timezone
)
suite = await TradingSuite.create(
"MNQ",
timeframes=["1min"],
data_manager_config=config
)
# DST transitions are handled automatically:
# - Spring forward: Missing hour is skipped
# - Fall back: Duplicate hour is disambiguated
# - Bar timestamps are adjusted correctly
# Data access works normally across DST boundaries
bars = await suite.data.get_data("1min")
if bars is not None:
print(f"Bars across DST: {len(bars)}")
await suite.disconnect()
asyncio.run(dst_aware_trading())
Advanced Features¶
MMap Overflow for Large Datasets¶
Handle large amounts of historical data with disk overflow:
async def large_dataset_handling():
from project_x_py.realtime_data_manager.types import DataManagerConfig
config = DataManagerConfig(
max_bars_per_timeframe=500, # Low memory limit
enable_mmap_overflow=True, # Enable disk overflow
overflow_threshold=0.8, # Trigger at 80%
mmap_storage_path="/tmp/overflow" # Storage location
)
suite = await TradingSuite.create(
"MNQ",
timeframes=["1min"],
data_manager_config=config,
initial_days=30 # Large initial dataset
)
# Check overflow statistics
overflow_stats = await suite.data.get_overflow_stats("1min")
if overflow_stats:
print(f"Overflowed bars: {overflow_stats['total_overflowed_bars']}")
print(f"Disk usage: {overflow_stats['disk_storage_size_mb']:.2f} MB")
# Data access seamlessly combines memory and disk
all_data = await suite.data.get_data("1min")
if all_data is not None:
print(f"Total bars available: {len(all_data)}")
await suite.disconnect()
asyncio.run(large_dataset_handling())
Dynamic Resource Management¶
Adaptive resource limits based on system load:
async def dynamic_resources():
from project_x_py.realtime_data_manager.types import DataManagerConfig
config = DataManagerConfig(
enable_dynamic_limits=True,
memory_threshold_percent=80.0, # Adjust at 80% memory
cpu_threshold_percent=70.0 # Adjust at 70% CPU
)
suite = await TradingSuite.create(
"MNQ",
timeframes=["1min"],
data_manager_config=config
)
# Monitor resource adaptation
resource_stats = await suite.data.get_resource_stats()
print(f"Memory limit: {resource_stats.get('memory_limit_mb', 0):.0f} MB")
print(f"CPU usage: {resource_stats['cpu_percent']:.1f}%")
print(f"Thread count: {resource_stats['num_threads']}")
await suite.disconnect()
asyncio.run(dynamic_resources())
Error Handling¶
Proper Error Handling Patterns¶
async def robust_data_access():
suite = await TradingSuite.create("MNQ", timeframes=["1min"])
try:
# Always check for None returns
data = await suite.data.get_data("1min")
if data is None:
print("No data available yet")
return
# Check for empty DataFrames
if data.is_empty():
print("DataFrame is empty")
return
# Safe data access
if len(data) > 0:
latest = data.tail(1)
close_price = latest["close"][0]
print(f"Latest close: ${close_price:.2f}")
except Exception as e:
print(f"Error accessing data: {e}")
# Log error for debugging
import logging
logging.error(f"Data access error: {e}", exc_info=True)
finally:
# Always cleanup
await suite.disconnect()
asyncio.run(robust_data_access())
Best Practices¶
1. Resource Management¶
# ✅ Good: Limit timeframes to what you need
suite = await TradingSuite.create("MNQ", timeframes=["1min", "5min"])
# ❌ Bad: Too many unnecessary timeframes
suite = await TradingSuite.create("MNQ",
timeframes=["1sec", "5sec", "10sec", "15sec", "30sec", "1min", "2min", "5min", "15min", "30min", "1hour"])
2. Data Access¶
# ✅ Good: Get only needed data
recent = await suite.data.get_data("1min", count=100)
# ❌ Bad: Get all data when you only need recent
all_data = await suite.data.get_data("1min")
recent = all_data.tail(100) if all_data else None
3. Null Checking¶
# ✅ Good: Always check for None and empty
data = await suite.data.get_data("1min")
if data is not None and not data.is_empty():
# Process data safely
pass
# ❌ Bad: Assume data exists
data = await suite.data.get_data("1min")
latest = data.tail(1) # May fail!
4. Cleanup¶
# ✅ Good: Use try/finally for cleanup
try:
suite = await TradingSuite.create("MNQ")
# Use suite
finally:
await suite.disconnect()
# ✅ Better: Use async context manager (when available)
async with await TradingSuite.create("MNQ") as suite:
# Suite automatically cleaned up
pass
Performance Tips¶
- Configure memory limits - Set appropriate
max_bars_per_timeframe
- Enable overflow - Use MMap overflow for long-running sessions
- Monitor health - Check health scores and statistics regularly
- Optimize access - Use
count
parameter to limit data retrieval - Enable caching - DataFrame optimization improves repeated access
- Use appropriate timeframes - Don't subscribe to unnecessary timeframes
- Batch operations - Use
asyncio.gather()
for concurrent operations
Troubleshooting¶
Common Issues¶
No data returned - Check if real-time feed is started - Verify authentication and connection - Allow time for initial data to accumulate
High memory usage
- Enable MMap overflow
- Reduce max_bars_per_timeframe
- Enable dynamic resource limits
- Call cleanup()
periodically
Performance degradation - Check lock contention statistics - Optimize data access patterns - Reduce number of timeframes - Enable DataFrame caching
See Also¶
- Data Manager API - Complete API reference
- Trading Suite Guide - Integrated trading
- Examples - Working code examples