OrderBook API

Advanced Level 2 orderbook management and market microstructure analysis.

Overview

The OrderBook class provides institutional-grade orderbook analytics for real-time market depth analysis. It includes advanced features like iceberg order detection with price level history tracking, market imbalance analysis, volume profiling, and dynamic support/resistance identification based on temporal patterns.

New in v1.1.4: Enhanced analytics using price level refresh history for detecting persistent levels, iceberg orders, and market maker activity zones.

Async Level 2 orderbook toolkit for ProjectX.

Author: @TexasCoding Date: 2025-08-02

Overview:

Provides a complete async suite for Level 2 orderbook analysis, real-time market microstructure, and market depth analytics. Integrates with ProjectX for institutional-grade trading, strategy development, and execution research.

Key Features:
  • Real-time Level 2 market depth tracking (WebSocket)

  • Iceberg/cluster detection, volume profile, and POC analytics

  • Market imbalance, support/resistance, and trade flow stats

  • Memory-efficient, thread-safe, event-driven architecture

  • Component-based design for extensibility

  • Advanced market microstructure analysis

  • Comprehensive trade flow classification

  • Automatic memory management and cleanup

Orderbook Components:
  • Base OrderBook: Core data structures and thread-safe operations

  • Market Analytics: Imbalance, depth, delta, and liquidity analysis

  • Order Detection: Iceberg orders, clusters, and hidden liquidity detection

  • Volume Profile: Support/resistance levels and volume distribution

  • Memory Manager: Automatic cleanup and memory optimization

  • Realtime Handler: WebSocket integration and real-time data processing

Real-time Capabilities:
  • WebSocket-based Level 2 market depth updates

  • Immediate trade execution detection and classification

  • Real-time spread and price level monitoring

  • Event-driven callback system for custom logic

  • Automatic data validation and error handling

Example Usage:

```python # V3: Uses EventBus and factory functions from project_x_py import ProjectX, create_orderbook, create_realtime_client from project_x_py.events import EventBus, EventType import asyncio

async def main():

# V3: ProjectX client with context manager async with ProjectX.from_env() as client:

await client.authenticate()

# V3: Create realtime client with factory function realtime_client = await create_realtime_client(

jwt_token=client.jwt_token, account_id=str(client.account_id)

)

# V3: EventBus for unified event handling event_bus = EventBus()

# V3: Create orderbook with EventBus orderbook = create_orderbook(

“MNQ”, # V3: Using actual contract symbols event_bus=event_bus, project_x=client,

) await orderbook.initialize(realtime_client=realtime_client)

# V3: Register event handlers @event_bus.on(EventType.MARKET_DEPTH_UPDATE) async def on_depth_update(data):

print(f”Depth update: {data[‘timestamp’]}”)

# Get basic orderbook snapshot snapshot = await orderbook.get_orderbook_snapshot(levels=10) print(f”Best bid: {snapshot[‘best_bid’]}, Spread: {snapshot[‘spread’]}”)

# Advanced analytics imbalance = await orderbook.get_market_imbalance(levels=5) print(f”Market imbalance: {imbalance[‘imbalance_ratio’]:.2f}”)

# Detection algorithms icebergs = await orderbook.detect_iceberg_orders() print(f”Detected {len(icebergs[‘iceberg_levels’])} iceberg orders”)

await orderbook.cleanup()

asyncio.run(main()) ```

See also

  • orderbook.base.OrderBookBase

  • orderbook.analytics.MarketAnalytics

  • orderbook.detection.OrderDetection

  • orderbook.profile.VolumeProfile

  • orderbook.memory.MemoryManager

  • orderbook.realtime.RealtimeHandler

class DomType(*values)[source]

Bases: IntEnum

ProjectX DOM (Depth of Market) type codes.

UNKNOWN = 0
ASK = 1
BID = 2
BEST_ASK = 3
BEST_BID = 4
TRADE = 5
RESET = 6
SESSION_LOW = 7
SESSION_HIGH = 8
NEW_BEST_BID = 9
NEW_BEST_ASK = 10
FILL = 11
class IcebergConfig(min_refreshes=5, volume_threshold=50, time_window_minutes=10, confidence_threshold=0.7)[source]

Bases: object

Configuration for iceberg detection.

__init__(min_refreshes=5, volume_threshold=50, time_window_minutes=10, confidence_threshold=0.7)
confidence_threshold: float = 0.7
min_refreshes: int = 5
time_window_minutes: int = 10
volume_threshold: int = 50
class MarketAnalytics(orderbook)[source]

Bases: object

Provides market analytics for the orderbook.

This class implements advanced analytics methods for the OrderBook, focusing on extracting actionable market insights from the raw orderbook data. It is designed as a component that is injected into the main OrderBook to provide specialized analytical capabilities while maintaining a clean separation of concerns.

The analytics methods focus on several key areas: 1. Market imbalance analysis - Detecting buy/sell pressure 2. Liquidity distribution analysis - Understanding depth across price levels 3. Trade flow analysis - Classifying aggressive vs. passive executions 4. Cumulative delta tracking - Measuring net buying/selling pressure over time 5. Significant liquidity levels - Identifying potential support/resistance 6. Spread analysis - Bid-ask spread characteristics and patterns 7. Statistical summaries - Comprehensive orderbook health metrics

Each method follows a consistent pattern: - Thread-safe execution through the orderbook lock - Comprehensive error handling and logging - Return of structured analysis results with multiple metrics - Optional time filtering when appropriate - Configurable parameters for analysis sensitivity

These analytics are designed to be used by trading strategies, market analysis tools, and visualization components that need deeper insights beyond raw orderbook data.

Performance Characteristics:
  • All methods are optimized for real-time analysis

  • Thread-safe operations with minimal lock contention

  • Graceful degradation when data is insufficient

  • Memory-efficient calculations using Polars DataFrames

__init__(orderbook)[source]
static analyze_dataframe_spread(data, bid_column='bid', ask_column='ask', mid_column=None)[source]

Analyze bid-ask spread characteristics from a DataFrame.

This is a static method that can analyze spread data from any DataFrame, useful for historical analysis or backtesting scenarios where you have bid/ask data but not a live orderbook.

Parameters:
  • data (DataFrame) – DataFrame with bid/ask price columns

  • bid_column (str) – Name of the bid price column (default: “bid”)

  • ask_column (str) – Name of the ask price column (default: “ask”)

  • mid_column (str | None) – Name of the mid price column (optional, will calculate if not provided)

Returns:

  • avg_spread: Average absolute spread

  • median_spread: Median absolute spread

  • min_spread: Minimum spread observed

  • max_spread: Maximum spread observed

  • avg_relative_spread: Average spread as percentage of mid price

  • spread_volatility: Standard deviation of spread

Return type:

Dict containing spread analysis

Example

>>> # Analyze historical bid/ask data
>>> spread_stats = MarketAnalytics.analyze_dataframe_spread(historical_data)
>>> print(f"Average spread: ${spread_stats['avg_spread']:.4f}")
>>> print(f"Relative spread: {spread_stats['avg_relative_spread']:.4%}")
async get_cumulative_delta(time_window_minutes=60)[source]

Get cumulative delta (buy volume - sell volume) over time window.

Parameters:

time_window_minutes (int) – Time window to analyze

Return type:

dict[str, Any]

Returns:

Dict containing cumulative delta analysis

async get_liquidity_levels(min_volume=100, levels=20)[source]

Identify significant liquidity levels in the orderbook.

Parameters:
  • min_volume (int) – Minimum volume to consider significant

  • levels (int) – Number of levels to check on each side

Return type:

dict[str, Any]

Returns:

Dict containing liquidity analysis

async get_market_imbalance(levels=10)[source]

Calculate order flow imbalance between bid and ask sides.

