Data Manager API
Real-time data processing and management with WebSocket streaming, multi-timeframe support, and efficient memory management.
Overview
The `ProjectXRealtimeDataManager` handles real-time market data streaming via WebSocket connections, processes OHLCV bar data across multiple timeframes, and provides efficient data access with automatic memory management.
Quick Start
from project_x_py import TradingSuite
import asyncio
async def basic_data_usage():
    """Create a suite with real-time data, then print the current price and bar counts."""
    # Create suite with real-time data
    suite = await TradingSuite.create(
        ["MNQ"],
        timeframes=["1min", "5min", "15min"],
    )
    try:
        # Access the integrated data manager for the specific instrument
        data_manager = suite["MNQ"].data

        # Get current price. None means "no price yet", so test for it
        # explicitly instead of relying on truthiness.
        current_price = await data_manager.get_current_price()
        if current_price is not None:
            print(f"Current MNQ Price: ${current_price:.2f}")

        # Get latest bars
        bars_1min = await data_manager.get_data("1min")
        bars_5min = await data_manager.get_data("5min")
        if bars_1min is not None:
            print(f"1min bars: {len(bars_1min)}")
        if bars_5min is not None:
            print(f"5min bars: {len(bars_5min)}")
    finally:
        # Always release the connection, even if a call above raises
        await suite.disconnect()


asyncio.run(basic_data_usage())
Core Data Access Methods
Getting Bar Data
async def accessing_bar_data():
    """Fetch bars by timeframe, by bar count, and by time range."""
    from datetime import datetime, timedelta, timezone

    suite = await TradingSuite.create(["MNQ"], timeframes=["1min", "5min"])
    try:
        mnq_data = suite["MNQ"].data

        # Get data for a specific timeframe
        bars = await mnq_data.get_data("1min")
        if bars is not None and not bars.is_empty():
            print(f"Retrieved {len(bars)} bars")
            # Access OHLCV data using Polars DataFrame
            latest_bar = bars.tail(1)
            print(f"Latest close: ${latest_bar['close'][0]:.2f}")

        # Get data with specific count
        recent_bars = await mnq_data.get_data("5min", count=20)
        if recent_bars is not None:
            print(f"Recent 5min bars: {len(recent_bars)}")

        # Get data for time range. Use timezone-aware datetimes: the data
        # manager is configured with a time zone, so a naive "now" would be
        # ambiguous.
        end_time = datetime.now(timezone.utc)
        start_time = end_time - timedelta(hours=2)
        range_bars = await mnq_data.get_data(
            timeframe="1min",
            start_time=start_time,
            end_time=end_time,
        )
        if range_bars is not None:
            print(f"1min bars in the last 2 hours: {len(range_bars)}")
    finally:
        # Always release the connection, even if a call above raises
        await suite.disconnect()


asyncio.run(accessing_bar_data())
Current Price Methods
async def price_access():
    """Print the current price, the latest price, and a 100-bar price range."""
    suite = await TradingSuite.create(["MNQ"], timeframes=["1min"])
    try:
        mnq_data = suite["MNQ"].data

        # Get current price (from latest tick or bar); None means unavailable
        current_price = await mnq_data.get_current_price()
        if current_price is not None:
            print(f"Current price: ${current_price:.2f}")

        # Get the latest price (no timeframe argument is passed here)
        latest_price = await mnq_data.get_latest_price()
        if latest_price is not None:
            print(f"Latest price: ${latest_price:.2f}")

        # Get price range statistics
        price_range = await mnq_data.get_price_range(
            timeframe="1min",
            bars=100,  # Last 100 bars
        )
        if price_range:
            print(f"High: ${price_range['high']:.2f}")
            print(f"Low: ${price_range['low']:.2f}")
            print(f"Range: ${price_range['range']:.2f}")
    finally:
        # Always release the connection, even if a call above raises
        await suite.disconnect()


