OrderBook Guide¶
This guide covers comprehensive Level 2 market depth analysis using ProjectX Python SDK v3.3.4+. The OrderBook component provides real-time market microstructure analysis with advanced features including spoofing detection, iceberg identification, and volume profile analysis.
Overview¶
The OrderBook provides complete Level 2 market depth analysis with real-time updates via WebSocket. It includes sophisticated market microstructure analysis tools designed for institutional-level trading and market making applications.
Key Features¶
- Level 2 Market Depth: Real-time bid/ask levels with volume
- Market Microstructure Analysis: Spread analysis, depth imbalance, and liquidity metrics
- Spoofing Detection: 6 different spoofing patterns with confidence scoring
- Iceberg Detection: Identify hidden order algorithms
- Volume Profile: Price-volume distribution analysis
- Trade Flow Analysis: Real-time trade classification and flow
- Memory Management: Efficient sliding window data management
Getting Started¶
Basic Setup¶
import asyncio
from project_x_py import TradingSuite
async def basic_orderbook_setup():
# Enable OrderBook feature
suite = await TradingSuite.create(
"MNQ",
features=["orderbook"], # Enable Level 2 data
initial_days=1
)
# OrderBook is automatically initialized and connected
orderbook = suite.orderbook
print("OrderBook connected and receiving Level 2 data")
# Get current best bid/ask
best_prices = await orderbook.get_best_bid_ask()
print(f"Best Bid: ${best_prices['bid']:.2f}")
print(f"Best Ask: ${best_prices['ask']:.2f}")
print(f"Spread: ${best_prices['spread']:.2f}")
OrderBook Data Structure¶
The OrderBook maintains real-time Level 2 data with the following structure:
async def examine_orderbook_structure():
suite = await TradingSuite.create("MNQ", features=["orderbook"])
orderbook = suite.orderbook
# Get bid levels (sorted by price descending)
bids = await orderbook.get_orderbook_bids(levels=10)
print("Bid Levels (highest first):")
print(bids.head())
# Get ask levels (sorted by price ascending)
asks = await orderbook.get_orderbook_asks(levels=10)
print("Ask Levels (lowest first):")
print(asks.head())
# Full orderbook snapshot
full_book = await orderbook.get_full_orderbook()
print(f"Full book - Bid levels: {len(full_book['bids'])}, Ask levels: {len(full_book['asks'])}")
Market Depth Analysis¶
Bid/Ask Spread Analysis¶
async def spread_analysis():
suite = await TradingSuite.create("MNQ", features=["orderbook"])
orderbook = suite.orderbook
# Real-time spread monitoring
spread_history = []
async def monitor_spread():
"""Monitor spread changes in real-time."""
for _ in range(20): # Monitor for 20 updates
best_prices = await orderbook.get_best_bid_ask()
if best_prices['bid'] and best_prices['ask']:
spread = best_prices['spread']
mid_price = (best_prices['bid'] + best_prices['ask']) / 2
spread_bps = (spread / mid_price) * 10000 # Basis points
spread_history.append({
'timestamp': datetime.now(),
'spread': spread,
'spread_bps': spread_bps,
'bid': best_prices['bid'],
'ask': best_prices['ask']
})
print(f"Spread: ${spread:.2f} ({spread_bps:.1f} bps) | Bid: ${best_prices['bid']:.2f} Ask: ${best_prices['ask']:.2f}")
# Spread alerts
if spread > 5.0: # Wide spread for MNQ
print(" Wide spread detected!")
elif spread < 0.5: # Tight spread
print("=% Tight spread - high liquidity")
await asyncio.sleep(5) # Check every 5 seconds
await monitor_spread()
# Spread analysis
if spread_history:
avg_spread = sum(s['spread'] for s in spread_history) / len(spread_history)
max_spread = max(s['spread'] for s in spread_history)
min_spread = min(s['spread'] for s in spread_history)
print(f"\nSpread Analysis:")
print(f"Average: ${avg_spread:.2f}")
print(f"Range: ${min_spread:.2f} - ${max_spread:.2f}")
print(f"Volatility: ${max_spread - min_spread:.2f}")
Market Depth and Liquidity¶
async def depth_analysis():
suite = await TradingSuite.create("MNQ", features=["orderbook"])
orderbook = suite.orderbook
# Analyze market depth at multiple levels
levels_to_analyze = [1, 5, 10, 20]
for level_count in levels_to_analyze:
bids = await orderbook.get_orderbook_bids(levels=level_count)
asks = await orderbook.get_orderbook_asks(levels=level_count)
# Calculate cumulative volume
bid_volume = bids['volume'].sum() if not bids.is_empty() else 0
ask_volume = asks['volume'].sum() if not asks.is_empty() else 0
total_volume = bid_volume + ask_volume
# Volume imbalance
if total_volume > 0:
bid_ratio = bid_volume / total_volume
imbalance = bid_ratio - 0.5 # -0.5 to +0.5 scale
print(f"Depth at {level_count} levels:")
print(f" Bid Volume: {bid_volume:,}")
print(f" Ask Volume: {ask_volume:,}")
print(f" Imbalance: {imbalance:+.3f} ({'Bid Heavy' if imbalance > 0 else 'Ask Heavy' if imbalance < 0 else 'Balanced'})")
# Large imbalance alerts
if abs(imbalance) > 0.3:
direction = "Bullish" if imbalance > 0 else "Bearish"
print(f" = Strong {direction} Imbalance!")
print()
# Price impact analysis
await analyze_price_impact(orderbook)
async def analyze_price_impact(orderbook):
"""Analyze price impact for different order sizes."""
test_sizes = [1, 5, 10, 25, 50] # Contract sizes to test
print("Price Impact Analysis:")
for size in test_sizes:
# Calculate impact for buying
buy_impact = await orderbook.calculate_price_impact(size, 'buy')
# Calculate impact for selling
sell_impact = await orderbook.calculate_price_impact(size, 'sell')
print(f" {size} contracts:")
print(f" Buy impact: ${buy_impact:.2f}")
print(f" Sell impact: ${sell_impact:.2f}")
# Large impact warning
if buy_impact > 10 or sell_impact > 10: # $10+ impact
print(f" High price impact for {size} contracts!")
Order Flow and Trade Analysis¶
from project_x_py import EventType
async def order_flow_analysis():
suite = await TradingSuite.create("MNQ", features=["orderbook"])
orderbook = suite.orderbook
# Trade flow tracking
trade_flow = {
'buy_volume': 0,
'sell_volume': 0,
'trade_count': 0,
'large_trades': []
}
async def on_trade(event):
"""Analyze each trade for flow patterns."""
trade_data = event.data
price = trade_data['price']
size = trade_data['size']
side = trade_data.get('side', 'unknown') # 'buy' or 'sell'
trade_flow['trade_count'] += 1
# Classify trade direction if not provided
if side == 'unknown':
best_prices = await orderbook.get_best_bid_ask()
if best_prices['bid'] and best_prices['ask']:
mid_price = (best_prices['bid'] + best_prices['ask']) / 2
side = 'buy' if price >= mid_price else 'sell'
# Update flow metrics
if side == 'buy':
trade_flow['buy_volume'] += size
elif side == 'sell':
trade_flow['sell_volume'] += size
# Track large trades
if size >= 50: # Large trade threshold
trade_flow['large_trades'].append({
'timestamp': trade_data.get('timestamp', datetime.now()),
'price': price,
'size': size,
'side': side
})
print(f"=% Large {side} trade: {size} @ ${price:.2f}")
# Periodic flow summary
if trade_flow['trade_count'] % 20 == 0:
await print_flow_summary(trade_flow)
async def print_flow_summary(flow):
"""Print current order flow summary."""
total_volume = flow['buy_volume'] + flow['sell_volume']
if total_volume > 0:
buy_ratio = flow['buy_volume'] / total_volume
flow_imbalance = buy_ratio - 0.5
print(f"\n= Order Flow Summary (last {flow['trade_count']} trades):")
print(f" Buy Volume: {flow['buy_volume']:,} ({buy_ratio:.1%})")
print(f" Sell Volume: {flow['sell_volume']:,} ({1-buy_ratio:.1%})")
print(f" Flow Bias: {flow_imbalance:+.3f} ({'Bullish' if flow_imbalance > 0 else 'Bearish' if flow_imbalance < 0 else 'Neutral'})")
print(f" Large Trades: {len(flow['large_trades'])}")
# Register for trade events
await suite.on(EventType.TRADE_TICK, on_trade)
# Monitor for 2 minutes
await asyncio.sleep(120)
# Final flow analysis
if trade_flow['large_trades']:
print(f"\n=
Large Trade Analysis:")
# Analyze large trade timing
large_trades = trade_flow['large_trades'][-10:] # Last 10 large trades
buy_large = sum(t['size'] for t in large_trades if t['side'] == 'buy')
sell_large = sum(t['size'] for t in large_trades if t['side'] == 'sell')
if buy_large > sell_large * 1.5:
print(" = Large buyer dominance")
elif sell_large > buy_large * 1.5:
print(" = Large seller dominance")
else:
print(" Balanced large trade flow")
Advanced Market Microstructure¶
Market Imbalance Detection¶
async def market_imbalance_analysis():
suite = await TradingSuite.create("MNQ", features=["orderbook"])
orderbook = suite.orderbook
# Track imbalance over time
imbalance_history = []
async def monitor_imbalance():
"""Monitor order book imbalance patterns."""
for _ in range(30): # 30 measurements
imbalance = await orderbook.get_market_imbalance()
timestamp = datetime.now()
imbalance_history.append({
'timestamp': timestamp,
'imbalance': imbalance,
'level': classify_imbalance(imbalance)
})
print(f"Market Imbalance: {imbalance:+.3f} ({classify_imbalance(imbalance)})")
# Extreme imbalance alerts
if abs(imbalance) > 0.7:
direction = "Bullish" if imbalance > 0 else "Bearish"
print(f"= EXTREME {direction.upper()} IMBALANCE!")
await asyncio.sleep(10) # Every 10 seconds
def classify_imbalance(imbalance):
"""Classify imbalance level."""
if imbalance > 0.5:
return "Strong Bid"
elif imbalance > 0.2:
return "Moderate Bid"
elif imbalance > -0.2:
return "Balanced"
elif imbalance > -0.5:
return "Moderate Ask"
else:
return "Strong Ask"
await monitor_imbalance()
# Imbalance trend analysis
if len(imbalance_history) >= 10:
recent_avg = sum(h['imbalance'] for h in imbalance_history[-10:]) / 10
earlier_avg = sum(h['imbalance'] for h in imbalance_history[-20:-10]) / 10
trend = recent_avg - earlier_avg
print(f"\nImbalance Trend Analysis:")
print(f"Recent Average: {recent_avg:+.3f}")
print(f"Earlier Average: {earlier_avg:+.3f}")
print(f"Trend: {trend:+.3f} ({'Increasing Bid Pressure' if trend > 0 else 'Increasing Ask Pressure' if trend < 0 else 'Stable'})")
Liquidity Analysis¶
async def liquidity_analysis():
suite = await TradingSuite.create("MNQ", features=["orderbook"])
orderbook = suite.orderbook
# Analyze liquidity at different price levels
liquidity_metrics = await orderbook.get_liquidity_metrics()
print("= Liquidity Analysis:")
print(f" Total Bid Liquidity: {liquidity_metrics['total_bid_volume']:,}")
print(f" Total Ask Liquidity: {liquidity_metrics['total_ask_volume']:,}")
print(f" Weighted Mid Price: ${liquidity_metrics['weighted_mid']:.2f}")
print(f" Liquidity Ratio: {liquidity_metrics['liquidity_ratio']:.3f}")
# Liquidity depth analysis
price_levels = [1, 2, 5, 10, 25] # Price points away from mid
best_prices = await orderbook.get_best_bid_ask()
if best_prices['bid'] and best_prices['ask']:
mid_price = (best_prices['bid'] + best_prices['ask']) / 2
print(f"\nLiquidity at Price Levels (from mid ${mid_price:.2f}):")
for level in price_levels:
# Analyze liquidity $X away from mid
upper_price = mid_price + level
lower_price = mid_price - level
# Get volume within range
bids = await orderbook.get_orderbook_bids()
asks = await orderbook.get_orderbook_asks()
# Filter for price range
bids_in_range = bids.filter(pl.col('price') >= lower_price)
asks_in_range = asks.filter(pl.col('price') <= upper_price)
bid_liquidity = bids_in_range['volume'].sum() if not bids_in_range.is_empty() else 0
ask_liquidity = asks_in_range['volume'].sum() if not asks_in_range.is_empty() else 0
total_liquidity = bid_liquidity + ask_liquidity
print(f" ${level}: {total_liquidity:,} contracts ({bid_liquidity:,} bids, {ask_liquidity:,} asks)")
# Liquidity concentration analysis
await analyze_liquidity_concentration(orderbook)
async def analyze_liquidity_concentration(orderbook):
"""Analyze where liquidity is concentrated."""
bids = await orderbook.get_orderbook_bids(levels=20)
asks = await orderbook.get_orderbook_asks(levels=20)
if not bids.is_empty() and not asks.is_empty():
# Calculate cumulative volume
bids_with_cumsum = bids.with_columns(
pl.col('volume').cumsum().alias('cumulative_volume')
)
asks_with_cumsum = asks.with_columns(
pl.col('volume').cumsum().alias('cumulative_volume')
)
total_bid_volume = bids['volume'].sum()
total_ask_volume = asks['volume'].sum()
# Find levels containing 50% and 80% of volume
def find_concentration_level(df, total_volume, percentage):
target_volume = total_volume * percentage
level = df.filter(pl.col('cumulative_volume') >= target_volume)
return len(df) - len(level) + 1 if not level.is_empty() else len(df)
bid_50_level = find_concentration_level(bids_with_cumsum, total_bid_volume, 0.5)
bid_80_level = find_concentration_level(bids_with_cumsum, total_bid_volume, 0.8)
ask_50_level = find_concentration_level(asks_with_cumsum, total_ask_volume, 0.5)
ask_80_level = find_concentration_level(asks_with_cumsum, total_ask_volume, 0.8)
print(f"\n< Liquidity Concentration:")
print(f" Bid side - 50% in top {bid_50_level} levels, 80% in top {bid_80_level} levels")
print(f" Ask side - 50% in top {ask_50_level} levels, 80% in top {ask_80_level} levels")
# Concentration analysis
if bid_50_level <= 3 or ask_50_level <= 3:
print(" = High liquidity concentration - market depth may be thin")
elif bid_50_level >= 8 or ask_50_level >= 8:
print(" <
Well-distributed liquidity - good market depth")
Spoofing Detection¶
The OrderBook includes sophisticated spoofing detection with 6 different pattern types:
async def spoofing_detection():
suite = await TradingSuite.create("MNQ", features=["orderbook"])
orderbook = suite.orderbook
# Enable spoofing detection with custom parameters
await orderbook.enable_spoofing_detection(
min_order_size=20, # Minimum size to consider
time_threshold=30, # Max time for pattern (seconds)
confidence_threshold=0.7 # Minimum confidence to alert
)
# Spoofing event handler
async def on_spoofing_detected(event):
"""Handle spoofing detection events."""
spoof_data = event.data
pattern_type = spoof_data['pattern_type']
confidence = spoof_data['confidence']
side = spoof_data['side'] # 'bid' or 'ask'
price_level = spoof_data['price_level']
size = spoof_data['size']
print(f"= SPOOFING DETECTED:")
print(f" Pattern: {pattern_type}")
print(f" Confidence: {confidence:.2f}")
print(f" Side: {side}")
print(f" Level: ${price_level:.2f} x {size}")
print(f" Time: {datetime.now()}")
# High confidence alerts
if confidence >= 0.85:
print(f" =% HIGH CONFIDENCE SPOOFING!")
# Register spoofing event handler
await suite.on(EventType.SPOOFING_DETECTED, on_spoofing_detected)
# Manual spoofing analysis
spoofing_analysis = await orderbook.get_spoofing_analysis()
print("= Current Spoofing Analysis:")
print(f" Patterns detected (last hour): {spoofing_analysis['patterns_detected']}")
print(f" Average confidence: {spoofing_analysis['avg_confidence']:.2f}")
print(f" Most common pattern: {spoofing_analysis['most_common_pattern']}")
# Pattern breakdown
for pattern_type, count in spoofing_analysis['pattern_counts'].items():
if count > 0:
print(f" {pattern_type}: {count} occurrences")
# Monitor for spoofing
print("\n=
Monitoring for spoofing patterns...")
await asyncio.sleep(300) # Monitor for 5 minutes
# Get updated analysis
final_analysis = await orderbook.get_spoofing_analysis()
new_patterns = final_analysis['patterns_detected'] - spoofing_analysis['patterns_detected']
print(f"\n= Spoofing Summary:")
print(f" New patterns detected: {new_patterns}")
if new_patterns > 0:
print(f" Detection rate: {new_patterns / 5:.1f} patterns per minute")
# Spoofing pattern types available:
spoofing_patterns = {
'layering': 'Multiple large orders at same level, cancelled before execution',
'spoofing': 'Large order opposite to intended direction, cancelled after market moves',
'quote_stuffing': 'Rapid placement and cancellation of orders',
'momentum_ignition': 'Large orders to trigger momentum, then trade opposite',
'liquidity_detection': 'Small orders to detect hidden liquidity',
'pinging': 'Rapid small orders testing price levels'
}
Iceberg Detection¶
async def iceberg_detection():
suite = await TradingSuite.create("MNQ", features=["orderbook"])
orderbook = suite.orderbook
# Enable iceberg detection
detected_icebergs = await orderbook.detect_iceberg_orders(
min_total_size=100, # Minimum total iceberg size
min_visible_ratio=0.2, # Maximum visible portion
time_window=300 # 5 minute window
)
print(f"> Iceberg Orders Detected: {len(detected_icebergs)}")
for iceberg in detected_icebergs:
side = iceberg['side']
price = iceberg['price']
estimated_size = iceberg['estimated_total_size']
visible_size = iceberg['visible_size']
confidence = iceberg['confidence']
print(f"\n< Iceberg Order:")
print(f" Side: {side}")
print(f" Price: ${price:.2f}")
print(f" Estimated Total: {estimated_size:,}")
print(f" Visible: {visible_size:,}")
print(f" Confidence: {confidence:.2f}")
# High confidence icebergs
if confidence >= 0.8:
print(f" = High confidence iceberg - large hidden size!")
# Real-time iceberg monitoring
iceberg_tracker = {
'detected_count': 0,
'total_hidden_size': 0
}
async def on_iceberg_detected(event):
"""Handle real-time iceberg detection."""
iceberg_data = event.data
iceberg_tracker['detected_count'] += 1
iceberg_tracker['total_hidden_size'] += iceberg_data.get('estimated_hidden_size', 0)
print(f"> New iceberg detected at ${iceberg_data['price']:.2f}")
print(f" Estimated hidden: {iceberg_data.get('estimated_hidden_size', 0):,}")
await suite.on(EventType.ICEBERG_DETECTED, on_iceberg_detected)
# Monitor for new icebergs
print("\n=
Monitoring for iceberg orders...")
await asyncio.sleep(180) # 3 minutes
print(f"\n= Iceberg Monitoring Summary:")
print(f" New icebergs detected: {iceberg_tracker['detected_count']}")
print(f" Total hidden size estimate: {iceberg_tracker['total_hidden_size']:,}")
Volume Profile Analysis¶
async def volume_profile_analysis():
suite = await TradingSuite.create("MNQ", features=["orderbook"])
orderbook = suite.orderbook
# Get volume profile for current session
volume_profile = await orderbook.get_volume_profile(
time_window=3600, # 1 hour window
price_buckets=50 # 50 price levels
)
print("= Volume Profile Analysis:")
# Find high volume nodes (HVN) and low volume nodes (LVN)
max_volume = volume_profile['volume'].max()
hvn_threshold = max_volume * 0.7 # 70% of max volume
lvn_threshold = max_volume * 0.2 # 20% of max volume
hvn_levels = volume_profile.filter(pl.col('volume') >= hvn_threshold)
lvn_levels = volume_profile.filter(pl.col('volume') <= lvn_threshold)
print(f"\n< High Volume Nodes (HVN):")
for row in hvn_levels.iter_rows(named=True):
print(f" ${row['price_level']:.2f}: {row['volume']:,} contracts")
print(f"\n=s Low Volume Nodes (LVN) - Potential breakout levels:")
for row in lvn_levels.head(5).iter_rows(named=True):
print(f" ${row['price_level']:.2f}: {row['volume']:,} contracts")
# Point of Control (POC) - highest volume price
poc_row = volume_profile.sort('volume', descending=True).head(1)
if not poc_row.is_empty():
poc_price = poc_row['price_level'][0]
poc_volume = poc_row['volume'][0]
print(f"\n=Q Point of Control (POC): ${poc_price:.2f} ({poc_volume:,} contracts)")
# Value Area calculation (70% of volume)
total_volume = volume_profile['volume'].sum()
target_volume = total_volume * 0.7
# Find value area range
sorted_profile = volume_profile.sort('volume', descending=True)
cumulative_volume = 0
value_area_prices = []
for row in sorted_profile.iter_rows(named=True):
cumulative_volume += row['volume']
value_area_prices.append(row['price_level'])
if cumulative_volume >= target_volume:
break
if value_area_prices:
value_area_high = max(value_area_prices)
value_area_low = min(value_area_prices)
print(f"\n= Value Area (70% of volume):")
print(f" High: ${value_area_high:.2f}")
print(f" Low: ${value_area_low:.2f}")
print(f" Range: ${value_area_high - value_area_low:.2f}")
# Current price relative to value area
current_price = (await orderbook.get_best_bid_ask())
if current_price['bid'] and current_price['ask']:
mid_price = (current_price['bid'] + current_price['ask']) / 2
if mid_price > value_area_high:
print(f" = Price above value area - potential resistance")
elif mid_price < value_area_low:
print(f" = Price below value area - potential support")
else:
print(f" Price within value area")
# Volume distribution analysis
await analyze_volume_distribution(volume_profile)
async def analyze_volume_distribution(volume_profile):
"""Analyze the distribution characteristics of volume."""
# Calculate statistics
volumes = volume_profile['volume'].to_list()
prices = volume_profile['price_level'].to_list()
if volumes and prices:
# Weighted average price by volume
total_volume = sum(volumes)
vwap = sum(p * v for p, v in zip(prices, volumes)) / total_volume
# Volume distribution metrics
max_volume = max(volumes)
min_volume = min(volumes)
avg_volume = sum(volumes) / len(volumes)
# Concentration ratio (top 20% of levels contain what % of volume?)
sorted_volumes = sorted(volumes, reverse=True)
top_20_count = max(1, len(sorted_volumes) // 5)
top_20_volume = sum(sorted_volumes[:top_20_count])
concentration_ratio = top_20_volume / total_volume
print(f"\n= Volume Distribution Analysis:")
print(f" VWAP: ${vwap:.2f}")
print(f" Volume Range: {min_volume:,} - {max_volume:,}")
print(f" Average Volume: {avg_volume:,.0f}")
print(f" Concentration Ratio: {concentration_ratio:.1%}")
if concentration_ratio > 0.6:
print(" = High concentration - volume clustered at few levels")
elif concentration_ratio < 0.4:
print(" <
Well distributed volume across price levels")
else:
print(" Moderate volume concentration")
Memory Management and Performance¶
OrderBook Memory Management¶
async def orderbook_memory_management():
suite = await TradingSuite.create("MNQ", features=["orderbook"])
orderbook = suite.orderbook
# Check memory usage
memory_stats = await orderbook.get_memory_stats()
print("= OrderBook Memory Usage:")
print(f" Trade History: {memory_stats['trade_count']:,} trades ({memory_stats['trade_memory_mb']:.1f} MB)")
print(f" Depth Entries: {memory_stats['depth_entries']:,} levels ({memory_stats['depth_memory_mb']:.1f} MB)")
print(f" Total Memory: {memory_stats['total_memory_mb']:.1f} MB")
# Memory limits
limits = await orderbook.get_memory_limits()
print(f"\nMemory Limits:")
print(f" Max trades: {limits['max_trades']:,}")
print(f" Max depth entries: {limits['max_depth_entries']:,}")
print(f" Max memory: {limits['max_memory_mb']:.0f} MB")
# Usage ratios
trade_usage = memory_stats['trade_count'] / limits['max_trades']
depth_usage = memory_stats['depth_entries'] / limits['max_depth_entries']
memory_usage = memory_stats['total_memory_mb'] / limits['max_memory_mb']
print(f"\nUsage Ratios:")
print(f" Trade buffer: {trade_usage:.1%}")
print(f" Depth buffer: {depth_usage:.1%}")
print(f" Memory usage: {memory_usage:.1%}")
# Cleanup recommendations
if memory_usage > 0.8:
print(" High memory usage - consider cleanup")
# Manual cleanup (usually automatic)
cleaned_count = await orderbook.cleanup_old_data(keep_minutes=30)
print(f"> Cleaned up {cleaned_count:,} old entries")
# Performance monitoring
async def orderbook_performance_monitoring():
suite = await TradingSuite.create("MNQ", features=["orderbook"])
orderbook = suite.orderbook
# Performance metrics
perf_stats = await orderbook.get_performance_stats()
print(" OrderBook Performance:")
print(f" Update Rate: {perf_stats['updates_per_second']:.1f}/sec")
print(f" Processing Latency: {perf_stats['avg_processing_latency_ms']:.2f}ms")
print(f" Query Response Time: {perf_stats['avg_query_time_ms']:.2f}ms")
print(f" Memory Growth Rate: {perf_stats['memory_growth_mb_per_hour']:.2f} MB/hr")
# Performance optimization
if perf_stats['avg_processing_latency_ms'] > 10:
print(" High processing latency detected")
# Enable performance optimizations
await orderbook.enable_optimizations(
batch_updates=True, # Batch multiple updates
reduce_precision=False, # Keep full precision
compress_history=True # Compress old data
)
print(" Performance optimizations enabled")
Best Practices¶
1. Efficient OrderBook Queries¶
# Good: Get specific levels you need
bids = await orderbook.get_orderbook_bids(levels=10)
asks = await orderbook.get_orderbook_asks(levels=10)
# Avoid: Getting full book when you only need top levels
full_book = await orderbook.get_full_orderbook() # Expensive
2. Event-Driven Processing¶
async def efficient_orderbook_monitoring():
suite = await TradingSuite.create("MNQ", features=["orderbook"])
# Good: Event-driven updates
async def on_depth_update(event):
# Process only the change
update_data = event.data
print(f"Depth updated at level {update_data['level']}")
await suite.on(EventType.DEPTH_UPDATE, on_depth_update)
# Avoid: Continuous polling
# while True:
# depth = await orderbook.get_orderbook_bids(levels=5) # Expensive
# await asyncio.sleep(1)
3. Memory-Conscious Operations¶
async def memory_conscious_analysis():
suite = await TradingSuite.create("MNQ", features=["orderbook"])
orderbook = suite.orderbook
# Set reasonable memory limits
await orderbook.configure_memory_limits(
max_trades=5000, # Keep last 5K trades
max_depth_entries=1000, # 1K depth entries per side
cleanup_interval=300 # Cleanup every 5 minutes
)
# Use time-based queries instead of keeping all data
recent_trades = await orderbook.get_recent_trades(minutes=30)
recent_volume_profile = await orderbook.get_volume_profile(time_window=1800)
4. Robust Error Handling¶
async def robust_orderbook_operations():
suite = await TradingSuite.create("MNQ", features=["orderbook"])
orderbook = suite.orderbook
try:
# OrderBook operations with error handling
best_prices = await orderbook.get_best_bid_ask()
if not best_prices['bid'] or not best_prices['ask']:
print(" No bid/ask available - market may be closed")
return
# Proceed with analysis
imbalance = await orderbook.get_market_imbalance()
except ConnectionError:
print("= Connection lost - attempting reconnection")
await suite.reconnect()
except Exception as e:
print(f"L OrderBook error: {e}")
# Fallback to basic price data
current_price = await suite.data.get_current_price()
print(f"Fallback to basic price: ${current_price:.2f}")
Integration with Trading Strategies¶
OrderBook-Based Entry Signals¶
async def orderbook_entry_signals():
suite = await TradingSuite.create("MNQ", features=["orderbook"])
orderbook = suite.orderbook
signal_tracker = {
'imbalance_signals': 0,
'liquidity_signals': 0,
'spoofing_signals': 0
}
async def check_entry_conditions():
"""Check OrderBook conditions for trade entries."""
# Get market data
imbalance = await orderbook.get_market_imbalance()
best_prices = await orderbook.get_best_bid_ask()
liquidity = await orderbook.get_liquidity_metrics()
signals = []
# Imbalance-based signals
if imbalance > 0.6: # Strong bid imbalance
signals.append(("BUY", "Strong bid imbalance", imbalance))
signal_tracker['imbalance_signals'] += 1
elif imbalance < -0.6: # Strong ask imbalance
signals.append(("SELL", "Strong ask imbalance", imbalance))
signal_tracker['imbalance_signals'] += 1
# Liquidity-based signals
if best_prices['spread'] > 5.0: # Wide spread
signals.append(("AVOID", "Wide spread - low liquidity", best_prices['spread']))
elif best_prices['spread'] < 1.0: # Tight spread
signals.append(("FAVORABLE", "Tight spread - good liquidity", best_prices['spread']))
signal_tracker['liquidity_signals'] += 1
# Recent spoofing detection
spoofing_analysis = await orderbook.get_spoofing_analysis()
recent_patterns = spoofing_analysis.get('recent_patterns', 0)
if recent_patterns > 3: # Multiple recent spoofing patterns
signals.append(("CAUTION", "Recent spoofing detected", recent_patterns))
signal_tracker['spoofing_signals'] += 1
# Print signals
if signals:
print(f"\n= OrderBook Entry Signals @ {datetime.now().strftime('%H:%M:%S')}:")
for signal_type, description, value in signals:
print(f" {signal_type}: {description} ({value:.2f})")
return signals
# Monitor for entry signals
for _ in range(20): # Check 20 times
await check_entry_conditions()
await asyncio.sleep(15) # Every 15 seconds
print(f"\n= Signal Summary:")
print(f" Imbalance signals: {signal_tracker['imbalance_signals']}")
print(f" Liquidity signals: {signal_tracker['liquidity_signals']}")
print(f" Spoofing warnings: {signal_tracker['spoofing_signals']}")
Summary¶
The ProjectX OrderBook provides comprehensive Level 2 market depth analysis:
- Real-time Level 2 data with WebSocket streaming
- Market microstructure analysis including spread, depth, and liquidity metrics
- Advanced pattern detection with spoofing and iceberg identification
- Volume profile analysis for identifying key price levels
- Memory management with sliding windows and automatic cleanup
- Performance optimization with event-driven updates and caching
- Integration-ready for sophisticated trading strategies
The OrderBook is designed for institutional-level market analysis with comprehensive error handling, real-time monitoring, and production-ready performance optimizations.
Previous: Technical Indicators Guide | Back to: User Guide Index