This method quantifies the imbalance between buying and selling pressure in the orderbook by comparing the total volume on the bid side versus the ask side. A positive imbalance ratio indicates stronger buying pressure, while a negative ratio indicates stronger selling pressure.

The analysis includes: - Imbalance ratio: (bid_volume - ask_volume) / total_volume - Raw bid and ask volumes - Number of populated price levels on each side - Text analysis of market conditions based on the imbalance ratio - Timestamp of the analysis

The method is thread-safe and acquires the orderbook lock during execution.

Parameters:

levels (int) – Number of price levels to analyze on each side of the book (default: 10). Higher values analyze deeper into the orderbook but may include less relevant levels.

Returns:

imbalance_ratio: Float between -1.0 and 1.0 where:
  • Positive values indicate buying pressure

  • Negative values indicate selling pressure

  • Values near 0 indicate a balanced orderbook

bid_volume: Total volume on the bid side ask_volume: Total volume on the ask side bid_levels: Number of populated bid price levels analyzed ask_levels: Number of populated ask price levels analyzed analysis: Text description of market conditions timestamp: Time of analysis

Return type:

Dict containing

Example

>>> imbalance = await orderbook.get_market_imbalance(levels=5)
>>> print(f"Imbalance ratio: {imbalance['imbalance_ratio']:.2f}")
>>> print(f"Analysis: {imbalance['analysis']}")
>>> print(
...     f"Bid volume: {imbalance['bid_volume']}, "
...     f"Ask volume: {imbalance['ask_volume']}"
... )
async get_orderbook_depth(price_range)[source]

Analyze orderbook depth within a price range.

Parameters:

price_range (float) – Price range from best bid/ask to analyze

Return type:

MarketImpactResponse

Returns:

Dict containing depth analysis

async get_statistics()[source]

Get comprehensive orderbook statistics.

Return type:

dict[str, Any]

async get_trade_flow_summary()[source]

Get comprehensive trade flow statistics.

Return type:

dict[str, Any]

class MarketDataDict[source]

Bases: TypedDict

Type definition for market data updates.

contractId: str
data: list[dict[str, Any]]
class MemoryConfig(max_trades=10000, max_depth_entries=1000, cleanup_interval=300, max_history_per_level=50, price_history_window_minutes=30, max_best_price_history=1000, max_spread_history=1000, max_delta_history=1000)[source]

Bases: object

Configuration for memory management.

__init__(max_trades=10000, max_depth_entries=1000, cleanup_interval=300, max_history_per_level=50, price_history_window_minutes=30, max_best_price_history=1000, max_spread_history=1000, max_delta_history=1000)
cleanup_interval: int = 300
max_best_price_history: int = 1000
max_delta_history: int = 1000
max_depth_entries: int = 1000
max_history_per_level: int = 50
max_spread_history: int = 1000
max_trades: int = 10000
price_history_window_minutes: int = 30
class OrderBook(instrument, event_bus, project_x=None, timezone_str='America/Chicago', config=None)[source]

Bases: OrderBookBase

Async Level 2 Orderbook with comprehensive market analysis.

This class combines all orderbook functionality into a single interface, providing a unified API for accessing real-time market depth data, advanced analytics, detection algorithms, and volume profiling. It uses a component-based architecture where specialized functionality is delegated to dedicated components while maintaining a simple, cohesive interface for the client code.

Key Components:
  • realtime_handler: Manages WebSocket connections and real-time data processing

  • analytics: Provides market analytics (imbalance, depth, delta, liquidity)

  • detection: Implements detection algorithms (iceberg, clusters)

  • profile: Handles volume profiling and support/resistance analysis

  • memory_manager: Manages memory usage and cleanup tasks

Thread Safety:

All methods are thread-safe and can be called concurrently from multiple asyncio tasks. Data consistency is maintained through internal locks.

Memory Management:

The orderbook implements automatic memory management through the MemoryManager component, which periodically cleans up historical data based on configurable parameters to prevent memory leaks during long-running sessions.

Real-time Features:
  • WebSocket-based Level 2 market depth updates

  • Immediate trade execution detection and classification

  • Real-time spread and price level monitoring

  • Event-driven callback system for custom logic

  • Automatic data validation and error handling

Analytics Capabilities:
  • Market imbalance analysis and ratio calculations

  • Orderbook depth analysis within price ranges

  • Cumulative delta tracking and trade flow statistics

  • Liquidity level identification and concentration analysis

  • Comprehensive orderbook statistics and health metrics

Detection Algorithms:
  • Iceberg order detection with confidence scoring

  • Order clustering analysis for institutional activity

  • Advanced market microstructure metrics

  • Hidden liquidity and volume pattern recognition

Example

>>> # V3: Create orderbook with EventBus
>>> event_bus = EventBus()
>>> orderbook = OrderBook("MNQ", event_bus, project_x_client)
>>> await orderbook.initialize(realtime_client)
>>>
>>> # V3: Register event handlers
>>> @event_bus.on(EventType.MARKET_DEPTH_UPDATE)
>>> async def handle_depth(data):
...     print(f"Depth: {data['bids'][0]['price']} @ {data['bids'][0]['size']}")
>>>
>>> # Get basic orderbook data
>>> snapshot = await orderbook.get_orderbook_snapshot()
>>> print(f"Spread: {snapshot['spread']}")
>>>
>>> # Advanced analytics
>>> imbalance = await orderbook.get_market_imbalance()
>>> liquidity = await orderbook.get_liquidity_levels()
>>>
>>> # Detection algorithms
>>> icebergs = await orderbook.detect_iceberg_orders()
>>> clusters = await orderbook.detect_order_clusters()
>>>
>>> # Volume profiling
>>> profile = await orderbook.get_volume_profile()
>>> support_resistance = await orderbook.get_support_resistance_levels()
>>>
>>> # Cleanup when done
>>> await orderbook.cleanup()
__init__(instrument, event_bus, project_x=None, timezone_str='America/Chicago', config=None)[source]

Initialize the orderbook.

Parameters:
  • instrument (str) – Trading instrument symbol

  • event_bus (Any) – EventBus instance for unified event handling. Required for all event emissions including market depth updates and trade ticks.

  • project_x (ProjectXBase | None) – Optional ProjectX client for tick size lookup

  • timezone_str (str) – Timezone for timestamps (default: America/Chicago)

  • config (OrderbookConfig | None) – Optional configuration for orderbook behavior

async initialize(realtime_client=None, subscribe_to_depth=True, subscribe_to_quotes=True)[source]

Initialize the orderbook with optional real-time data feed.

This method configures the orderbook for operation, sets up the memory manager, and optionally connects to the real-time data feed. It must be called after creating an OrderBook instance and before using any other methods.

The initialization process performs the following steps: 1. Starts the memory manager for automatic cleanup 2. If a realtime_client is provided:

  • Registers callbacks for market depth and quote updates

  • Subscribes to the specified data channels

  • Sets up WebSocket connection handlers

Parameters:
  • realtime_client (ProjectXRealtimeClient | None) – Async real-time client for WebSocket data. If provided, the orderbook will receive live market data updates. If None, the orderbook will function in historical/static mode only.

  • subscribe_to_depth (bool) – Subscribe to market depth updates (Level 2 data). Set to False only if you don’t need full order book data.

  • subscribe_to_quotes (bool) – Subscribe to quote updates (top of book data). Set to False only if you don’t need quote data.

Returns:

True if initialization successful, False if any part of the

initialization failed.

Return type:

bool

Example

>>> # V3: Initialize with EventBus and realtime client
>>> event_bus = EventBus()
>>> orderbook = OrderBook("MNQ", event_bus, client)
>>> # V3: Create realtime client with factory
>>> realtime_client = await create_realtime_client(
...     jwt_token=client.jwt_token, account_id=str(client.account_id)
... )
>>> success = await orderbook.initialize(
...     realtime_client=realtime_client,
...     subscribe_to_depth=True,
...     subscribe_to_quotes=True,
... )
>>> if success:
...     print("Orderbook initialized and receiving real-time data")
... else:
...     print("Failed to initialize orderbook")
async get_market_imbalance(levels=10)[source]