asyncio.run(price_access())
Volume Statistics
async def volume_stats():
    """Print total volume, average volume, and volume trend for the 1min bars."""
    suite = await TradingSuite.create(["MNQ"], timeframes=["1min", "5min"])
    try:
        mnq_data = suite["MNQ"].data

        # Get volume statistics
        vol_stats = await mnq_data.get_volume_stats(timeframe="1min")
        if vol_stats:
            print(f"Total volume: {vol_stats['total_volume']:,}")
            print(f"Average volume: {vol_stats['avg_volume']:.0f}")
            print(f"Volume trend: {vol_stats['volume_trend']}")
    finally:
        # Always release the connection, even if a call above raises
        await suite.disconnect()


asyncio.run(volume_stats())
Memory Management
Memory Statistics and Control
async def memory_management():
    """Print memory and resource statistics, then trigger a cleanup of old data."""
    suite = await TradingSuite.create(["MNQ"], timeframes=["1min", "5min"])
    try:
        mnq_data = suite["MNQ"].data

        # Get memory statistics
        memory_stats = await mnq_data.get_memory_stats()
        print(f"Total bars in memory: {memory_stats.total_bars:,}")
        print(f"Memory usage: {memory_stats.memory_usage_mb:.2f} MB")
        print(f"Cache efficiency: {memory_stats.cache_efficiency:.1%}")

        # Get resource statistics
        resource_stats = await mnq_data.get_resource_stats()
        print(f"CPU usage: {resource_stats['cpu_percent']:.1f}%")
        print(f"Threads: {resource_stats['num_threads']}")

        # Cleanup old data
        await mnq_data.cleanup()
    finally:
        # Always release the connection, even if a call above raises
        await suite.disconnect()


asyncio.run(memory_management())
Memory-Mapped (mmap) Overflow Support
The data manager supports memory-mapped file overflow for handling large datasets:
async def overflow_configuration():
    """Create a suite with mmap overflow enabled and print overflow statistics."""
    from project_x_py.realtime_data_manager.types import DataManagerConfig

    # Configure with overflow enabled
    config = DataManagerConfig(
        enable_mmap_overflow=True,
        overflow_threshold=0.8,  # Overflow at 80% capacity
        mmap_storage_path="/path/to/overflow/storage",
    )
    suite = await TradingSuite.create(
        ["MNQ"],
        timeframes=["1min"],
        data_manager_config=config,
    )
    try:
        mnq_data = suite["MNQ"].data

        # Monitor overflow statistics
        overflow_stats = await mnq_data.get_overflow_stats("1min")
        if overflow_stats:
            print(f"Bars overflowed: {overflow_stats['total_overflowed_bars']}")
            print(f"Disk usage: {overflow_stats['disk_storage_size_mb']:.2f} MB")
    finally:
        # Always release the connection, even if a call above raises
        await suite.disconnect()


asyncio.run(overflow_configuration())
Performance Optimization
DataFrame Optimization
The data manager includes built-in DataFrame optimization:
async def dataframe_optimization():
    """Run the data-access-pattern optimization and print its reported gains."""
    suite = await TradingSuite.create(["MNQ"], timeframes=["1min"])
    try:
        mnq_data = suite["MNQ"].data

        # Optimize data access patterns
        optimization_result = await mnq_data.optimize_data_access_patterns()
        print(f"Cache hits improved by: {optimization_result['cache_improvement']:.1%}")
        print(f"Access time reduced by: {optimization_result['time_reduction']:.1%}")
    finally:
        # Always release the connection, even if a call above raises
        await suite.disconnect()


asyncio.run(dataframe_optimization())
Lock Optimization
async def lock_optimization():
    """Print lock acquisition count, average wait time, and contention rate."""
    suite = await TradingSuite.create(["MNQ"], timeframes=["1min"])
    try:
        mnq_data = suite["MNQ"].data

        # Get lock optimization statistics
        lock_stats = await mnq_data.get_lock_optimization_stats()
        print(f"Lock acquisitions: {lock_stats['total_acquisitions']}")
        print(f"Average wait time: {lock_stats['avg_wait_time_ms']:.2f}ms")
        print(f"Contention rate: {lock_stats['contention_rate']:.1%}")
    finally:
        # Always release the connection, even if a call above raises
        await suite.disconnect()


