Real-time Data Processing Examples¶
This page demonstrates how to work with real-time market data streams using the ProjectX Python SDK v3.5.7. Learn to handle WebSocket data, process multiple timeframes, and build real-time trading systems with the enhanced event system.
Prerequisites¶
- ProjectX API credentials with real-time data access
- Active market during testing (futures trading hours)
- Understanding of async/await patterns
- Basic knowledge of market data structures
1. Basic Real-time Data Streaming¶
Start with simple real-time data consumption:
#!/usr/bin/env python
"""
Basic real-time data streaming example.
This example demonstrates:
- Connecting to real-time data feeds
- Handling tick (quote) updates
- Processing new bar events
- Monitoring connection health
- Displaying streaming statistics
"""
import asyncio
from datetime import datetime
from project_x_py import EventType, TradingSuite
from project_x_py.event_bus import Event
async def main():
"""Main function to run real-time data streaming."""
# Create suite with real-time capabilities
suite = await TradingSuite.create(
["MNQ"],
timeframes=["15sec", "1min"],
initial_days=1, # Minimal historical data
)
mnq_context = suite["MNQ"]
print(f"Real-time streaming started for {mnq_context.symbol}")
print(f"Connected: {suite.is_connected}")
# Track statistics
tick_count = 0
bar_count = 0
last_price = None
async def on_tick(event: Event):
"""Handle tick updates."""
nonlocal tick_count, last_price
tick_data = event.data
tick_count += 1
last_price = tick_data.get("last") or last_price
# Display every 10th tick to avoid spam
        if tick_count % 10 == 0 and last_price is not None:
timestamp = datetime.now().strftime("%H:%M:%S")
print(f"[{timestamp}] Tick #{tick_count}: ${last_price:.2f}")
async def on_new_bar(event: Event):
"""Handle new bar events."""
nonlocal bar_count
bar_count += 1
timestamp = datetime.now().strftime("%H:%M:%S")
# The event.data contains timeframe and nested data
event_data = event.data
timeframe = event_data.get("timeframe", "unknown")
# Get the bar data directly from the event
bar_data = event_data.get("data", {})
if bar_data:
print(f"[{timestamp}] New {timeframe} bar #{bar_count}:")
# Access the bar data fields directly
open_price = bar_data.get("open", 0)
high_price = bar_data.get("high", 0)
low_price = bar_data.get("low", 0)
close_price = bar_data.get("close", 0)
volume = bar_data.get("volume", 0)
bar_timestamp = bar_data.get("timestamp", "")
print(
f" OHLC: ${open_price:.2f} / ${high_price:.2f} / "
f"${low_price:.2f} / ${close_price:.2f}"
)
print(f" Volume: {volume}")
print(f" Timestamp: {bar_timestamp}")
async def on_connection_status(event: Event):
"""Handle connection status changes."""
status = event.data.get("connected", False)
print(f"Connection Status Changed: {status}")
if status:
print("✅ Real-time feed connected")
else:
print("❌ Real-time feed disconnected")
# Register event handlers
await mnq_context.on(EventType.QUOTE_UPDATE, on_tick)
await mnq_context.on(EventType.NEW_BAR, on_new_bar)
await mnq_context.on(EventType.CONNECTED, on_connection_status)
await mnq_context.on(EventType.DISCONNECTED, on_connection_status)
print("Listening for real-time data... Press Ctrl+C to exit")
try:
while True:
await asyncio.sleep(10)
# Display periodic status
            current_price = await mnq_context.data.get_current_price()
            connection_health = await mnq_context.data.get_health_score()
            if current_price is not None:
                print(
                    f"Status - Price: ${current_price:.2f} | "
                    f"Ticks: {tick_count} | Bars: {bar_count} | "
                    f"Health: {connection_health}"
                )
except KeyboardInterrupt:
print("\nShutting down real-time stream...")
finally:
# Ensure proper cleanup
await suite.disconnect()
print("Disconnected from real-time feeds")
if __name__ == "__main__":
asyncio.run(main())
2. Multi-Timeframe Data Synchronization¶
Handle multiple timeframes with proper synchronization:
#!/usr/bin/env python
"""
Multi-timeframe real-time data synchronization
"""
import asyncio
from collections import defaultdict
from datetime import datetime
from project_x_py import EventType, TradingSuite
from project_x_py.indicators import RSI, SMA
class MultiTimeframeDataProcessor:
def __init__(self, suite: TradingSuite):
self.suite = suite
self.timeframes = ["1min", "5min", "15min"]
self.data_cache = defaultdict(list)
self.last_analysis = defaultdict(dict)
self.analysis_count = 0
async def process_new_bar(self, event):
"""Process incoming bar data for all timeframes."""
bar_data = event.data.get("data", event.data)
timeframe = event.data.get("timeframe", "unknown")
if timeframe not in self.timeframes:
return
# Store the bar
self.data_cache[timeframe].append(bar_data)
# Keep only recent bars (memory management)
if len(self.data_cache[timeframe]) > 200:
self.data_cache[timeframe] = self.data_cache[timeframe][-100:]
print(
f"New {timeframe} bar: ${bar_data['close']:.2f} @ {bar_data.get('timestamp')}"
)
# Perform analysis on this timeframe
await self.analyze_timeframe(timeframe)
# Check for multi-timeframe confluence
if timeframe == "1min": # Trigger confluence check on fastest timeframe
await self.check_confluence()
async def analyze_timeframe(self, timeframe: str):
"""Analyze a specific timeframe with technical indicators."""
try:
# Get fresh data from suite
bars = await self.suite["MNQ"].data.get_data(timeframe)
if bars is None:
return
if len(bars) < 50: # Need enough data for indicators
return
# Calculate indicators
bars = bars.pipe(SMA, period=20).pipe(RSI, period=14)
current_price = bars["close"][-1]
current_sma = bars["sma_20"][-1]
current_rsi = bars["rsi_14"][-1]
# Determine trend and momentum
trend = "bullish" if current_price > current_sma else "bearish"
momentum = (
"strong"
if (trend == "bullish" and current_rsi > 50)
or (trend == "bearish" and current_rsi < 50)
else "weak"
)
# Store analysis
self.last_analysis[timeframe] = {
"price": current_price,
"sma_20": current_sma,
"rsi": current_rsi,
"trend": trend,
"momentum": momentum,
"timestamp": datetime.now(),
}
print(
f" {timeframe} Analysis - Trend: {trend}, RSI: {current_rsi:.1f}, Momentum: {momentum}"
)
except Exception as e:
print(f"Error analyzing {timeframe}: {e}")
async def check_confluence(self):
"""Check for confluence across all timeframes."""
self.analysis_count += 1
# Only check confluence every 5th analysis to avoid spam
if self.analysis_count % 5 != 0:
return
if len(self.last_analysis) < len(self.timeframes):
return
# Count bullish/bearish signals
bullish_count = sum(
1
for analysis in self.last_analysis.values()
if analysis.get("trend") == "bullish"
)
bearish_count = sum(
1
for analysis in self.last_analysis.values()
if analysis.get("trend") == "bearish"
)
# Check for strong confluence
total_timeframes = len(self.last_analysis)
if bullish_count >= total_timeframes * 0.8: # 80% agreement
print(
f"\n= BULLISH CONFLUENCE DETECTED ({bullish_count}/{total_timeframes})"
)
await self.display_confluence_analysis("BULLISH")
elif bearish_count >= total_timeframes * 0.8:
print(
f"\n=4 BEARISH CONFLUENCE DETECTED ({bearish_count}/{total_timeframes})"
)
await self.display_confluence_analysis("BEARISH")
async def display_confluence_analysis(self, signal_type: str):
"""Display detailed confluence analysis."""
print(f"{signal_type} CONFLUENCE ANALYSIS:")
print("-" * 40)
for tf, analysis in self.last_analysis.items():
trend_emoji = "=" if analysis["trend"] == "bullish" else "="
momentum_emoji = "=" if analysis["momentum"] == "strong" else "="
print(
f" {tf:>5} {trend_emoji} {analysis['trend']:>8} | RSI: {analysis['rsi']:>5.1f} | {momentum_emoji} {analysis['momentum']}"
)
print("-" * 40)
# Get current market data
current_price = await self.suite["MNQ"].data.get_current_price()
print(f"Current Price: ${current_price:.2f}")
print()
async def main():
# Create suite with multiple timeframes
suite = await TradingSuite.create(
"MNQ",
timeframes=["1min", "5min", "15min"],
initial_days=3, # Enough data for indicators
)
processor = MultiTimeframeDataProcessor(suite)
# Register event handler
await suite.on(EventType.NEW_BAR, processor.process_new_bar)
print("Multi-Timeframe Data Processor Active")
print("Monitoring 1min, 5min, and 15min timeframes...")
print("Press Ctrl+C to exit")
try:
while True:
await asyncio.sleep(15)
# Display periodic status
print(f"\nStatus Update - {datetime.now().strftime('%H:%M:%S')}")
for tf in processor.timeframes:
cached_bars = len(processor.data_cache[tf])
analysis = processor.last_analysis.get(tf, {})
trend = analysis.get("trend", "unknown")
rsi = analysis.get("rsi", 0)
print(
f" {tf}: {cached_bars} bars cached, {trend} trend, RSI: {rsi:.1f}"
)
except KeyboardInterrupt:
print("\nShutting down multi-timeframe processor...")
if __name__ == "__main__":
asyncio.run(main())
3. Real-time Data Export and Visualization¶
Export streaming data to CSV and JSON files for later analysis and visualization:
#!/usr/bin/env python
"""
Real-time data export with CSV logging and JSON snapshots
"""
import asyncio
import csv
import json
from datetime import datetime
from pathlib import Path
from project_x_py import EventType, TradingSuite
class RealTimeDataExporter:
def __init__(self, suite: TradingSuite, export_dir: str = "data_exports"):
self.suite = suite
self.export_dir = Path(export_dir)
self.export_dir.mkdir(exist_ok=True)
# Data storage
self.tick_data = []
self.bar_data = []
self.trade_data = []
# File handles
self.csv_files = {}
self.export_interval = 60 # Export every 60 seconds
async def initialize_export_files(self):
"""Initialize CSV files for data export."""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# Bar data CSV
bar_file = self.export_dir / f"bars_{timestamp}.csv"
bar_csv = open(bar_file, "w", newline="")
bar_writer = csv.writer(bar_csv)
bar_writer.writerow(
["timestamp", "timeframe", "open", "high", "low", "close", "volume"]
)
self.csv_files["bars"] = {"file": bar_csv, "writer": bar_writer}
print(f"Export files initialized in {self.export_dir}")
async def process_bar(self, event):
"""Process and export bar data."""
timestamp = datetime.now().isoformat()
        # Get the full dataset for this timeframe. The NEW_BAR event fires
        # when a new bar opens, so the last completed bar sits at index -2.
real_data = await self.suite["MNQ"].data.get_data(
event.data.get("timeframe", "unknown")
)
if real_data is None:
return
# Store in memory
bar_record = {
"timestamp": timestamp,
"bar_timestamp": real_data["timestamp"][-2],
"timeframe": event.data.get("timeframe", "unknown"),
"open": real_data["open"][-2],
"high": real_data["high"][-2],
"low": real_data["low"][-2],
"close": real_data["close"][-2],
"volume": real_data["volume"][-2],
}
self.bar_data.append(bar_record)
# Write to CSV
if "bars" in self.csv_files:
writer = self.csv_files["bars"]["writer"]
writer.writerow(
[
bar_record["bar_timestamp"] or timestamp,
bar_record["timeframe"],
bar_record["open"],
bar_record["high"],
bar_record["low"],
bar_record["close"],
bar_record["volume"],
]
)
self.csv_files["bars"]["file"].flush()
print(f"Exported {bar_record['timeframe']} bar: ${bar_record['close']:.2f}")
async def export_json_snapshot(self):
"""Export current data snapshot as JSON."""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
snapshot = {
"export_timestamp": datetime.now().isoformat(),
"data_summary": {
"bar_count": len(self.bar_data),
},
"recent_data": {
"bars": self.bar_data[-5:], # Last 5 bars
},
}
json_file = self.export_dir / f"snapshot_{timestamp}.json"
with open(json_file, "w") as f:
json.dump(snapshot, f, indent=2)
print(f"JSON snapshot exported: {json_file}")
return json_file
def close_files(self):
"""Close all open CSV files."""
for file_info in self.csv_files.values():
file_info["file"].close()
print("Export files closed")
async def main():
# Create suite for data export
suite = await TradingSuite.create(
"MNQ", timeframes=["15sec", "1min", "5min"], initial_days=1
)
mnq_context = suite["MNQ"]
exporter = RealTimeDataExporter(suite)
await exporter.initialize_export_files()
# Event handlers
await suite.on(EventType.NEW_BAR, exporter.process_bar)
print("Real-time Data Exporter Active")
print(f"Exporting to: {exporter.export_dir}")
print("Streaming data...")
try:
export_timer = 0
while True:
await asyncio.sleep(10)
export_timer += 10
# Periodic status
current_price = await mnq_context.data.get_current_price()
if current_price is None:
continue
print(f"Price: ${current_price:.2f} | Bars: {len(exporter.bar_data)}")
# Auto-export JSON snapshot every 5 minutes
if export_timer >= 300: # 5 minutes
await exporter.export_json_snapshot()
export_timer = 0
except KeyboardInterrupt:
print("\nShutting down data exporter...")
# Final exports
print("Creating final exports...")
await exporter.export_json_snapshot()
# Close files
exporter.close_files()
print("Data export complete!")
if __name__ == "__main__":
asyncio.run(main())
Key Real-time Data Concepts¶
WebSocket Connection Management¶
- Automatic Reconnection: SDK handles connection drops automatically
- Heartbeat Monitoring: Built-in connection health checks
- Circuit Breaker: Prevents cascading failures during connectivity issues
- Backpressure Handling: Manages high-frequency data flows efficiently
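These safeguards are built into the SDK, but it is still useful to observe them. Below is a minimal watchdog sketch assembled from the connection events and health-score call used earlier on this page. It assumes the health score is numeric on a 0-100 scale; the alert threshold of 50 is an arbitrary choice, not an SDK constant.

import asyncio
from project_x_py import EventType, TradingSuite

async def main():
    suite = await TradingSuite.create(["MNQ"], timeframes=["1min"], initial_days=1)
    mnq = suite["MNQ"]

    async def on_status(event):
        # CONNECTED/DISCONNECTED fire on state transitions; the SDK
        # reconnects automatically, so this handler only logs them.
        print(f"Connection event: {event.data}")

    await mnq.on(EventType.CONNECTED, on_status)
    await mnq.on(EventType.DISCONNECTED, on_status)

    try:
        while True:
            await asyncio.sleep(30)
            # Poll the built-in health score (assumed 0-100); the
            # threshold below is arbitrary, not an SDK constant.
            health = await mnq.data.get_health_score()
            if health is not None and health < 50:
                print(f"⚠️ Degraded feed health: {health}")
    finally:
        await suite.disconnect()

if __name__ == "__main__":
    asyncio.run(main())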
Data Processing Patterns¶
- Event-Driven Architecture: Use events for decoupled real-time processing
- Buffering: Store recent data for analysis and comparison
- Memory Management: Implement sliding windows to prevent memory leaks
- Synchronization: Handle multiple timeframes with proper timing
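As a compact illustration of these patterns working together, the sketch below buffers closing prices from NEW_BAR events in a bounded deque. The event payload shape matches the first example on this page; the 100-bar window is an arbitrary choice.

import asyncio
from collections import deque
from project_x_py import EventType, TradingSuite

async def main():
    suite = await TradingSuite.create(["MNQ"], timeframes=["1min"], initial_days=1)
    mnq = suite["MNQ"]

    # A bounded deque gives O(1) appends and silently discards the oldest
    # entry, so memory stays flat no matter how long the stream runs.
    closes: deque[float] = deque(maxlen=100)

    async def on_bar(event):
        bar = event.data.get("data", {})
        close = bar.get("close")
        if close is None:
            return
        closes.append(close)
        # Rolling average over whatever the window currently holds.
        avg = sum(closes) / len(closes)
        print(f"close={close:.2f} rolling_avg({len(closes)})={avg:.2f}")

    await mnq.on(EventType.NEW_BAR, on_bar)
    try:
        await asyncio.sleep(300)  # stream for five minutes, then clean up
    finally:
        await suite.disconnect()

if __name__ == "__main__":
    asyncio.run(main())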
Performance Optimization¶
- Async Processing: All data handling is fully asynchronous
- Batch Operations: Group related operations for better performance
- Caching: Cache frequently accessed data and calculations
- Resource Limits: Set appropriate limits for data storage
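One way to apply the caching idea is to recompute an indicator once per completed bar and let the far more frequent quote handler reuse the cached value. This sketch reuses the SMA pipe pattern from the multi-timeframe example above; the 20-bar period and two-minute run time are arbitrary choices.

import asyncio
from project_x_py import EventType, TradingSuite
from project_x_py.indicators import SMA

async def main():
    suite = await TradingSuite.create(["MNQ"], timeframes=["1min"], initial_days=2)
    mnq = suite["MNQ"]
    cached_sma: float | None = None  # refreshed once per completed bar

    async def on_bar(event):
        nonlocal cached_sma
        # Recompute the indicator once per bar instead of on every tick.
        bars = await mnq.data.get_data("1min")
        if bars is None or len(bars) < 20:
            return
        cached_sma = bars.pipe(SMA, period=20)["sma_20"][-1]

    async def on_quote(event):
        # Quotes arrive far more often than bars; reuse the cached SMA
        # rather than recalculating it on every update.
        price = event.data.get("last")
        if price is not None and cached_sma is not None:
            print(f"price={price:.2f} vs cached SMA={cached_sma:.2f}")

    await mnq.on(EventType.NEW_BAR, on_bar)
    await mnq.on(EventType.QUOTE_UPDATE, on_quote)
    try:
        await asyncio.sleep(120)
    finally:
        await suite.disconnect()

if __name__ == "__main__":
    asyncio.run(main())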
Common Patterns¶
Data Storage¶
# Use deque for efficient FIFO operations
from collections import deque
tick_buffer = deque(maxlen=1000)
# Use defaultdict for organized multi-timeframe data
from collections import defaultdict
timeframe_data = defaultdict(list)
Event Handling¶
# Always use async event handlers
import logging

logger = logging.getLogger(__name__)

async def handle_event(event):
    try:
        # Process event data
        data = event.data
        # Your processing logic here
    except Exception as e:
        logger.error(f"Event handling error: {e}")

# Register handlers properly (from within an async context)
await suite.on(EventType.NEW_BAR, handle_event)
Error Recovery¶
# Implement proper error handling
async def robust_data_processor(event):
    try:
        await process_data(event.data)  # process_data: your own coroutine
    except ConnectionError:
        logger.warning("Connection issue, retrying...")
        await asyncio.sleep(1)
    except Exception as e:
        logger.error(f"Processing error: {e}")
        # Continue processing other events
Next Steps¶
For building production real-time systems:
- Implement Backtesting: Test strategies on historical data first
- Add Monitoring: Comprehensive logging and alerting
- Scale Architecture: Handle multiple instruments and strategies
- Optimize Performance: Profile and optimize critical paths
- Deploy Infrastructure: Use proper deployment and monitoring tools
See also:

- Advanced Trading Examples for complex strategies
- Backtesting Examples for historical testing
- Basic Usage Examples for fundamentals