Calculate order flow imbalance between bid and ask sides.

Delegates to MarketAnalytics.get_market_imbalance(). See MarketAnalytics.get_market_imbalance() for complete documentation.

Return type:

LiquidityAnalysisResponse

async get_orderbook_depth(price_range)[source]

Analyze orderbook depth within a price range.

Delegates to MarketAnalytics.get_orderbook_depth(). See MarketAnalytics.get_orderbook_depth() for complete documentation.

Return type:

MarketImpactResponse

async get_cumulative_delta(time_window_minutes=60)[source]

Get cumulative delta (buy volume - sell volume) over time window.

Delegates to MarketAnalytics.get_cumulative_delta(). See MarketAnalytics.get_cumulative_delta() for complete documentation.

Return type:

dict[str, Any]

async get_trade_flow_summary()[source]

Get comprehensive trade flow statistics.

Delegates to MarketAnalytics.get_trade_flow_summary(). See MarketAnalytics.get_trade_flow_summary() for complete documentation.

Return type:

dict[str, Any]

async get_liquidity_levels(min_volume=100, levels=20)[source]

Identify significant liquidity levels in the orderbook.

Delegates to MarketAnalytics.get_liquidity_levels(). See MarketAnalytics.get_liquidity_levels() for complete documentation.

Return type:

dict[str, Any]

async get_statistics()[source]

Get comprehensive orderbook statistics.

Delegates to MarketAnalytics.get_statistics(). See MarketAnalytics.get_statistics() for complete documentation.

Return type:

dict[str, Any]

async detect_iceberg_orders(min_refreshes=None, volume_threshold=None, time_window_minutes=None)[source]

Detect potential iceberg orders based on price level refresh patterns.

Delegates to OrderDetection.detect_iceberg_orders(). See OrderDetection.detect_iceberg_orders() for complete documentation.

Return type:

dict[str, Any]

async detect_order_clusters(min_cluster_size=3, price_tolerance=0.1)[source]

Detect clusters of orders at similar price levels.

Delegates to OrderDetection.detect_order_clusters(). See OrderDetection.detect_order_clusters() for complete documentation.

Return type:

list[dict[str, Any]]

async get_advanced_market_metrics()[source]

Calculate advanced market microstructure metrics.

Delegates to OrderDetection.get_advanced_market_metrics(). See OrderDetection.get_advanced_market_metrics() for complete documentation.

Return type:

OrderbookAnalysisResponse

async get_volume_profile(time_window_minutes=60, price_bins=20)[source]

Calculate volume profile showing volume distribution by price.

Delegates to VolumeProfile.get_volume_profile(). See VolumeProfile.get_volume_profile() for complete documentation.

Return type:

dict[str, Any]

async get_support_resistance_levels(lookback_minutes=120, min_touches=3, price_tolerance=0.1)[source]

Identify support and resistance levels based on price history.

Delegates to VolumeProfile.get_support_resistance_levels(). See VolumeProfile.get_support_resistance_levels() for complete documentation.

Return type:

dict[str, Any]

async get_spread_analysis(window_minutes=30)[source]

Analyze bid-ask spread patterns over time.

Delegates to VolumeProfile.get_spread_analysis(). See VolumeProfile.get_spread_analysis() for complete documentation.

Return type:

LiquidityAnalysisResponse

async get_memory_stats()[source]

Get comprehensive memory usage statistics.

Delegates to MemoryManager.get_memory_stats(). See MemoryManager.get_memory_stats() for complete documentation.

Return type:

OrderbookStats

async cleanup()[source]

Clean up resources and disconnect from real-time feeds.

Return type:

None

class OrderbookSide(*values)[source]

Bases: IntEnum

Orderbook side enumeration.

BID = 0
ASK = 1
class OrderbookSnapshot[source]

Bases: TypedDict

Type definition for orderbook snapshot.

instrument: str
timestamp: datetime
best_bid: float | None
best_ask: float | None
spread: float | None
mid_price: float | None
bids: list[PriceLevelDict]
asks: list[PriceLevelDict]
total_bid_volume: int
total_ask_volume: int
bid_count: int
ask_count: int
imbalance: float | None
class PriceLevelDict[source]

Bases: TypedDict

Type definition for price level data.

price: float
volume: int
timestamp: datetime
class TradeDict[source]

Bases: TypedDict

Type definition for trade data.

price: float
volume: int
timestamp: datetime
side: str
spread_at_trade: float | None
mid_price_at_trade: float | None
best_bid_at_trade: float | None
best_ask_at_trade: float | None
order_type: str
class VolumeProfile(orderbook)[source]

Bases: object

Provides volume profile and price level analysis.

This class implements advanced market structure analysis methods focusing on volume distribution and key price level identification. It is designed as a specialized component of the OrderBook that reveals deeper insights into market structure and participant behavior patterns.

Key functionalities: 1. Volume profile generation - Creates histogram-style analysis of volume distribution

across price levels, identifying high-volume nodes and areas of interest

  1. Support/resistance detection - Identifies price levels that have shown significant reaction in the past based on price history and order flow

  2. Spread analysis - Studies bid-ask spread patterns over time to identify market regime changes and liquidity conditions

These analyses are particularly useful for: - Identifying key price levels for trade entry and exit - Understanding where significant market participant activity has occurred - Recognizing market structure patterns and regime changes - Planning trade executions around areas of expected support/resistance

The class implements thread-safe methods that operate on the historical data accumulated by the orderbook, with configurable time window parameters to focus analysis on the most relevant recent market activity.

__init__(orderbook)[source]
static calculate_dataframe_volume_profile(data, price_column='close', volume_column='volume', num_bins=50)[source]

Calculate volume profile from a DataFrame with price and volume data.

This static method provides volume profile analysis for any DataFrame, useful for historical analysis or when working with data outside of the orderbook context. It creates a histogram of volume distribution across price levels and identifies key areas of market interest.

Parameters:
  • data (DataFrame) – DataFrame with price and volume data

  • price_column (str) – Name of the price column (default: “close”)

  • volume_column (str) – Name of the volume column (default: “volume”)

  • num_bins (int) – Number of price bins for the histogram (default: 50)

Returns:

  • point_of_control: Price level with highest volume

  • poc_volume: Volume at the point of control

  • value_area_high: Upper bound of 70% volume area

  • value_area_low: Lower bound of 70% volume area

  • total_volume: Total volume analyzed

  • volume_distribution: Top 10 high-volume price levels

Return type:

Dict containing volume profile analysis

Example

>>> # Analyze volume distribution in historical data
>>> profile = VolumeProfile.calculate_dataframe_volume_profile(ohlcv_data)
>>> print(f"POC Price: ${profile['point_of_control']:.2f}")
>>> print(
...     f"Value Area: ${profile['value_area_low']:.2f} - ${profile['value_area_high']:.2f}"
... )
async get_spread_analysis(window_minutes=30)[source]

Analyze bid-ask spread patterns over time.

Parameters:

window_minutes (int) – Time window for analysis

Return type:

LiquidityAnalysisResponse

Returns:

Dict containing spread statistics and patterns

async get_support_resistance_levels(lookback_minutes=120, min_touches=3, price_tolerance=0.1)[source]

Identify support and resistance levels based on price history.

This method analyzes historical price action to identify significant support and resistance levels where price has repeatedly reacted. These levels are critical for trading decisions as they often represent areas where price may reverse or consolidate.

Algorithm Overview: 1. Collects price points from recent trades and orderbook levels 2. Groups nearby prices within the tolerance range 3. Counts how many times each price cluster was “touched” 4. Identifies clusters with sufficient touches as support/resistance 5. Classifies levels as support (below current price) or resistance (above)