asyncio.run(lock_optimization())
DST Handling
The data manager handles Daylight Saving Time (DST) transitions:
async def dst_handling():
    """Create a suite whose data manager uses the RTH session in New York time."""
    from project_x_py.realtime_data_manager.types import DataManagerConfig

    # DST awareness comes from pairing a session type with an IANA time zone
    dst_config = DataManagerConfig(
        session_type="RTH",  # Regular Trading Hours
        timezone="America/New_York",
    )
    suite = await TradingSuite.create(
        ["MNQ"], timeframes=["1min"], data_manager_config=dst_config
    )

    # DST transitions are handled automatically: the data manager will adjust
    # bar timestamps and handle missing/duplicate hours during transitions.

    await suite.disconnect()


asyncio.run(dst_handling())
Statistics and Monitoring
Component Statistics
async def component_statistics():
    """Print component statistics, performance metrics, and bounded statistics."""
    suite = await TradingSuite.create(["MNQ"], timeframes=["1min", "5min"])
    try:
        mnq_data = suite["MNQ"].data

        # Get comprehensive statistics
        stats = await mnq_data.get_stats()
        print(f"Component: {stats.component_type}")
        print(f"Health score: {stats.health_score:.1f}/100")
        print(f"Uptime: {stats.uptime_seconds}s")

        # Performance metrics
        for metric, value in stats.performance_metrics.items():
            print(f"{metric}: {value}")

        # Get bounded statistics (with size limits)
        bounded_stats = await mnq_data.get_bounded_statistics()
        if bounded_stats:
            print(f"Recent operations: {bounded_stats['recent_operations']}")
            print(f"Error rate: {bounded_stats['error_rate']:.2%}")
    finally:
        # Always release the connection, even if a call above raises
        await suite.disconnect()


asyncio.run(component_statistics())
Health Monitoring
async def health_monitoring():
    """Print the health score and, when it is below 80, report the error count."""
    suite = await TradingSuite.create(["MNQ"], timeframes=["1min"])
    try:
        mnq_data = suite["MNQ"].data

        # Get health score
        health_score = await mnq_data.get_health_score()
        print(f"Health score: {health_score:.1f}/100")

        if health_score < 80:
            print("Warning: Data manager health is degraded")
            # Check specific issues
            stats = await mnq_data.get_stats()
            if stats.error_count > 0:
                print(f"Errors detected: {stats.error_count}")
    finally:
        # Always release the connection, even if a call above raises
        await suite.disconnect()


asyncio.run(health_monitoring())
Real-time Feed Management
Starting and Stopping Feeds
async def feed_management():
    """Start the real-time feed, let it run for 60 seconds, then stop it."""
    suite = await TradingSuite.create(["MNQ"], timeframes=["1min"])
    try:
        mnq_data = suite["MNQ"].data

        # Start real-time feed
        success = await mnq_data.start_realtime_feed()
        if success:
            print("Real-time feed started")
            try:
                # Monitor feed for some time
                await asyncio.sleep(60)
            finally:
                # Stop the feed even if the wait is cancelled or raises
                await mnq_data.stop_realtime_feed()
                print("Real-time feed stopped")
        else:
            # Surface the failure instead of silently continuing
            print("Real-time feed failed to start")
    finally:
        # Always release the connection, even if a call above raises
        await suite.disconnect()


asyncio.run(feed_management())
Data Validation
Built-in Validation
async def data_validation():
    """Report validation errors (if exposed) and whether 1min data is available."""
    suite = await TradingSuite.create(["MNQ"], timeframes=["1min"])
    try:
        mnq_data = suite["MNQ"].data

        # Data validation is performed automatically.
        # Check validation statistics in memory stats.
        memory_stats = await mnq_data.get_memory_stats()

        # Look for validation indicators
        if hasattr(memory_stats, 'validation_errors'):
            print(f"Validation errors: {memory_stats.validation_errors}")

        # Data readiness check (same None/empty idiom as the other examples)
        bars = await mnq_data.get_data("1min")
        if bars is not None and not bars.is_empty():
            print("Data is ready and validated")
    finally:
        # Always release the connection, even if a call above raises
        await suite.disconnect()


