Data API

Real-time market data, Level 2 orderbook analysis, and institutional-grade market microstructure tools.

Real-time Data Management

Real-time Client

class ProjectXRealtimeClient(jwt_token, account_id, user_hub_url=None, market_hub_url=None, config=None)[source]

Bases: ConnectionManagementMixin, EventHandlingMixin, SubscriptionsMixin

Async real-time client for ProjectX Gateway API WebSocket connections.

This class provides an async interface for ProjectX SignalR connections and forwards all events to registered managers. It does not cache data or perform business logic; the specialized managers handle both.

Features:
  • Async SignalR WebSocket connections to ProjectX Gateway hubs

  • Event forwarding to registered async managers

  • Automatic reconnection with exponential backoff

  • JWT token refresh and reconnection

  • Connection health monitoring

  • Async event callbacks

  • Thread-safe event processing and callback execution

  • Comprehensive connection statistics and health tracking

Architecture:
  • Pure event forwarding (no business logic)

  • No data caching (handled by managers)

  • No payload parsing (managers handle ProjectX formats)

  • Minimal stateful operations

  • Mixin-based design for modular functionality

Real-time Hubs (per ProjectX Gateway docs):
  • User Hub: Account, position, and order updates

  • Market Hub: Quote, trade, and market depth data

Connection Management:
  • Dual-hub SignalR connections with automatic reconnection

  • JWT token authentication via Authorization headers

  • Connection health monitoring and error handling

  • Thread-safe operations with proper lock management
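
The reconnection behaviour listed above can be pictured with a short sketch. It is not the client's actual implementation: the names `backoff_delays` and `reconnect_with_backoff`, their parameters, and the delay values are assumptions chosen for illustration.

```python
import asyncio
from typing import Awaitable, Callable


def backoff_delays(
    base: float = 1.0, factor: float = 2.0, cap: float = 30.0, attempts: int = 5
) -> list[float]:
    """Return the wait before each retry: base, base*factor, ... capped at `cap`."""
    return [min(cap, base * factor**i) for i in range(attempts)]


async def reconnect_with_backoff(
    connect: Callable[[], Awaitable[bool]],
    delays: list[float],
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> bool:
    """Try `connect` once, then once more after each delay, until it succeeds."""
    if await connect():
        return True
    for delay in delays:
        await sleep(delay)  # wait longer after every failed attempt
        if await connect():
            return True
    return False  # every attempt failed
```

Passing `sleep` as a parameter is a design choice for the sketch only; it lets the retry loop be exercised without real waiting.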

Event Processing:
  • Cross-thread event scheduling for asyncio compatibility

  • Support for both async and sync callbacks

  • Error isolation so that a failing callback does not disrupt other callbacks

  • Event statistics and flow monitoring
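
As a rough illustration of the event-processing points above, the sketch below dispatches one event to a mix of sync and async callbacks, isolates failures, and shows how a non-asyncio thread could hand an event to the loop. The helper names `dispatch` and `schedule_from_thread` are hypothetical and are not part of the library.

```python
import asyncio
import inspect
import logging
from typing import Any, Callable

logger = logging.getLogger(__name__)


async def dispatch(callbacks: list[Callable[[dict], Any]], data: dict) -> int:
    """Call every callback with `data`; return how many finished without error."""
    succeeded = 0
    for callback in callbacks:
        try:
            result = callback(data)
            if inspect.isawaitable(result):  # async callbacks return an awaitable
                await result
            succeeded += 1
        except Exception:
            # Isolate the failure: log it and continue with the next callback.
            logger.exception("callback %r failed", callback)
    return succeeded


def schedule_from_thread(loop: asyncio.AbstractEventLoop, callbacks, data):
    """Submit an event from another thread; returns a concurrent.futures.Future."""
    return asyncio.run_coroutine_threadsafe(dispatch(callbacks, data), loop)
```

`asyncio.run_coroutine_threadsafe` is the standard-library way to schedule a coroutine on a loop running in a different thread, which is the situation the "cross-thread event scheduling" bullet describes.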

Example

>>> # V3: Create async client with factory function
>>> client = await create_realtime_client(jwt_token, account_id)
>>> # V3: Register async callbacks for event handling
>>> async def handle_position(data):
...     print(f"Position: {data.get('contractId')} - {data.get('netPos')}")
>>> async def handle_order(data):
...     print(f"Order {data.get('id')}: {data.get('status')}")
>>> async def handle_quote(data):
...     print(f"Quote: {data.get('bid')} x {data.get('ask')}")
>>> await client.add_callback("position_update", handle_position)
>>> await client.add_callback("order_update", handle_order)
>>> await client.add_callback("quote_update", handle_quote)
>>>
>>> # V3: Connect and subscribe with error handling
>>> if await client.connect():
...     await client.subscribe_user_updates()
...     await client.subscribe_market_data(["MGC", "MNQ"])
... else:
...     print("Connection failed")

Event Types (per ProjectX Gateway docs):
  • User Hub: GatewayUserAccount, GatewayUserPosition, GatewayUserOrder, GatewayUserTrade

  • Market Hub: GatewayQuote, GatewayDepth, GatewayTrade

Integration:
  • AsyncPositionManager handles position events and caching

  • AsyncOrderManager handles order events and tracking

  • AsyncRealtimeDataManager handles market data and caching

  • This client only handles connections and event forwarding

__init__(jwt_token, account_id, user_hub_url=None, market_hub_url=None, config=None)[source]

Initialize async ProjectX real-time client with configurable SignalR connections.

Creates a dual-hub SignalR client for real-time ProjectX Gateway communication. Handles both user-specific events (positions, orders) and market data (quotes, trades).

Parameters:
  • jwt_token (str) – JWT authentication token from AsyncProjectX.authenticate(). Must be valid and not expired for successful connection.

  • account_id (str) – ProjectX account ID for user-specific subscriptions. Used to filter position, order, and trade events.

  • user_hub_url (str, optional) – Override URL for user hub endpoint. If provided, takes precedence over config URL. Defaults to None (uses config or default).

  • market_hub_url (str, optional) – Override URL for market hub endpoint. If provided, takes precedence over config URL. Defaults to None (uses config or default).

  • config (ProjectXConfig, optional) – Configuration object with hub URLs. Provides default URLs if direct URLs not specified. Defaults to None (uses TopStepX defaults).

URL Priority:
  1. Direct parameters (user_hub_url, market_hub_url)

  2. Config URLs (config.user_hub_url, config.market_hub_url)

  3. Default TopStepX endpoints
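
The priority order above amounts to a "first non-empty value wins" lookup. The sketch below shows that logic for a single hub URL; `HubConfig`, `resolve_hub_url`, and the placeholder default URL are hypothetical stand-ins, not the library's own names or endpoints.

```python
from dataclasses import dataclass
from typing import Optional

# Placeholder only: the real default endpoint is defined by the library.
DEFAULT_USER_HUB_URL = "https://example.invalid/hubs/user"


@dataclass
class HubConfig:
    """Hypothetical stand-in for ProjectXConfig with a single hub URL field."""

    user_hub_url: Optional[str] = None


def resolve_hub_url(
    direct: Optional[str], config: Optional[HubConfig], default: str
) -> str:
    """Pick the first available URL: direct parameter, then config, then default."""
    if direct:
        return direct
    if config is not None and config.user_hub_url:
        return config.user_hub_url
    return default
```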

Example

>>> # V3: Using factory function (recommended)
>>> # Here `client` is an already-authenticated ProjectX client.
>>> realtime_client = await create_realtime_client(
...     jwt_token=client.get_session_token(),
...     account_id=str(client.get_account_info().id),
... )
>>> # V3: Using direct instantiation with default endpoints
>>> client = ProjectXRealtimeClient(jwt_token=jwt_token, account_id="12345")
>>>
>>> # V3: Using custom config for different environments
>>> from project_x_py.models import ProjectXConfig
>>> config = ProjectXConfig(
...     user_hub_url="https://gateway.topstepx.com/hubs/user",
...     market_hub_url="https://gateway.topstepx.com/hubs/market",
... )
>>> client = ProjectXRealtimeClient(
...     jwt_token=jwt_token, account_id="12345", config=config
... )
>>>
>>> # V3: Override specific URL for testing
>>> client = ProjectXRealtimeClient(
...     jwt_token=jwt_token,
...     account_id="12345",
...     market_hub_url="https://test.topstepx.com/hubs/market",
... )

Note

  • JWT token is passed securely via Authorization header

  • Both hubs must connect successfully for full functionality

  • SignalR connections are established lazily on connect()

Orderbook Analysis

Note

Level 2 OrderBook API: For comprehensive Level 2 orderbook analysis including iceberg detection, market imbalance analysis, and institutional-grade market microstructure tools, see the dedicated OrderBook API module.

class OrderBook(instrument, event_bus, project_x=None, timezone_str='America/Chicago', config=None)[source]

Bases: OrderBookBase

Async Level 2 Orderbook with comprehensive market analysis.

This class combines all orderbook functionality into a single interface, providing a unified API for accessing real-time market depth data, advanced analytics, detection algorithms, and volume profiling. It uses a component-based architecture where specialized functionality is delegated to dedicated components while maintaining a simple, cohesive interface for the client code.

Key Components:
  • realtime_handler: Manages WebSocket connections and real-time data processing

  • analytics: Provides market analytics (imbalance, depth, delta, liquidity)

  • detection: Implements detection algorithms (iceberg, clusters)

  • profile: Handles volume profiling and support/resistance analysis

  • memory_manager: Manages memory usage and cleanup tasks

Thread Safety:

All methods are thread-safe and can be called concurrently from multiple asyncio tasks. Data consistency is maintained through internal locks.

Memory Management:

The orderbook implements automatic memory management through the MemoryManager component, which periodically cleans up historical data based on configurable parameters to prevent memory leaks during long-running sessions.

Real-time Features:
  • WebSocket-based Level 2 market depth updates

  • Immediate trade execution detection and classification

  • Real-time spread and price level monitoring

  • Event-driven callback system for custom logic

  • Automatic data validation and error handling

Analytics Capabilities:
  • Market imbalance analysis and ratio calculations

  • Orderbook depth analysis within price ranges

  • Cumulative delta tracking and trade flow statistics

  • Liquidity level identification and concentration analysis

  • Comprehensive orderbook statistics and health metrics

Detection Algorithms:
  • Iceberg order detection with confidence scoring

  • Order clustering analysis for institutional activity

  • Advanced market microstructure metrics

  • Hidden liquidity and volume pattern recognition

Example

>>> # V3: Create orderbook with EventBus
>>> event_bus = EventBus()
>>> orderbook = OrderBook("MNQ", event_bus, project_x_client)
>>> await orderbook.initialize(realtime_client)
>>>
>>> # V3: Register event handlers
>>> @event_bus.on(EventType.MARKET_DEPTH_UPDATE)
... async def handle_depth(data):
...     print(f"Depth: {data['bids'][0]['price']} @ {data['bids'][0]['size']}")
>>>
>>> # Get basic orderbook data
>>> snapshot = await orderbook.get_orderbook_snapshot()
>>> print(f"Spread: {snapshot['spread']}")
>>>
>>> # Advanced analytics
>>> imbalance = await orderbook.get_market_imbalance()
>>> liquidity = await orderbook.get_liquidity_levels()
>>>
>>> # Detection algorithms
>>> icebergs = await orderbook.detect_iceberg_orders()
>>> clusters = await orderbook.detect_order_clusters()
>>>
>>> # Volume profiling
>>> profile = await orderbook.get_volume_profile()
>>> support_resistance = await orderbook.get_support_resistance_levels()
>>>
>>> # Cleanup when done
>>> await orderbook.cleanup()

__init__(instrument, event_bus, project_x=None, timezone_str='America/Chicago', config=None)[source]

Initialize the orderbook.

Parameters:
  • instrument (str) – Trading instrument symbol

  • event_bus (Any) – EventBus instance for unified event handling. Required for all event emissions including market depth updates and trade ticks.

  • project_x (ProjectXBase | None) – Optional ProjectX client for tick size lookup

  • timezone_str (str) – Timezone for timestamps (default: America/Chicago)

  • config (OrderbookConfig | None) – Optional configuration for orderbook behavior

async initialize(realtime_client=None, subscribe_to_depth=True, subscribe_to_quotes=True)[source]

Initialize the orderbook with optional real-time data feed.

This method configures the orderbook for operation, sets up the memory manager, and optionally connects to the real-time data feed. It must be called after creating an OrderBook instance and before using any other methods.

The initialization process performs the following steps:
  1. Starts the memory manager for automatic cleanup

  2. If a realtime_client is provided:

     • Registers callbacks for market depth and quote updates

     • Subscribes to the specified data channels

     • Sets up WebSocket connection handlers

Parameters:
  • realtime_client (ProjectXRealtimeClient | None) – Async real-time client for WebSocket data. If provided, the orderbook will receive live market data updates. If None, the orderbook will function in historical/static mode only.

  • subscribe_to_depth (bool) – Subscribe to market depth updates (Level 2 data). Set to False only if you don’t need full order book data.

  • subscribe_to_quotes (bool) – Subscribe to quote updates (top of book data). Set to False only if you don’t need quote data.

Returns:

True if initialization was successful, False if any part of the initialization failed.

Return type:

bool

Example

>>> # V3: Initialize with EventBus and realtime client
>>> event_bus = EventBus()
>>> orderbook = OrderBook("MNQ", event_bus, client)
>>> # V3: Create realtime client with factory
>>> realtime_client = await create_realtime_client(
...     jwt_token=client.jwt_token, account_id=str(client.account_id)
... )
>>> success = await orderbook.initialize(
...     realtime_client=realtime_client,
...     subscribe_to_depth=True,
...     subscribe_to_quotes=True,
... )
>>> if success:
...     print("Orderbook initialized and receiving real-time data")
... else:
...     print("Failed to initialize orderbook")

async get_market_imbalance(levels=10)[source]

Calculate order flow imbalance between bid and ask sides.

Delegates to MarketAnalytics.get_market_imbalance(). See MarketAnalytics.get_market_imbalance() for complete documentation.

Return type:

LiquidityAnalysisResponse
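
For orientation, one common way to express bid/ask imbalance is the normalized difference of resting volume over the top `levels` price levels. The sketch below uses that definition as an assumption; it is not necessarily what MarketAnalytics.get_market_imbalance() computes, and `imbalance_ratio` is a hypothetical helper.

```python
def imbalance_ratio(
    bids: list[tuple[float, int]],
    asks: list[tuple[float, int]],
    levels: int = 10,
) -> float:
    """Return (bid_volume - ask_volume) / total over the top `levels` levels.

    `bids` and `asks` are (price, size) pairs ordered best-first.
    The result ranges from -1.0 (all asks) to +1.0 (all bids).
    """
    bid_volume = sum(size for _, size in bids[:levels])
    ask_volume = sum(size for _, size in asks[:levels])
    total = bid_volume + ask_volume
    if total == 0:
        return 0.0  # empty book: report a neutral value
    return (bid_volume - ask_volume) / total
```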

async get_orderbook_depth(price_range)[source]

Analyze orderbook depth within a price range.

Delegates to MarketAnalytics.get_orderbook_depth(). See MarketAnalytics.get_orderbook_depth() for complete documentation.

Return type:

MarketImpactResponse

async get_cumulative_delta(time_window_minutes=60)[source]

Get cumulative delta (buy volume - sell volume) over a time window.

Delegates to MarketAnalytics.get_cumulative_delta(). See MarketAnalytics.get_cumulative_delta() for complete documentation.

Return type:

dict[str, Any]
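
The "buy volume - sell volume" definition can be worked through on a handful of trades. In this sketch the trade tuple layout, the `"buy"`/`"sell"` labels, and the helper name `cumulative_delta` are assumptions for illustration; the real method returns a dictionary whose fields are documented in MarketAnalytics.

```python
from datetime import datetime, timedelta


def cumulative_delta(
    trades: list[tuple[datetime, str, int]],
    now: datetime,
    time_window_minutes: int = 60,
) -> int:
    """Sum buy volume minus sell volume for trades inside the time window."""
    cutoff = now - timedelta(minutes=time_window_minutes)
    delta = 0
    for timestamp, side, volume in trades:
        if timestamp < cutoff:
            continue  # trade is older than the window
        if side == "buy":
            delta += volume
        elif side == "sell":
            delta -= volume
    return delta
```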

async get_trade_flow_summary()[source]

Get comprehensive trade flow statistics.

Delegates to MarketAnalytics.get_trade_flow_summary(). See MarketAnalytics.get_trade_flow_summary() for complete documentation.

Return type:

dict[str, Any]

async get_liquidity_levels(min_volume=100, levels=20)[source]

Identify significant liquidity levels in the orderbook.

Delegates to MarketAnalytics.get_liquidity_levels(). See MarketAnalytics.get_liquidity_levels() for complete documentation.

Return type:

dict[str, Any]

async get_statistics()[source]

Get comprehensive orderbook statistics.

Delegates to MarketAnalytics.get_statistics(). See MarketAnalytics.get_statistics() for complete documentation.

Return type:

dict[str, Any]

async detect_iceberg_orders(min_refreshes=None, volume_threshold=None, time_window_minutes=None)[source]

Detect potential iceberg orders based on price level refresh patterns.

Delegates to OrderDetection.detect_iceberg_orders(). See OrderDetection.detect_iceberg_orders() for complete documentation.

Return type:

dict[str, Any]

async detect_order_clusters(min_cluster_size=3, price_tolerance=0.1)[source]

Detect clusters of orders at similar price levels.

Delegates to OrderDetection.detect_order_clusters(). See OrderDetection.detect_order_clusters() for complete documentation.

Return type:

list[dict[str, Any]]
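
To give a feel for what `min_cluster_size` and `price_tolerance` control, here is a minimal sketch that groups sorted price levels whose neighbours lie within the tolerance. It is an assumed simplification, not the algorithm in OrderDetection, and `cluster_prices` is a hypothetical name.

```python
def cluster_prices(
    prices: list[float], min_cluster_size: int = 3, price_tolerance: float = 0.1
) -> list[list[float]]:
    """Group prices into runs where each neighbour gap is <= price_tolerance."""
    clusters: list[list[float]] = []
    current: list[float] = []
    for price in sorted(prices):
        if current and price - current[-1] > price_tolerance:
            # Gap too large: close the current run, keeping it only if big enough.
            if len(current) >= min_cluster_size:
                clusters.append(current)
            current = []
        current.append(price)
    if len(current) >= min_cluster_size:
        clusters.append(current)
    return clusters
```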

async get_advanced_market_metrics()[source]

Calculate advanced market microstructure metrics.

Delegates to OrderDetection.get_advanced_market_metrics(). See OrderDetection.get_advanced_market_metrics() for complete documentation.

Return type:

OrderbookAnalysisResponse

async get_volume_profile(time_window_minutes=60, price_bins=20)[source]

Calculate volume profile showing volume distribution by price.

Delegates to VolumeProfile.get_volume_profile(). See VolumeProfile.get_volume_profile() for complete documentation.

Return type:

dict[str, Any]
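
A volume profile is essentially a histogram of traded volume over price. The sketch below bins trades into `price_bins` equal-width buckets; the input layout and the helper name `volume_profile` are assumptions, and the real method returns a richer dictionary documented in VolumeProfile.

```python
def volume_profile(
    trades: list[tuple[float, int]], price_bins: int = 20
) -> list[tuple[float, int]]:
    """Return (bin_start_price, total_volume) for equal-width price bins."""
    if not trades:
        return []
    prices = [price for price, _ in trades]
    low, high = min(prices), max(prices)
    width = (high - low) / price_bins or 1.0  # fall back if all prices are equal
    volumes = [0] * price_bins
    for price, volume in trades:
        # The highest price would land one past the end, so clamp the index.
        index = min(int((price - low) / width), price_bins - 1)
        volumes[index] += volume
    return [(low + i * width, volumes[i]) for i in range(price_bins)]
```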

async get_support_resistance_levels(lookback_minutes=120, min_touches=3, price_tolerance=0.1)[source]

Identify support and resistance levels based on price history.

Delegates to VolumeProfile.get_support_resistance_levels(). See VolumeProfile.get_support_resistance_levels() for complete documentation.

Return type:

dict[str, Any]

async get_spread_analysis(window_minutes=30)[source]

Analyze bid-ask spread patterns over time.

Delegates to VolumeProfile.get_spread_analysis(). See VolumeProfile.get_spread_analysis() for complete documentation.

Return type:

LiquidityAnalysisResponse

async get_memory_stats()[source]

Get comprehensive memory usage statistics.

Delegates to MemoryManager.get_memory_stats(). See MemoryManager.get_memory_stats() for complete documentation.

Return type:

OrderbookStats

async cleanup()[source]

Clean up resources and disconnect from real-time feeds.

Return type:

None

Data Factory Functions

create_orderbook(instrument, event_bus, project_x=None, realtime_client=None, timezone_str='America/Chicago')[source]

Factory function to create an orderbook.

This factory function creates and returns an OrderBook instance for the specified instrument. It simplifies the process of creating an orderbook by handling the initial configuration. Note that the returned orderbook is not yet initialized - you must call the initialize() method separately to start the orderbook’s functionality.

The factory approach provides several benefits:
  1. Ensures consistent orderbook creation across the application

  2. Allows for future extension with pre-configured orderbook variants

  3. Simplifies the API for common use cases

Parameters:
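  • instrument (str) – Trading instrument symbol (e.g., “ES”, “NQ”, “MES”, “MNQ”). This should be the base symbol without contract-specific extensions.

  • event_bus (Any) – EventBus instance for unified event handling. Required for all event emissions including market depth updates and trade ticks.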

  • project_x (ProjectXBase | None) – Optional AsyncProjectX client for tick size lookup and API access. If provided, the orderbook will be able to look up tick sizes and other contract details automatically.

  • realtime_client (ProjectXRealtimeClient | None) – Optional real-time client for WebSocket data. This is kept for compatibility but should be passed to initialize() instead.

  • timezone_str (str) – Timezone for timestamps (default: “America/Chicago”). All timestamps in the orderbook will be converted to this timezone.

Returns:

Orderbook instance that must be initialized with a call to initialize() before use.

Return type:

OrderBook

Example

>>> # V3: Create an orderbook with EventBus
>>> event_bus = EventBus()
>>> orderbook = create_orderbook(
...     instrument="MNQ",  # V3: Using actual contract symbols
...     event_bus=event_bus,
...     project_x=client,
...     timezone_str="America/Chicago",  # V3: Using CME timezone
... )
>>>
>>> # V3: Initialize with factory-created realtime client
>>> realtime_client = await create_realtime_client(
...     jwt_token=client.jwt_token, account_id=str(client.account_id)
... )
>>> await orderbook.initialize(realtime_client=realtime_client)
>>>
>>> # Start using the orderbook
>>> snapshot = await orderbook.get_orderbook_snapshot()

Instrument Models

class Instrument(id, name, description, tickSize, tickValue, activeContract, symbolId=None)[source]

Bases: object

Represents a tradeable financial instrument/contract.

id

Unique contract identifier used in API calls

Type:

str

name

Contract name/symbol (e.g., “MGCH25”)

Type:

str

description

Human-readable description of the contract

Type:

str

tickSize

Minimum price movement (e.g., 0.1)

Type:

float

tickValue

Dollar value per tick movement

Type:

float

activeContract

Whether the contract is currently active for trading

Type:

bool

Example

>>> print(f"Trading {instrument.name}")
>>> print(
...     f"Tick size: ${instrument.tickSize}, Tick value: ${instrument.tickValue}"
... )

id: str
name: str
description: str
tickSize: float
tickValue: float
activeContract: bool
symbolId: str | None = None
__init__(id, name, description, tickSize, tickValue, activeContract, symbolId=None)

Note

Technical Indicators: For comprehensive technical analysis with 55+ indicators, see the Technical Indicators module which provides TA-Lib compatible indicators optimized for Polars DataFrames.

Data Utilities

create_data_snapshot(data, description='')[source]

Create a comprehensive snapshot of DataFrame for debugging/analysis.

Parameters:
  • data (DataFrame) – Polars DataFrame

  • description (str) – Optional description

Returns:

Data snapshot with statistics

Return type:

dict

Example

>>> snapshot = create_data_snapshot(ohlcv_data, "MGC 5min data")
>>> print(f"Rows: {snapshot['row_count']}")
>>> print(f"Timespan: {snapshot['timespan']}")

convert_timeframe_to_seconds(timeframe)[source]

Convert timeframe string to seconds.

Parameters:

timeframe (str) – Timeframe (e.g., “1min”, “5min”, “1hr”, “1day”)

Returns:

Timeframe in seconds

Return type:

int

Example

>>> convert_timeframe_to_seconds("5min")
300
>>> convert_timeframe_to_seconds("1hr")
3600
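
The conversion can be sketched as a number-plus-unit parse. This is an assumed re-implementation covering only the unit spellings shown in the parameter description ("min", "hr", "day"); the library function may accept other forms and may handle invalid input differently. The name `timeframe_to_seconds` is hypothetical.

```python
import re

# Seconds per unit, limited to the spellings used in the documentation above.
_UNIT_SECONDS = {"min": 60, "hr": 3600, "day": 86400}


def timeframe_to_seconds(timeframe: str) -> int:
    """Convert strings such as "5min" or "1hr" to a number of seconds."""
    match = re.fullmatch(r"(\d+)(min|hr|day)", timeframe.strip().lower())
    if match is None:
        raise ValueError(f"unrecognised timeframe: {timeframe!r}")
    count, unit = match.groups()
    return int(count) * _UNIT_SECONDS[unit]
```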

get_market_session_info(timezone='America/Chicago')[source]

Get detailed market session information.

Parameters:

timezone (str) – Market timezone

Returns:

Market session details

Return type:

dict

Example

>>> info = get_market_session_info()
>>> print(f"Market open: {info['is_open']}")
>>> print(f"Next session: {info['next_session_start']}")

is_market_hours(timezone='America/Chicago')[source]

Check if it’s currently market hours (CME futures).

Parameters:

timezone (str) – Timezone to check (default: CME time)

Returns:

True if market is open

Return type:

bool
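
As a rough picture of such a check, the sketch below assumes the usual CME Globex futures schedule: open from Sunday 17:00 to Friday 16:00 Central Time, with a daily break from 16:00 to 17:00. Holidays and product-specific hours are ignored, the input is assumed to be already expressed in Central Time, and `is_cme_open` is a hypothetical helper rather than the library's logic.

```python
from datetime import datetime


def is_cme_open(central_time: datetime) -> bool:
    """Return True if `central_time` falls inside the assumed Globex session."""
    weekday = central_time.weekday()  # Monday=0 ... Sunday=6
    hour = central_time.hour
    if weekday == 5:  # Saturday: closed all day
        return False
    if weekday == 6:  # Sunday: session opens at 17:00
        return hour >= 17
    if weekday == 4:  # Friday: session closes at 16:00
        return hour < 16
    return hour != 16  # Monday to Thursday: closed 16:00-17:00 only
```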