A “touch” is counted when: - A trade occurs at or near the price level - The orderbook shows significant volume at the level - Price approaches but doesn’t significantly break through the level

Strength Assessment: - More touches indicate stronger levels - Recent touches are weighted more heavily - Volume at the level increases its significance

Parameters:
  • lookback_minutes (int) – Time window to analyze in minutes (default: 120). Longer periods capture more historical levels but may include outdated information. Shorter periods focus on recent structure.

  • min_touches (int) – Minimum number of price touches required for a level to qualify as support/resistance (default: 3). Higher values identify only the strongest levels.

  • price_tolerance (float) – Price range to group similar levels (default: 0.1). This should be adjusted based on the instrument’s tick size and typical price movement.

Returns:

support_levels: List of identified support levels, each containing:
  • price: The support price level

  • touches: Number of times price touched this level

  • strength: Relative strength score (0.0 to 1.0)

  • last_touch: Timestamp of most recent touch

  • volume_at_level: Average volume when price was at this level

resistance_levels: List of identified resistance levels (same format) strongest_support: The support level with highest strength score strongest_resistance: The resistance level with highest strength score current_price: Current market price for reference analysis_window: Time window used for the analysis

Return type:

Dict containing support and resistance analysis

Example

>>> # Find strong support/resistance over 4 hours
>>> levels = await orderbook.get_support_resistance_levels(
...     lookback_minutes=240, min_touches=5, price_tolerance=0.25
... )
>>> print(f"Found {len(levels['support_levels'])} support levels")
>>> for level in levels["support_levels"]:
...     print(
...         f"Support at {level['price']}: {level['touches']} touches, strength {level['strength']:.2f}"
...     )
>>>
>>> if levels["strongest_resistance"]:
...     print(f"Key resistance: {levels['strongest_resistance']['price']}")
async get_volume_profile(time_window_minutes=60, price_bins=20)[source]

Calculate volume profile showing volume distribution by price.

Volume profile analysis reveals where the most trading activity has occurred by creating a histogram of volume distribution across price levels. This is a fundamental tool for identifying key areas of market interest, support/ resistance levels, and understanding market structure.

Key Metrics Calculated: 1. Point of Control (POC): The price level with the highest volume 2. Value Area: The range containing 70% of the total volume around the POC 3. Price bins: Histogram bins showing volume at each price level 4. Volume distribution: Complete breakdown of trading activity

The analysis works by: - Filtering trades within the specified time window - Dividing the price range into equal-sized bins - Calculating total volume for each price bin - Identifying the POC as the bin with maximum volume - Expanding around the POC to find the value area (70% of volume)

Parameters:
  • time_window_minutes (int) – Time window to analyze in minutes (default: 60). Larger windows provide more data but may include less relevant historical information.

  • price_bins (int) – Number of price bins for the histogram (default: 20). More bins provide finer granularity but may fragment the data; fewer bins provide smoother distribution but less detail.

Returns:

price_bins: List of price bin centers volumes: List of volumes corresponding to each price bin poc: Point of Control price (highest volume price) value_area_high: Upper boundary of the value area value_area_low: Lower boundary of the value area total_volume: Total volume analyzed time_window_minutes: Time window used for analysis

Return type:

Dict containing comprehensive volume profile analysis

Example

>>> # Get 1-hour volume profile with 30 price bins
>>> profile = await orderbook.get_volume_profile(
...     time_window_minutes=60, price_bins=30
... )
>>> print(f"POC at {profile['poc']}")
>>> print(
...     f"Value area: {profile['value_area_low']} - {profile['value_area_high']}"
... )
>>> print(f"Total volume: {profile['total_volume']}")
>>>
>>> # Find bins with highest activity
>>> max_vol_idx = profile["volumes"].index(max(profile["volumes"]))
>>> print(
...     f"Highest volume: {profile['volumes'][max_vol_idx]} at {profile['price_bins'][max_vol_idx]}"
... )
create_orderbook(instrument, event_bus, project_x=None, realtime_client=None, timezone_str='America/Chicago')[source]

Factory function to create an orderbook.

This factory function creates and returns an OrderBook instance for the specified instrument. It simplifies the process of creating an orderbook by handling the initial configuration. Note that the returned orderbook is not yet initialized - you must call the initialize() method separately to start the orderbook’s functionality.

The factory approach provides several benefits: 1. Ensures consistent orderbook creation across the application 2. Allows for future extension with pre-configured orderbook variants 3. Simplifies the API for common use cases

Parameters:
  • instrument (str) – Trading instrument symbol (e.g., “ES”, “NQ”, “MES”, “MNQ”). This should be the base symbol without contract-specific extensions.

  • project_x (ProjectXBase | None) – Optional AsyncProjectX client for tick size lookup and API access. If provided, the orderbook will be able to look up tick sizes and other contract details automatically.

  • realtime_client (ProjectXRealtimeClient | None) – Optional real-time client for WebSocket data. This is kept for compatibility but should be passed to initialize() instead.

  • timezone_str (str) – Timezone for timestamps (default: “America/Chicago”). All timestamps in the orderbook will be converted to this timezone.

Returns:

Orderbook instance that must be initialized with a call to initialize() before use.

Return type:

OrderBook

Example

>>> # V3: Create an orderbook with EventBus
>>> event_bus = EventBus()
>>> orderbook = create_orderbook(
...     instrument="MNQ",  # V3: Using actual contract symbols
...     event_bus=event_bus,
...     project_x=client,
...     timezone_str="America/Chicago",  # V3: Using CME timezone
... )
>>>
>>> # V3: Initialize with factory-created realtime client
>>> realtime_client = await create_realtime_client(
...     jwt_token=client.jwt_token, account_id=str(client.account_id)
... )
>>> await orderbook.initialize(realtime_client=realtime_client)
>>>
>>> # Start using the orderbook
>>> snapshot = await orderbook.get_orderbook_snapshot()

Quick Start

import asyncio
from project_x_py import ProjectX, create_orderbook, create_realtime_client

async def analyze_orderbook():
    # Create client and orderbook with dynamic tick size detection
    async with ProjectX.from_env() as client:
        await client.authenticate()

        # Get instrument and create orderbook
        instrument = await client.get_instrument("MGC")
        realtime_client = create_realtime_client(client.session_token)
        orderbook = create_orderbook(instrument, client, realtime_client)

        # Get real-time market depth (requires real-time data subscription)
        snapshot = await orderbook.get_orderbook_snapshot(levels=10)
        print(f"Best Bid: ${snapshot['metadata']['best_bid']:.2f}")
        print(f"Best Ask: ${snapshot['metadata']['best_ask']:.2f}")
        print(f"Spread: ${snapshot['metadata']['spread']:.2f}")

        # Advanced market microstructure analysis
        metrics = await orderbook.get_advanced_market_metrics()
        print(f"Market Imbalance: {metrics['market_imbalance']['direction']}")

asyncio.run(analyze_orderbook())

Core OrderBook Class

class OrderBook(instrument, event_bus, project_x=None, timezone_str='America/Chicago', config=None)[source]

Bases: OrderBookBase

Async Level 2 Orderbook with comprehensive market analysis.

This class combines all orderbook functionality into a single interface, providing a unified API for accessing real-time market depth data, advanced analytics, detection algorithms, and volume profiling. It uses a component-based architecture where specialized functionality is delegated to dedicated components while maintaining a simple, cohesive interface for the client code.

Key Components:
  • realtime_handler: Manages WebSocket connections and real-time data processing

  • analytics: Provides market analytics (imbalance, depth, delta, liquidity)

  • detection: Implements detection algorithms (iceberg, clusters)

  • profile: Handles volume profiling and support/resistance analysis

  • memory_manager: Manages memory usage and cleanup tasks

Thread Safety:

All methods are thread-safe and can be called concurrently from multiple asyncio tasks. Data consistency is maintained through internal locks.

Memory Management:

The orderbook implements automatic memory management through the MemoryManager component, which periodically cleans up historical data based on configurable parameters to prevent memory leaks during long-running sessions.

Real-time Features:
  • WebSocket-based Level 2 market depth updates

  • Immediate trade execution detection and classification

  • Real-time spread and price level monitoring

  • Event-driven callback system for custom logic

  • Automatic data validation and error handling

Analytics Capabilities:
  • Market imbalance analysis and ratio calculations

  • Orderbook depth analysis within price ranges

  • Cumulative delta tracking and trade flow statistics

  • Liquidity level identification and concentration analysis

  • Comprehensive orderbook statistics and health metrics

Detection Algorithms:
  • Iceberg order detection with confidence scoring

  • Order clustering analysis for institutional activity

  • Advanced market microstructure metrics

  • Hidden liquidity and volume pattern recognition

Example

>>> # V3: Create orderbook with EventBus
>>> event_bus = EventBus()
>>> orderbook = OrderBook("MNQ", event_bus, project_x_client)
>>> await orderbook.initialize(realtime_client)
>>>
>>> # V3: Register event handlers
>>> @event_bus.on(EventType.MARKET_DEPTH_UPDATE)
>>> async def handle_depth(data):
...     print(f"Depth: {data['bids'][0]['price']} @ {data['bids'][0]['size']}")
>>>
>>> # Get basic orderbook data
>>> snapshot = await orderbook.get_orderbook_snapshot()
>>> print(f"Spread: {snapshot['spread']}")
>>>
>>> # Advanced analytics
>>> imbalance = await orderbook.get_market_imbalance()
>>> liquidity = await orderbook.get_liquidity_levels()
>>>
>>> # Detection algorithms
>>> icebergs = await orderbook.detect_iceberg_orders()
>>> clusters = await orderbook.detect_order_clusters()
>>>
>>> # Volume profiling
>>> profile = await orderbook.get_volume_profile()
>>> support_resistance = await orderbook.get_support_resistance_levels()
>>>
>>> # Cleanup when done
>>> await orderbook.cleanup()
__init__(instrument, event_bus, project_x=None, timezone_str='America/Chicago', config=None)[source]

Initialize the orderbook.

Parameters:
  • instrument (str) – Trading instrument symbol

  • event_bus (Any) – EventBus instance for unified event handling. Required for all event emissions including market depth updates and trade ticks.

  • project_x (ProjectXBase | None) – Optional ProjectX client for tick size lookup

  • timezone_str (str) – Timezone for timestamps (default: America/Chicago)

  • config (OrderbookConfig | None) – Optional configuration for orderbook behavior

async initialize(realtime_client=None, subscribe_to_depth=True, subscribe_to_quotes=True)[source]

Initialize the orderbook with optional real-time data feed.

This method configures the orderbook for operation, sets up the memory manager, and optionally connects to the real-time data feed. It must be called after creating an OrderBook instance and before using any other methods.

The initialization process performs the following steps: 1. Starts the memory manager for automatic cleanup 2. If a realtime_client is provided:

  • Registers callbacks for market depth and quote updates

  • Subscribes to the specified data channels

  • Sets up WebSocket connection handlers

Parameters:
  • realtime_client (ProjectXRealtimeClient | None) – Async real-time client for WebSocket data. If provided, the orderbook will receive live market data updates. If None, the orderbook will function in historical/static mode only.

  • subscribe_to_depth (bool) – Subscribe to market depth updates (Level 2 data). Set to False only if you don’t need full order book data.

  • subscribe_to_quotes (bool) – Subscribe to quote updates (top of book data). Set to False only if you don’t need quote data.

Returns:

True if initialization successful, False if any part of the

initialization failed.

Return type:

bool

Example

>>> # V3: Initialize with EventBus and realtime client
>>> event_bus = EventBus()
>>> orderbook = OrderBook("MNQ", event_bus, client)
>>> # V3: Create realtime client with factory
>>> realtime_client = await create_realtime_client(
...     jwt_token=client.jwt_token, account_id=str(client.account_id)
... )
>>> success = await orderbook.initialize(
...     realtime_client=realtime_client,
...     subscribe_to_depth=True,
...     subscribe_to_quotes=True,
... )
>>> if success:
...     print("Orderbook initialized and receiving real-time data")
... else:
...     print("Failed to initialize orderbook")
async get_market_imbalance(levels=10)[source]

Calculate order flow imbalance between bid and ask sides.

Delegates to MarketAnalytics.get_market_imbalance(). See MarketAnalytics.get_market_imbalance() for complete documentation.

Return type:

LiquidityAnalysisResponse

async get_orderbook_depth(price_range)[source]

Analyze orderbook depth within a price range.

Delegates to MarketAnalytics.get_orderbook_depth(). See MarketAnalytics.get_orderbook_depth() for complete documentation.

Return type:

MarketImpactResponse

async get_cumulative_delta(time_window_minutes=60)[source]

Get cumulative delta (buy volume - sell volume) over time window.

Delegates to MarketAnalytics.get_cumulative_delta(). See MarketAnalytics.get_cumulative_delta() for complete documentation.

Return type:

dict[str, Any]

async get_trade_flow_summary()[source]

Get comprehensive trade flow statistics.

Delegates to MarketAnalytics.get_trade_flow_summary(). See MarketAnalytics.get_trade_flow_summary() for complete documentation.

Return type:

dict[str, Any]

async get_liquidity_levels(min_volume=100, levels=20)[source]

Identify significant liquidity levels in the orderbook.

Delegates to MarketAnalytics.get_liquidity_levels(). See MarketAnalytics.get_liquidity_levels() for complete documentation.

Return type:

dict[str, Any]

async get_statistics()[source]

Get comprehensive orderbook statistics.

Delegates to MarketAnalytics.get_statistics(). See MarketAnalytics.get_statistics() for complete documentation.

Return type:

dict[str, Any]

async detect_iceberg_orders(min_refreshes=None, volume_threshold=None, time_window_minutes=None)[source]

Detect potential iceberg orders based on price level refresh patterns.

Delegates to OrderDetection.detect_iceberg_orders(). See OrderDetection.detect_iceberg_orders() for complete documentation.

Return type:

dict[str, Any]

async detect_order_clusters(min_cluster_size=3, price_tolerance=0.1)[source]

Detect clusters of orders at similar price levels.

Delegates to OrderDetection.detect_order_clusters(). See OrderDetection.detect_order_clusters() for complete documentation.

Return type:

list[dict[str, Any]]

async get_advanced_market_metrics()[source]

Calculate advanced market microstructure metrics.

Delegates to OrderDetection.get_advanced_market_metrics(). See OrderDetection.get_advanced_market_metrics() for complete documentation.

Return type:

OrderbookAnalysisResponse

async get_volume_profile(time_window_minutes=60, price_bins=20)[source]

Calculate volume profile showing volume distribution by price.

Delegates to VolumeProfile.get_volume_profile(). See VolumeProfile.get_volume_profile() for complete documentation.

Return type:

dict[str, Any]

async get_support_resistance_levels(lookback_minutes=120, min_touches=3, price_tolerance=0.1)[source]

Identify support and resistance levels based on price history.

Delegates to VolumeProfile.get_support_resistance_levels(). See VolumeProfile.get_support_resistance_levels() for complete documentation.

Return type:

dict[str, Any]

async get_spread_analysis(window_minutes=30)[source]

Analyze bid-ask spread patterns over time.

Delegates to VolumeProfile.get_spread_analysis(). See VolumeProfile.get_spread_analysis() for complete documentation.

Return type:

LiquidityAnalysisResponse

async get_memory_stats()[source]

Get comprehensive memory usage statistics.