asyncio.run(data_validation())
Dynamic Resource Limits
The data manager adjusts its resource limits dynamically:
async def dynamic_resources():
    """Print the current memory limit and the load factor it was adjusted by."""
    suite = await TradingSuite.create(["MNQ"], timeframes=["1min"])
    try:
        mnq_data = suite["MNQ"].data

        # Resource limits adjust automatically based on:
        # - Available system memory
        # - CPU usage
        # - Data volume
        # - Number of active timeframes

        # Monitor resource adaptation
        resource_stats = await mnq_data.get_resource_stats()
        print(f"Current memory limit: {resource_stats['memory_limit_mb']:.0f} MB")
        print(f"Adjusted for load: {resource_stats['load_factor']:.2f}x")
    finally:
        # Always release the connection, even if a call above raises
        await suite.disconnect()


asyncio.run(dynamic_resources())
Error Handling
Proper Error Handling Patterns
async def error_handling():
    """Demonstrate defensive data access: None check, empty check, guaranteed cleanup."""
    suite = await TradingSuite.create(["MNQ"], timeframes=["1min"])
    mnq_data = suite["MNQ"].data
    try:
        # Always check for None returns
        data = await mnq_data.get_data("1min")
        if data is None:
            print("No data available yet")
            return

        # Check for empty DataFrames
        if data.is_empty():
            print("Data frame is empty")
            return

        # Safe data access: the two guards above ensure at least one row,
        # so no additional length check is needed.
        latest_price = data.tail(1)["close"][0]
        print(f"Latest price: ${latest_price:.2f}")
    except Exception as e:
        # A broad catch is acceptable at this example's top-level boundary;
        # in production code, catch the specific exceptions you can handle.
        print(f"Error accessing data: {e}")
    finally:
        # Runs on every path, including the early returns above
        await suite.disconnect()


asyncio.run(error_handling())
Configuration Options
DataManagerConfig
from project_x_py.realtime_data_manager.types import DataManagerConfig

# Full configuration example, grouped by concern
config = DataManagerConfig(
    # --- Memory management ---
    max_bars_per_timeframe=1000,
    enable_mmap_overflow=True,
    overflow_threshold=0.8,
    mmap_storage_path="/path/to/storage",
    # --- Performance ---
    enable_caching=True,
    cache_size=100,
    optimization_interval=300,
    # --- DST handling ---
    session_type="RTH",
    timezone="America/New_York",
    # --- Resource limits ---
    enable_dynamic_limits=True,
    memory_threshold_percent=80.0,
    cpu_threshold_percent=70.0,
    # --- Validation ---
    validate_data=True,
    max_price_deviation=0.1,  # 10% max deviation
    # --- Cleanup ---
    cleanup_interval_seconds=300,
    retention_hours=24,
)
Best Practices
Memory Efficiency
# Snippet: runs inside an async function where `suite` already exists.
# ✅ Good: Get only needed data
recent_bars = await suite["MNQ"].data.get_data("1min", count=100)
# ❌ Avoid: Getting all data when not needed
all_bars = await suite["MNQ"].data.get_data("1min")  # Gets everything
Null Checking
# Snippet: runs inside an async function where `suite` already exists.
# ✅ Good: Always check for None
data = await suite["MNQ"].data.get_data("1min")
if data is not None and not data.is_empty():
    # Process data
    pass
# ❌ Bad: Assuming data exists
data = await suite["MNQ"].data.get_data("1min")
latest = data.tail(1)  # May fail if data is None (AttributeError on None)
Resource Cleanup
# ✅ Good: Always cleanup
try:
suite = await TradingSuite.create(["MNQ"])
# Use suite
finally:
await suite.disconnect()
# ✅ Better: Use context manager (if available)
async with await TradingSuite.create(["MNQ"]) as suite:
# Suite automatically cleaned up
pass
Performance Tips
- Use appropriate timeframes - Don't subscribe to more timeframes than you need
- Enable caching - Speeds up access to frequently used data
- Configure overflow - Useful for long-running sessions that accumulate lots of data
- Monitor health - Check health scores regularly
- Clean up regularly - Use automatic cleanup for long sessions
See Also
- Trading Suite API - Main trading interface
- Real-time Guide - Real-time data concepts
- Examples - Complete working examples