Delegates to MemoryManager.get_memory_stats(). See MemoryManager.get_memory_stats() for complete documentation.

Return type:

OrderbookStats

async cleanup()[source]

Clean up resources and disconnect from real-time feeds.

Return type:

None

Core Methods

Market Depth Management

async OrderBook.get_orderbook_snapshot(levels=10)
Return type:

OrderbookSnapshot

Get a complete snapshot of the current orderbook state.

This method provides a comprehensive snapshot of the current orderbook state, including top-of-book information, bid/ask levels, volume totals, and imbalance calculations. It’s designed to give a complete picture of the market at a single point in time for analysis or display purposes.

The snapshot includes: - Best bid and ask prices with spread - Mid-price calculation - Specified number of bid and ask levels with prices and volumes - Total volume on bid and ask sides - Order count on each side - Bid/ask imbalance ratio - Last update timestamp and update count

The method is thread-safe and acquires the orderbook lock during execution.

Args:

levels: Number of price levels to include on each side (default: 10)

Returns:

Dict containing the complete orderbook snapshot with all the fields specified above. See OrderbookSnapshot type for details.

Raises:

ProjectXError: If an error occurs during snapshot generation

Example:
>>> # V3: Get full orderbook with 5 levels on each side
>>> snapshot = await orderbook.get_orderbook_snapshot(levels=5)
>>>
>>> # V3: Print top of book with imbalance
>>> print(
...     f"Best Bid: {snapshot['best_bid']:.2f} ({snapshot['total_bid_volume']} contracts)"
... )
>>> print(
...     f"Best Ask: {snapshot['best_ask']:.2f} ({snapshot['total_ask_volume']} contracts)"
... )
>>> print(
...     f"Spread: {snapshot['spread']:.2f}, Mid: {snapshot['mid_price']:.2f}"
... )
>>> print(
...     f"Imbalance: {snapshot['imbalance']:.2%} ({'Bid Heavy' if snapshot['imbalance'] > 0 else 'Ask Heavy'})"
... )
>>>
>>> # V3: Display depth with cumulative volume
>>> cumulative_bid = 0
>>> print("

Bids:”)
>>> for bid in snapshot["bids"]:
...     cumulative_bid += bid["volume"]
...     print(
...         f"  {bid['price']:.2f}: {bid['volume']:5d} (cum: {cumulative_bid:6d})"
...     )
>>>
>>> cumulative_ask = 0
>>> print("
Asks:”)
>>> for ask in snapshot["asks"]:
...     cumulative_ask += ask["volume"]
...     print(
...         f"  {ask['price']:.2f}: {ask['volume']:5d} (cum: {cumulative_ask:6d})"
...     )
async OrderBook.get_orderbook_bids(levels=10)

Get orderbook bids up to specified levels.

Return type:

DataFrame

async OrderBook.get_orderbook_asks(levels=10)

Get orderbook asks up to specified levels.

Return type:

DataFrame

async OrderBook.get_best_bid_ask()

Get current best bid and ask prices with spread calculation.

This method provides the current top-of-book information, including the best (highest) bid price, best (lowest) ask price, the calculated spread between them, and the timestamp of the calculation. It also updates internal history tracking for bid, ask, and spread values.

The method is thread-safe and acquires the orderbook lock before accessing the underlying data structures.

Returns:

bid: The highest bid price (float or None if no bids) ask: The lowest ask price (float or None if no asks) spread: The difference between ask and bid (float or None if either missing) timestamp: The time of calculation (datetime)

Return type:

Dict containing

Example

>>> # V3: Get best bid/ask with spread
>>> prices = await orderbook.get_best_bid_ask()
>>> if prices["bid"] is not None and prices["ask"] is not None:
...     print(
...         f"Bid: {prices['bid']:.2f}, Ask: {prices['ask']:.2f}, "
...         f"Spread: {prices['spread']:.2f} ticks"
...     )
...     # V3: Calculate mid price
...     mid = (prices["bid"] + prices["ask"]) / 2
...     print(f"Mid price: {mid:.2f}")
... else:
...     print("Incomplete market data")

Basic Analysis

async OrderBook.get_orderbook_depth(price_range)[source]

Analyze orderbook depth within a price range.

Delegates to MarketAnalytics.get_orderbook_depth(). See MarketAnalytics.get_orderbook_depth() for complete documentation.

Return type:

MarketImpactResponse

async OrderBook.get_order_type_statistics()

Get statistics about different order types processed.

Return type:

dict[str, int]

async OrderBook.get_statistics()[source]

Get comprehensive orderbook statistics.

Delegates to MarketAnalytics.get_statistics(). See MarketAnalytics.get_statistics() for complete documentation.

Return type:

dict[str, Any]

Advanced Market Microstructure

Liquidity Analysis

async OrderBook.get_liquidity_levels(min_volume=100, levels=20)[source]

Identify significant liquidity levels in the orderbook.

Delegates to MarketAnalytics.get_liquidity_levels(). See MarketAnalytics.get_liquidity_levels() for complete documentation.

Return type:

dict[str, Any]

async OrderBook.detect_order_clusters(min_cluster_size=3, price_tolerance=0.1)[source]

Detect clusters of orders at similar price levels.

Delegates to OrderDetection.detect_order_clusters(). See OrderDetection.detect_order_clusters() for complete documentation.

Return type:

list[dict[str, Any]]

async OrderBook.get_market_imbalance(levels=10)[source]

Calculate order flow imbalance between bid and ask sides.

Delegates to MarketAnalytics.get_market_imbalance(). See MarketAnalytics.get_market_imbalance() for complete documentation.

Return type:

LiquidityAnalysisResponse

Trade Flow Analysis

async OrderBook.get_trade_flow_summary()[source]

Get comprehensive trade flow statistics.

Delegates to MarketAnalytics.get_trade_flow_summary(). See MarketAnalytics.get_trade_flow_summary() for complete documentation.

Return type:

dict[str, Any]

async OrderBook.get_cumulative_delta(time_window_minutes=60)[source]

Get cumulative delta (buy volume - sell volume) over time window.

Delegates to MarketAnalytics.get_cumulative_delta(). See MarketAnalytics.get_cumulative_delta() for complete documentation.

Return type:

dict[str, Any]

async OrderBook.get_volume_profile(time_window_minutes=60, price_bins=20)[source]

Calculate volume profile showing volume distribution by price.

Delegates to VolumeProfile.get_volume_profile(). See VolumeProfile.get_volume_profile() for complete documentation.

Return type:

dict[str, Any]

Institutional Features

async OrderBook.detect_iceberg_orders(min_refreshes=None, volume_threshold=None, time_window_minutes=None)[source]

Detect potential iceberg orders based on price level refresh patterns.

Delegates to OrderDetection.detect_iceberg_orders(). See OrderDetection.detect_iceberg_orders() for complete documentation.

Return type:

dict[str, Any]

async OrderBook.get_support_resistance_levels(lookback_minutes=120, min_touches=3, price_tolerance=0.1)[source]

Identify support and resistance levels based on price history.

Delegates to VolumeProfile.get_support_resistance_levels(). See VolumeProfile.get_support_resistance_levels() for complete documentation.

Return type:

dict[str, Any]

Comprehensive Analysis

async OrderBook.get_advanced_market_metrics()[source]

Calculate advanced market microstructure metrics.

Delegates to OrderDetection.get_advanced_market_metrics(). See OrderDetection.get_advanced_market_metrics() for complete documentation.

Return type:

OrderbookAnalysisResponse

Event Management

async OrderBook.add_callback(event_type, callback)

Register a callback for orderbook events.

This method allows client code to register callbacks that will be triggered when specific orderbook events occur. Callbacks can be either synchronous functions or asynchronous coroutines. When an event occurs, all registered callbacks for that event type will be executed with the event data.

Supported event types: - “depth_update”: Triggered when a price level is updated - “trade”: Triggered when a new trade is processed - “best_bid_change”: Triggered when the best bid price changes - “best_ask_change”: Triggered when the best ask price changes - “spread_change”: Triggered when the bid-ask spread changes - “reset”: Triggered when the orderbook is reset

Parameters:
  • event_type (str) – The type of event to listen for (from the list above)

  • callback (Callable[[dict[str, Any]], Coroutine[Any, Any, None]] | Callable[[dict[str, Any]], None]) – A callable function or coroutine that will receive the event data. The callback should accept a single parameter: a dictionary containing the event data specific to that event type.

Return type:

None

Example

>>> # V3: DEPRECATED - Use EventBus instead
>>> # Old callback style (deprecated):
>>> # await orderbook.add_callback("trade", on_trade)
>>> # V3: Modern EventBus approach
>>> from project_x_py.events import EventBus, EventType
>>> event_bus = EventBus()
>>> @event_bus.on(EventType.TRADE_TICK)
>>> async def on_trade(data):
...     print(
...         f"Trade: {data['size']} @ {data['price']} ({data['side']})"
...     )  # V3: actual field names
>>> @event_bus.on(EventType.MARKET_DEPTH_UPDATE)
>>> async def on_depth_change(data):
...     print(
...         f"New best bid: {data['bids'][0]['price'] if data['bids'] else 'None'}"
...     )
>>> # V3: Events automatically flow through EventBus
async OrderBook.remove_callback(event_type, callback)

Remove a registered callback.

Return type:

None

Usage Examples

Basic OrderBook Analysis

import asyncio
from project_x_py import OrderBook, ProjectX, create_realtime_client

async def basic_orderbook_analysis():
    async with ProjectX.from_env() as client:
        await client.authenticate()

        # Get instrument details
        instrument = await client.get_instrument("MGC")

        # Initialize orderbook for Gold futures
        orderbook = OrderBook(instrument, timezone="America/Chicago")

        # Connect to real-time data for live orderbook
        realtime_client = create_realtime_client(client.session_token)
        await realtime_client.connect()

        # Subscribe to market depth
        await realtime_client.subscribe_market_depth(instrument.id)

        # Wait a moment for data to populate
        await asyncio.sleep(2)

        # Get current market depth
        snapshot = await orderbook.get_orderbook_snapshot(levels=5)

        print("=== Market Depth ===")
        print(f"Best Bid: ${snapshot['metadata']['best_bid']:.2f}")
        print(f"Best Ask: ${snapshot['metadata']['best_ask']:.2f}")
        print(f"Spread: ${snapshot['metadata']['spread']:.2f}")
        print(f"Mid Price: ${snapshot['metadata']['mid_price']:.2f}")

        print("\n=== Top 5 Bids ===")
        for bid in snapshot['bids'].to_dicts():
            print(f"${bid['price']:.2f} x {bid['volume']}")

        print("\n=== Top 5 Asks ===")
        for ask in snapshot['asks'].to_dicts():
            print(f"${ask['price']:.2f} x {ask['volume']}")

asyncio.run(basic_orderbook_analysis())

Advanced Market Analysis

async def advanced_analysis():
    # Assumes orderbook is already initialized and connected
    # Comprehensive market microstructure analysis
    metrics = await orderbook.get_advanced_market_metrics()

    # Market imbalance analysis
    imbalance = metrics['market_imbalance']
    print(f"Market Direction: {imbalance['direction']}")
    print(f"Imbalance Ratio: {imbalance['imbalance_ratio']:.3f}")
    print(f"Confidence: {imbalance['confidence']}")

    # Liquidity analysis
    liquidity = metrics['liquidity_analysis']
    print(f"\nSignificant Bid Levels: {len(liquidity['bid_liquidity'])}")
    print(f"Significant Ask Levels: {len(liquidity['ask_liquidity'])}")

    # Volume profile
    volume_profile = metrics['volume_profile']
    if volume_profile['poc']:
        print(f"\nPoint of Control: ${volume_profile['poc']['price']:.2f}")
        print(f"POC Volume: {volume_profile['poc']['volume']}")

Iceberg Order Detection

async def detect_icebergs():
    # Advanced iceberg detection with statistical analysis
    icebergs = await orderbook.detect_iceberg_orders_advanced(
        time_window_minutes=30,
        min_refresh_count=5,
        volume_consistency_threshold=0.85,
        statistical_confidence=0.95
    )

    print("=== Iceberg Detection Results ===")
    print(f"Potential Icebergs Found: {len(icebergs.get('potential_icebergs', []))}")

    for iceberg in icebergs.get('potential_icebergs', []):
        print(f"\nPrice: ${iceberg['price']:.2f}")
        print(f"Confidence: {iceberg['confidence']:.2%}")
        print(f"Total Volume: {iceberg['total_volume_seen']}")
        print(f"Refresh Count: {iceberg['refresh_count']}")
        print(f"Statistical Score: {iceberg['statistical_score']:.3f}")

Support and Resistance Analysis

async def analyze_support_resistance():
    # Dynamic support and resistance identification
    levels = await orderbook.get_support_resistance_levels(lookback_minutes=60)

    print("=== Support Levels ===")
    for level in levels['support_levels'][:3]:  # Top 3 support levels
        print(f"${level['price']:.2f} - Strength: {level['strength']:.2f} - Volume: {level['volume']}")

    print("\n=== Resistance Levels ===")
    for level in levels['resistance_levels'][:3]:  # Top 3 resistance levels
        print(f"${level['price']:.2f} - Strength: {level['strength']:.2f} - Volume: {level['volume']}")

Real-time Integration

import asyncio
from project_x_py import ProjectX, create_realtime_client, OrderBook

async def setup_realtime_orderbook():
    async with ProjectX.from_env() as client:
        await client.authenticate()

        # Get instrument
        instrument = await client.get_instrument("MGC")

        # Set up real-time data with orderbook callbacks
        rt_client = create_realtime_client(client.session_token)
        orderbook = OrderBook(instrument)

        async def on_market_depth_update(data):
            """Handle real-time market depth updates"""
            await orderbook.process_market_depth(data)

            # Get updated metrics
            snapshot = await orderbook.get_orderbook_snapshot(levels=5)
            best_bid = snapshot['metadata']['best_bid']
            best_ask = snapshot['metadata']['best_ask']

            print(f"Updated: ${best_bid:.2f} x ${best_ask:.2f}")

        # Register callback for market depth updates
        orderbook.add_callback('market_depth', on_market_depth_update)

        # Connect and subscribe to real-time market data
        await rt_client.connect()
        await rt_client.subscribe_market_depth(instrument.id)

        # Keep running for 60 seconds
        await asyncio.sleep(60)

asyncio.run(setup_realtime_orderbook())

Market Imbalance Strategy

import asyncio

async def monitor_market_imbalance(orderbook):
    """Monitor market for trading opportunities based on imbalance"""

    while True:
        # Get current market metrics
        imbalance = await orderbook.get_market_imbalance()

        # Check for significant imbalance
        if abs(imbalance['imbalance_ratio']) > 0.3:  # 30% imbalance threshold
            direction = imbalance['direction']
            confidence = imbalance['confidence']

            print(f"SIGNAL: {direction.upper()} imbalance detected")
            print(f"Ratio: {imbalance['imbalance_ratio']:.3f}")
            print(f"Confidence: {confidence}")

            # Additional confirmation from volume profile
            volume_metrics = await orderbook.get_volume_profile()
            if volume_metrics['poc']:
                bid_ask = await orderbook.get_best_bid_ask()
                current_price = bid_ask['mid']
                poc_price = volume_metrics['poc']['price']

                if direction == 'bullish' and current_price > poc_price:
                    print("✅ Volume profile confirms bullish signal")
                elif direction == 'bearish' and current_price < poc_price:
                    print("✅ Volume profile confirms bearish signal")

        await asyncio.sleep(1)  # Check every second

# Run monitoring as part of main async flow
# await monitor_market_imbalance(orderbook)

Data Structures

OrderBook Snapshot

The get_orderbook_snapshot() method returns:

{
    "bids": pl.DataFrame,           # Bid levels with price, volume, timestamp
    "asks": pl.DataFrame,           # Ask levels with price, volume, timestamp
    "metadata": {
        "best_bid": float,          # Highest bid price
        "best_ask": float,          # Lowest ask price
        "spread": float,            # Bid-ask spread
        "mid_price": float,         # Mid-market price
        "total_bid_volume": int,    # Total volume on bid side
        "total_ask_volume": int,    # Total volume on ask side
        "last_update": datetime,    # Last update timestamp
        "levels_count": {           # Number of price levels
            "bids": int,
            "asks": int
        }
    }
}

Market Imbalance Data

The get_market_imbalance() method returns:

{
    "imbalance_ratio": float,       # -1 to 1 (negative = bearish, positive = bullish)
    "direction": str,               # "bullish", "bearish", or "neutral"
    "confidence": str,              # "low", "medium", "high"
    "bid_pressure": float,          # Normalized bid pressure
    "ask_pressure": float,          # Normalized ask pressure
    "trade_flow_confirmation": float # Trade flow alignment
}

Iceberg Detection Result

The detect_iceberg_orders_advanced() method returns:

{
    "potential_icebergs": [
        {
            "price": float,                    # Price level
            "confidence": float,               # Detection confidence (0-1)
            "total_volume_seen": int,          # Total volume observed
            "refresh_count": int,              # Number of refreshes
            "avg_visible_size": float,         # Average visible size
            "volume_consistency": float,       # Volume consistency score
            "statistical_score": float,       # Statistical significance
            "side": str                        # "bid" or "ask"
        }
    ],
    "analysis_summary": {
        "total_suspicious_levels": int,
        "confidence_threshold": float,
        "time_window_analyzed": int
    }
}

Volume Profile Data

The get_volume_profile() method returns:

{
    "profile": [
        {
            "avg_price": float,        # Average price for bucket
            "total_volume": int,       # Total volume traded
            "buy_volume": int,         # Buy-side volume
            "sell_volume": int,        # Sell-side volume
            "trade_count": int         # Number of trades
        }
    ],
    "poc": {                          # Point of Control (highest volume)
        "price": float,
        "volume": int
    },
    "value_area": {                   # 70% of volume area
        "upper": float,
        "lower": float,
        "volume_percentage": float
    }
}

Performance Considerations

Memory Management

The OrderBook automatically manages memory by:

  • Limiting orderbook levels to top 100 per side

  • Maintaining rolling windows for trade data

  • Automatic cleanup of old data beyond time windows

# Configure memory settings
orderbook = OrderBook(
    "MGC",
    timezone="America/Chicago"
)

# The orderbook automatically:
# - Keeps top 100 levels per side
# - Maintains trade history for analysis windows
# - Cleans up data older than analysis periods

Async Concurrency

The OrderBook supports concurrent async access:

import asyncio

async def worker_task(orderbook, task_id):
    # Safe to call concurrently
    snapshot = await orderbook.get_orderbook_snapshot()
    metrics = await orderbook.get_advanced_market_metrics()
    print(f"Task {task_id} completed")

# Multiple async tasks can access orderbook concurrently
async def run_concurrent_tasks(orderbook):
    tasks = [worker_task(orderbook, i) for i in range(5)]
    await asyncio.gather(*tasks)

Optimization Tips

  1. Batch Analysis: Use get_advanced_market_metrics() for comprehensive analysis in one call

  2. Selective Levels: Request only needed levels (e.g., levels=5 instead of default 10)

  3. Time Windows: Adjust analysis time windows based on trading timeframe

  4. Callback Efficiency: Keep callback functions lightweight for real-time performance

# Efficient: Get all metrics at once
metrics = await orderbook.get_advanced_market_metrics()

# Less efficient: Multiple separate calls
imbalance = await orderbook.get_market_imbalance()
liquidity = await orderbook.get_liquidity_levels()
profile = await orderbook.get_volume_profile()

Error Handling

async def safe_orderbook_analysis():
    try:
        # OrderBook operations
        snapshot = await orderbook.get_orderbook_snapshot()

        if not snapshot['metadata']['best_bid']:
            print("No bid data available")
            return

        # Advanced analysis
        icebergs = await orderbook.detect_iceberg_orders_advanced()

    except Exception as e:
        print(f"OrderBook error: {e}")
        # Handle gracefully - orderbook returns empty/default data on errors

Integration with Trading

from project_x_py import ProjectX, create_order_manager, create_realtime_client

async def smart_limit_order():
    """Place limit order using orderbook analysis"""

    async with ProjectX.from_env() as client:
        await client.authenticate()

        # Get instrument and create components
        instrument = await client.get_instrument("MGC")
        realtime_client = create_realtime_client(client.session_token)
        order_manager = create_order_manager(client, realtime_client)
        orderbook = OrderBook(instrument)

        # Connect and get market data
        await realtime_client.connect()
        await realtime_client.subscribe_market_depth(instrument.id)
        await asyncio.sleep(2)  # Wait for data

        # Analyze current market structure
        snapshot = await orderbook.get_orderbook_snapshot(levels=10)
        imbalance = await orderbook.get_market_imbalance()
        liquidity = await orderbook.get_liquidity_levels()

        best_bid = snapshot['metadata']['best_bid']
        best_ask = snapshot['metadata']['best_ask']

        # Determine optimal order placement
        if imbalance['direction'] == 'bullish':
            # Place buy order closer to mid for better fill probability
            target_price = best_bid + (snapshot['metadata']['spread'] * 0.3)
            side = 0  # Buy
        else:
            # Place sell order closer to mid
            target_price = best_ask - (snapshot['metadata']['spread'] * 0.3)
            side = 1  # Sell

        # Check for significant liquidity levels
        liquidity_side = 'bid_liquidity' if side == 0 else 'ask_liquidity'
        for level in liquidity[liquidity_side].to_dicts():
            if abs(level['price'] - target_price) < 0.5:  # Within 0.5 of liquidity
                target_price = level['price']  # Join the liquidity
                break

        # Place the order
        response = await order_manager.place_limit_order(
            contract_id=instrument.id,
            side=side,
            size=1,
            limit_price=target_price
        )

        print(f"Order placed at ${target_price:.2f} based on orderbook analysis")
        return response

# Run the smart order placement
asyncio.run(smart_limit_order())

Best Practices

  1. Real-time Data: OrderBook requires real-time market depth data for accurate analysis

  2. Time Windows: Adjust analysis time windows based on market volatility and trading timeframe

  3. Statistical Confidence: Use higher confidence thresholds for iceberg detection in volatile markets

  4. Market Hours: Results are most reliable during active trading hours

  5. Instrument-Specific: Different instruments may require parameter tuning for optimal detection

# Example: Configure for different market conditions

async def configure_for_market_conditions():
    async with ProjectX.from_env() as client:
        await client.authenticate()

        # High-frequency scalping (short windows)
        mnq = await client.get_instrument("MNQ")
        scalping_orderbook = OrderBook(mnq)
        icebergs = await scalping_orderbook.detect_iceberg_orders_advanced(
            time_window_minutes=5,      # Short window
            min_refresh_count=3,        # Lower threshold
            statistical_confidence=0.90 # Slightly lower confidence
        )

        # Position trading (longer windows)
        mgc = await client.get_instrument("MGC")
        position_orderbook = OrderBook(mgc)
        icebergs = await position_orderbook.detect_iceberg_orders_advanced(
            time_window_minutes=120,    # Longer window
            min_refresh_count=10,       # Higher threshold
            statistical_confidence=0.95 # Higher confidence
        )