Data API
Real-time market data, Level 2 orderbook analysis, and institutional-grade market microstructure tools.
Real-time Data Management
Real-time Client
- class ProjectXRealtimeClient(jwt_token, account_id, user_hub_url=None, market_hub_url=None, config=None)
Bases: ConnectionManagementMixin, EventHandlingMixin, SubscriptionsMixin
Async real-time client for ProjectX Gateway API WebSocket connections.
This class provides an async interface for ProjectX SignalR connections and forwards all events to registered managers. It does NOT cache data or perform business logic - that’s handled by the specialized managers.
- Features:
Async SignalR WebSocket connections to ProjectX Gateway hubs
Event forwarding to registered async managers
Automatic reconnection with exponential backoff
JWT token refresh and reconnection
Connection health monitoring
Async event callbacks
Thread-safe event processing and callback execution
Comprehensive connection statistics and health tracking
- Architecture:
Pure event forwarding (no business logic)
No data caching (handled by managers)
No payload parsing (managers handle ProjectX formats)
Minimal stateful operations
Mixin-based design for modular functionality
- Real-time Hubs (per ProjectX Gateway docs):
User Hub: Account, position, and order updates
Market Hub: Quote, trade, and market depth data
- Connection Management:
Dual-hub SignalR connections with automatic reconnection
JWT token authentication via Authorization headers
Connection health monitoring and error handling
Thread-safe operations with proper lock management
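The automatic reconnection mentioned above can be pictured with a small sketch. The helper names `backoff_delays` and `connect_with_retry`, and the base delay, growth factor, and cap, are assumptions chosen for illustration; this section does not state the client's actual retry schedule.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, Iterator


def backoff_delays(
    base: float = 1.0, factor: float = 2.0, max_delay: float = 30.0, attempts: int = 6
) -> Iterator[float]:
    """Yield capped, exponentially growing delays (illustrative values)."""
    delay = base
    for _ in range(attempts):
        yield min(delay, max_delay)
        delay *= factor


async def connect_with_retry(
    connect: Callable[[], Awaitable[bool]],
    delays: Iterable[float],
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> bool:
    """Try connect() once, then once more after each delay; stop on success."""
    if await connect():
        return True
    for delay in delays:
        await sleep(delay)
        if await connect():
            return True
    return False
```

Passing `sleep` as a parameter keeps the retry loop testable without real waiting; in normal use the default `asyncio.sleep` applies.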
- Event Processing:
Cross-thread event scheduling for asyncio compatibility
Support for both async and sync callbacks
Error isolation to keep callback failures from propagating
Event statistics and flow monitoring
Example
>>> # V3: Create async client with factory function
>>> client = await create_realtime_client(jwt_token, account_id)
>>> # V3: Register async callbacks for event handling
>>> async def handle_position(data):
...     print(f"Position: {data.get('contractId')} - {data.get('netPos')}")
>>> async def handle_order(data):
...     print(f"Order {data.get('id')}: {data.get('status')}")
>>> async def handle_quote(data):
...     print(f"Quote: {data.get('bid')} x {data.get('ask')}")
>>> await client.add_callback("position_update", handle_position)
>>> await client.add_callback("order_update", handle_order)
>>> await client.add_callback("quote_update", handle_quote)
>>>
>>> # V3: Connect and subscribe with error handling
>>> if await client.connect():
...     await client.subscribe_user_updates()
...     await client.subscribe_market_data(["MGC", "MNQ"])
... else:
...     print("Connection failed")
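To make the "async and sync callbacks" and "error isolation" points concrete, here is a minimal, self-contained sketch of an event forwarder. `CallbackRegistry` and its methods are hypothetical stand-ins written for this illustration; they are not the client's internal implementation, which this section does not show.

```python
import asyncio
import inspect
from collections import defaultdict
from typing import Any, Callable, DefaultDict, List, Tuple


class CallbackRegistry:
    """Hypothetical event forwarder; illustrates the pattern only."""

    def __init__(self) -> None:
        self._callbacks: DefaultDict[str, List[Callable[[Any], Any]]] = defaultdict(list)
        self.errors: List[Tuple[str, Exception]] = []

    def add_callback(self, event_type: str, callback: Callable[[Any], Any]) -> None:
        self._callbacks[event_type].append(callback)

    async def forward(self, event_type: str, data: Any) -> None:
        for callback in list(self._callbacks[event_type]):
            try:
                result = callback(data)
                # Async callbacks return an awaitable; sync callbacks have already run.
                if inspect.isawaitable(result):
                    await result
            except Exception as exc:
                # Record the failure and continue with the remaining callbacks.
                self.errors.append((event_type, exc))


async def demo_callbacks() -> Tuple[List[str], List[Tuple[str, Exception]]]:
    registry = CallbackRegistry()
    seen: List[str] = []

    async def handle_quote(data: dict) -> None:
        seen.append(f"async {data['bid']} x {data['ask']}")

    def broken(data: dict) -> None:
        raise ValueError("boom")

    def sync_handler(data: dict) -> None:
        seen.append(f"sync {data['bid']}")

    for callback in (handle_quote, broken, sync_handler):
        registry.add_callback("quote_update", callback)
    await registry.forward("quote_update", {"bid": 1.0, "ask": 1.25})
    return seen, registry.errors
```

Running `asyncio.run(demo_callbacks())` shows that the failing callback is recorded while the callbacks before and after it still run.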
- Event Types (per ProjectX Gateway docs):
User Hub: GatewayUserAccount, GatewayUserPosition, GatewayUserOrder, GatewayUserTrade
Market Hub: GatewayQuote, GatewayDepth, GatewayTrade
- Integration:
AsyncPositionManager handles position events and caching
AsyncOrderManager handles order events and tracking
AsyncRealtimeDataManager handles market data and caching
This client only handles connections and event forwarding
- __init__(jwt_token, account_id, user_hub_url=None, market_hub_url=None, config=None)
Initialize async ProjectX real-time client with configurable SignalR connections.
Creates a dual-hub SignalR client for real-time ProjectX Gateway communication. Handles both user-specific events (positions, orders) and market data (quotes, trades).
- Parameters:
jwt_token (str) – JWT authentication token from AsyncProjectX.authenticate(). Must be valid and not expired for successful connection.
account_id (str) – ProjectX account ID for user-specific subscriptions. Used to filter position, order, and trade events.
user_hub_url (str, optional) – Override URL for user hub endpoint. If provided, takes precedence over config URL. Defaults to None (uses config or default).
market_hub_url (str, optional) – Override URL for market hub endpoint. If provided, takes precedence over config URL. Defaults to None (uses config or default).
config (ProjectXConfig, optional) – Configuration object with hub URLs. Provides default URLs if direct URLs not specified. Defaults to None (uses TopStepX defaults).
- URL Priority:
Direct parameters (user_hub_url, market_hub_url)
Config URLs (config.user_hub_url, config.market_hub_url)
Default TopStepX endpoints
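The priority order above amounts to a simple fallback chain. The sketch below shows it for one hub; `HubConfig`, `resolve_user_hub_url`, and the placeholder default URL are assumptions made for illustration (the real default endpoints are not given in this section).

```python
from dataclasses import dataclass
from typing import Optional

# Placeholder only: the real default endpoint is not given in this section.
DEFAULT_USER_HUB_URL = "https://example.invalid/hubs/user"


@dataclass
class HubConfig:
    """Hypothetical stand-in for ProjectXConfig with a single hub URL."""

    user_hub_url: Optional[str] = None


def resolve_user_hub_url(direct_url: Optional[str], config: Optional[HubConfig]) -> str:
    """Apply the documented priority: direct parameter, then config, then default."""
    if direct_url:
        return direct_url
    if config is not None and config.user_hub_url:
        return config.user_hub_url
    return DEFAULT_USER_HUB_URL
```

The same chain would apply to the market hub URL.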
Example
>>> # V3: Using factory function (recommended)
>>> client = await create_realtime_client(
...     jwt_token=client.get_session_token(),
...     account_id=str(client.get_account_info().id),
... )
>>> # V3: Using direct instantiation with default endpoints
>>> client = ProjectXRealtimeClient(jwt_token=jwt_token, account_id="12345")
>>>
>>> # V3: Using custom config for different environments
>>> from project_x_py.models import ProjectXConfig
>>> config = ProjectXConfig(
...     user_hub_url="https://gateway.topstepx.com/hubs/user",
...     market_hub_url="https://gateway.topstepx.com/hubs/market",
... )
>>> client = ProjectXRealtimeClient(
...     jwt_token=jwt_token, account_id="12345", config=config
... )
>>>
>>> # V3: Override specific URL for testing
>>> client = ProjectXRealtimeClient(
...     jwt_token=jwt_token,
...     account_id="12345",
...     market_hub_url="https://test.topstepx.com/hubs/market",
... )
Note
JWT token is passed securely via Authorization header
Both hubs must connect successfully for full functionality
SignalR connections are established lazily on connect()
Orderbook Analysis
Note
Level 2 OrderBook API: For comprehensive Level 2 orderbook analysis including iceberg detection, market imbalance analysis, and institutional-grade market microstructure tools, see the dedicated OrderBook API module.
- class OrderBook(instrument, event_bus, project_x=None, timezone_str='America/Chicago', config=None)
Bases: OrderBookBase
Async Level 2 Orderbook with comprehensive market analysis.
This class combines all orderbook functionality into a single interface, providing a unified API for accessing real-time market depth data, advanced analytics, detection algorithms, and volume profiling. It uses a component-based architecture where specialized functionality is delegated to dedicated components while maintaining a simple, cohesive interface for the client code.
- Key Components:
realtime_handler: Manages WebSocket connections and real-time data processing
analytics: Provides market analytics (imbalance, depth, delta, liquidity)
detection: Implements detection algorithms (iceberg, clusters)
profile: Handles volume profiling and support/resistance analysis
memory_manager: Manages memory usage and cleanup tasks
- Thread Safety:
All methods are thread-safe and can be called concurrently from multiple asyncio tasks. Data consistency is maintained through internal locks.
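As a rough picture of what "internal locks" means for concurrent asyncio tasks, the sketch below guards a shared price map with an `asyncio.Lock`. `DepthStore` is a hypothetical class written for this example and does not reflect the orderbook's real internals.

```python
import asyncio
from typing import Dict, Optional


class DepthStore:
    """Hypothetical bid-side store; illustrates lock usage only."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self._bids: Dict[float, int] = {}

    async def apply_update(self, price: float, size: int) -> None:
        async with self._lock:
            if size == 0:
                self._bids.pop(price, None)  # size 0 removes the level
            else:
                self._bids[price] = size

    async def best_bid(self) -> Optional[float]:
        async with self._lock:
            return max(self._bids) if self._bids else None


async def demo_depth_store() -> Optional[float]:
    store = DepthStore()  # created inside the running event loop
    await asyncio.gather(
        store.apply_update(100.00, 5),
        store.apply_update(100.25, 3),
        store.apply_update(100.25, 0),
    )
    return await store.best_bid()
```

Every read and write goes through the same lock, so a reader never observes a half-applied update.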
- Memory Management:
The orderbook implements automatic memory management through the MemoryManager component, which periodically cleans up historical data based on configurable parameters to prevent memory leaks during long-running sessions.
- Real-time Features:
WebSocket-based Level 2 market depth updates
Immediate trade execution detection and classification
Real-time spread and price level monitoring
Event-driven callback system for custom logic
Automatic data validation and error handling
- Analytics Capabilities:
Market imbalance analysis and ratio calculations
Orderbook depth analysis within price ranges
Cumulative delta tracking and trade flow statistics
Liquidity level identification and concentration analysis
Comprehensive orderbook statistics and health metrics
- Detection Algorithms:
Iceberg order detection with confidence scoring
Order clustering analysis for institutional activity
Advanced market microstructure metrics
Hidden liquidity and volume pattern recognition
Example
>>> # V3: Create orderbook with EventBus
>>> event_bus = EventBus()
>>> orderbook = OrderBook("MNQ", event_bus, project_x_client)
>>> await orderbook.initialize(realtime_client)
>>>
>>> # V3: Register event handlers
>>> @event_bus.on(EventType.MARKET_DEPTH_UPDATE)
... async def handle_depth(data):
...     print(f"Depth: {data['bids'][0]['price']} @ {data['bids'][0]['size']}")
>>>
>>> # Get basic orderbook data
>>> snapshot = await orderbook.get_orderbook_snapshot()
>>> print(f"Spread: {snapshot['spread']}")
>>>
>>> # Advanced analytics
>>> imbalance = await orderbook.get_market_imbalance()
>>> liquidity = await orderbook.get_liquidity_levels()
>>>
>>> # Detection algorithms
>>> icebergs = await orderbook.detect_iceberg_orders()
>>> clusters = await orderbook.detect_order_clusters()
>>>
>>> # Volume profiling
>>> profile = await orderbook.get_volume_profile()
>>> support_resistance = await orderbook.get_support_resistance_levels()
>>>
>>> # Cleanup when done
>>> await orderbook.cleanup()
- __init__(instrument, event_bus, project_x=None, timezone_str='America/Chicago', config=None)
Initialize the orderbook.
- Parameters:
instrument (str) – Trading instrument symbol
event_bus (Any) – EventBus instance for unified event handling. Required for all event emissions including market depth updates and trade ticks.
project_x (ProjectXBase | None) – Optional ProjectX client for tick size lookup
timezone_str (str) – Timezone for timestamps (default: America/Chicago)
config (OrderbookConfig | None) – Optional configuration for orderbook behavior
- async initialize(realtime_client=None, subscribe_to_depth=True, subscribe_to_quotes=True)
Initialize the orderbook with optional real-time data feed.
This method configures the orderbook for operation, sets up the memory manager, and optionally connects to the real-time data feed. It must be called after creating an OrderBook instance and before using any other methods.
The initialization process performs the following steps:
1. Starts the memory manager for automatic cleanup
2. If a realtime_client is provided:
Registers callbacks for market depth and quote updates
Subscribes to the specified data channels
Sets up WebSocket connection handlers
- Parameters:
realtime_client (ProjectXRealtimeClient | None) – Async real-time client for WebSocket data. If provided, the orderbook will receive live market data updates. If None, the orderbook will function in historical/static mode only.
subscribe_to_depth (bool) – Subscribe to market depth updates (Level 2 data). Set to False only if you don’t need full order book data.
subscribe_to_quotes (bool) – Subscribe to quote updates (top of book data). Set to False only if you don’t need quote data.
- Returns:
True if initialization is successful, False if any part of the initialization failed.
- Return type:
bool
Example
>>> # V3: Initialize with EventBus and realtime client
>>> event_bus = EventBus()
>>> orderbook = OrderBook("MNQ", event_bus, client)
>>> # V3: Create realtime client with factory
>>> realtime_client = await create_realtime_client(
...     jwt_token=client.jwt_token, account_id=str(client.account_id)
... )
>>> success = await orderbook.initialize(
...     realtime_client=realtime_client,
...     subscribe_to_depth=True,
...     subscribe_to_quotes=True,
... )
>>> if success:
...     print("Orderbook initialized and receiving real-time data")
... else:
...     print("Failed to initialize orderbook")
- async get_market_imbalance(levels=10)
Calculate order flow imbalance between bid and ask sides.
Delegates to MarketAnalytics.get_market_imbalance(). See MarketAnalytics.get_market_imbalance() for complete documentation.
- Return type:
LiquidityAnalysisResponse
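For intuition, one common way to express bid/ask imbalance over the top levels is the normalized volume difference. The sketch below uses that definition as an assumption; this section does not confirm which formula MarketAnalytics uses, nor the fields of LiquidityAnalysisResponse. `imbalance_ratio` and the `Level` alias are illustrative names.

```python
from typing import List, Tuple

Level = Tuple[float, int]  # (price, size)


def imbalance_ratio(bids: List[Level], asks: List[Level], levels: int = 10) -> float:
    """Return (bid_volume - ask_volume) / (bid_volume + ask_volume) over the top levels.

    Assumes both sides are sorted best price first. Returns 0.0 for an empty book.
    """
    bid_volume = sum(size for _, size in bids[:levels])
    ask_volume = sum(size for _, size in asks[:levels])
    total = bid_volume + ask_volume
    return 0.0 if total == 0 else (bid_volume - ask_volume) / total
```

Under this definition the value ranges from -1 (all resting volume on the ask side) to +1 (all on the bid side).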
- async get_orderbook_depth(price_range)
Analyze orderbook depth within a price range.
Delegates to MarketAnalytics.get_orderbook_depth(). See MarketAnalytics.get_orderbook_depth() for complete documentation.
- Return type:
MarketImpactResponse
- async get_cumulative_delta(time_window_minutes=60)
Get cumulative delta (buy volume - sell volume) over time window.
Delegates to MarketAnalytics.get_cumulative_delta(). See MarketAnalytics.get_cumulative_delta() for complete documentation.
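The "buy volume - sell volume" definition can be illustrated with a few lines of standalone code. The trade tuple layout and the function name `cumulative_delta` below are assumptions for this sketch; the real method reads trades from the orderbook's own history and its return structure is not described here.

```python
from datetime import datetime, timedelta
from typing import Iterable, Tuple

Trade = Tuple[datetime, str, int]  # (timestamp, side, volume); side is "buy" or "sell"


def cumulative_delta(
    trades: Iterable[Trade], now: datetime, time_window_minutes: int = 60
) -> int:
    """Sum buy volume minus sell volume for trades inside the time window."""
    cutoff = now - timedelta(minutes=time_window_minutes)
    delta = 0
    for timestamp, side, volume in trades:
        if timestamp < cutoff:
            continue  # older than the window
        delta += volume if side == "buy" else -volume
    return delta
```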
- async get_trade_flow_summary()
Get comprehensive trade flow statistics.
Delegates to MarketAnalytics.get_trade_flow_summary(). See MarketAnalytics.get_trade_flow_summary() for complete documentation.
- async get_liquidity_levels(min_volume=100, levels=20)
Identify significant liquidity levels in the orderbook.
Delegates to MarketAnalytics.get_liquidity_levels(). See MarketAnalytics.get_liquidity_levels() for complete documentation.
- async get_statistics()
Get comprehensive orderbook statistics.
Delegates to MarketAnalytics.get_statistics(). See MarketAnalytics.get_statistics() for complete documentation.
- async detect_iceberg_orders(min_refreshes=None, volume_threshold=None, time_window_minutes=None)
Detect potential iceberg orders based on price level refresh patterns.
Delegates to OrderDetection.detect_iceberg_orders(). See OrderDetection.detect_iceberg_orders() for complete documentation.
- async detect_order_clusters(min_cluster_size=3, price_tolerance=0.1)
Detect clusters of orders at similar price levels.
Delegates to OrderDetection.detect_order_clusters(). See OrderDetection.detect_order_clusters() for complete documentation.
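One plausible reading of the `min_cluster_size` and `price_tolerance` parameters is a gap-based grouping of sorted prices. The sketch below implements that reading as an assumption; it is not OrderDetection's actual algorithm, and `find_price_clusters` is a hypothetical name.

```python
from typing import Iterable, List


def find_price_clusters(
    prices: Iterable[float], min_cluster_size: int = 3, price_tolerance: float = 0.1
) -> List[List[float]]:
    """Group sorted prices whose neighbor gaps are within price_tolerance.

    Groups smaller than min_cluster_size are discarded.
    """
    clusters: List[List[float]] = []
    current: List[float] = []
    for price in sorted(prices):
        # A gap larger than the tolerance closes the current group.
        if current and price - current[-1] > price_tolerance:
            if len(current) >= min_cluster_size:
                clusters.append(current)
            current = []
        current.append(price)
    if len(current) >= min_cluster_size:
        clusters.append(current)
    return clusters
```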
- async get_advanced_market_metrics()
Calculate advanced market microstructure metrics.
Delegates to OrderDetection.get_advanced_market_metrics(). See OrderDetection.get_advanced_market_metrics() for complete documentation.
- Return type:
OrderbookAnalysisResponse
- async get_volume_profile(time_window_minutes=60, price_bins=20)
Calculate volume profile showing volume distribution by price.
Delegates to VolumeProfile.get_volume_profile(). See VolumeProfile.get_volume_profile() for complete documentation.
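A volume profile distributes traded volume across price buckets. The sketch below shows equal-width binning controlled by `price_bins`; it leaves out the time-window filter, and both the function name `volume_profile` and the `(bin_low, volume)` output shape are assumptions rather than the library's actual return format.

```python
from typing import List, Tuple


def volume_profile(
    trades: List[Tuple[float, int]], price_bins: int = 20
) -> List[Tuple[float, int]]:
    """Bucket traded volume into equal-width price bins; returns (bin_low, volume) pairs."""
    if not trades or price_bins <= 0:
        return []
    low = min(price for price, _ in trades)
    high = max(price for price, _ in trades)
    width = (high - low) / price_bins
    volumes = [0] * price_bins
    for price, volume in trades:
        # The highest price would land one past the end, so clamp it into the last bin.
        index = min(int((price - low) / width), price_bins - 1) if width > 0 else 0
        volumes[index] += volume
    return [(low + i * width, volumes[i]) for i in range(price_bins)]
```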
- async get_support_resistance_levels(lookback_minutes=120, min_touches=3, price_tolerance=0.1)
Identify support and resistance levels based on price history.
Delegates to VolumeProfile.get_support_resistance_levels(). See VolumeProfile.get_support_resistance_levels() for complete documentation.
- async get_spread_analysis(window_minutes=30)
Analyze bid-ask spread patterns over time.
Delegates to VolumeProfile.get_spread_analysis(). See VolumeProfile.get_spread_analysis() for complete documentation.
- Return type:
LiquidityAnalysisResponse
Data Factory Functions
- create_orderbook(instrument, event_bus, project_x=None, realtime_client=None, timezone_str='America/Chicago')
Factory function to create an orderbook.
This factory function creates and returns an OrderBook instance for the specified instrument. It simplifies the process of creating an orderbook by handling the initial configuration. Note that the returned orderbook is not yet initialized - you must call the initialize() method separately to start the orderbook’s functionality.
The factory approach provides several benefits: 1. Ensures consistent orderbook creation across the application 2. Allows for future extension with pre-configured orderbook variants 3. Simplifies the API for common use cases
- Parameters:
instrument (str) – Trading instrument symbol (e.g., “ES”, “NQ”, “MES”, “MNQ”). This should be the base symbol without contract-specific extensions.
project_x (ProjectXBase | None) – Optional AsyncProjectX client for tick size lookup and API access. If provided, the orderbook will be able to look up tick sizes and other contract details automatically.
realtime_client (ProjectXRealtimeClient | None) – Optional real-time client for WebSocket data. This is kept for compatibility but should be passed to initialize() instead.
timezone_str (str) – Timezone for timestamps (default: “America/Chicago”). All timestamps in the orderbook will be converted to this timezone.
- Returns:
Orderbook instance that must be initialized with a call to initialize() before use.
- Return type:
OrderBook
Example
>>> # V3: Create an orderbook with EventBus
>>> event_bus = EventBus()
>>> orderbook = create_orderbook(
...     instrument="MNQ",  # V3: Using actual contract symbols
...     event_bus=event_bus,
...     project_x=client,
...     timezone_str="America/Chicago",  # V3: Using CME timezone
... )
>>>
>>> # V3: Initialize with factory-created realtime client
>>> realtime_client = await create_realtime_client(
...     jwt_token=client.jwt_token, account_id=str(client.account_id)
... )
>>> await orderbook.initialize(realtime_client=realtime_client)
>>>
>>> # Start using the orderbook
>>> snapshot = await orderbook.get_orderbook_snapshot()
Instrument Models
- class Instrument(id, name, description, tickSize, tickValue, activeContract, symbolId=None)
Bases: object
Represents a tradeable financial instrument/contract.
Example
>>> print(f"Trading {instrument.name}")
>>> print(
...     f"Tick size: ${instrument.tickSize}, Tick value: ${instrument.tickValue}"
... )
- __init__(id, name, description, tickSize, tickValue, activeContract, symbolId=None)
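The `tickSize` and `tickValue` fields are typically used together to convert a price move into a currency amount. The sketch below uses a local dataclass that mirrors the documented constructor; the field types, the sample values, and the helper `price_move_value` are assumptions for illustration, not part of the library.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Instrument:
    """Local stand-in mirroring the documented constructor; field types are assumed."""

    id: str
    name: str
    description: str
    tickSize: float
    tickValue: float
    activeContract: bool
    symbolId: Optional[str] = None


def price_move_value(instrument: Instrument, price_move: float) -> float:
    """Convert a price move into currency: number of ticks times value per tick."""
    ticks = price_move / instrument.tickSize
    return ticks * instrument.tickValue


# Illustrative values only; look up real contract details through the API.
example = Instrument(
    id="example-id",
    name="MNQ",
    description="Example micro contract",
    tickSize=0.25,
    tickValue=0.5,
    activeContract=True,
)
```

With these sample values, a 2.0-point move is 8 ticks, so `price_move_value(example, 2.0)` evaluates to 4.0.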
Note
Technical Indicators: For comprehensive technical analysis with 55+ indicators, see the Technical Indicators module which provides TA-Lib compatible indicators optimized for Polars DataFrames.
Data Utilities
- create_data_snapshot(data, description='')
Create a comprehensive snapshot of DataFrame for debugging/analysis.
- Parameters:
data (DataFrame) – Polars DataFrame
description (str) – Optional description
- Returns:
Data snapshot with statistics
- Return type:
Example
>>> snapshot = create_data_snapshot(ohlcv_data, "MGC 5min data")
>>> print(f"Rows: {snapshot['row_count']}")
>>> print(f"Timespan: {snapshot['timespan']}")
- convert_timeframe_to_seconds(timeframe)
Convert timeframe string to seconds.
- Parameters:
timeframe (str) – Timeframe (e.g., “1min”, “5min”, “1hr”, “1day”)
- Returns:
Timeframe in seconds
- Return type:
int
Example
>>> convert_timeframe_to_seconds("5min")
300
>>> convert_timeframe_to_seconds("1hr")
3600
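For readers curious how such a conversion can work, here is a minimal standalone sketch that handles the unit suffixes shown in the documented examples ("min", "hr", "day"). The function name `timeframe_to_seconds` and the choice to raise ValueError on unknown input are assumptions; the library's own implementation and error behavior may differ.

```python
import re

# Unit suffixes taken from the documented examples; other suffixes are not handled here.
_UNIT_SECONDS = {"min": 60, "hr": 3600, "day": 86400}
_PATTERN = re.compile(r"(\d+)(min|hr|day)")


def timeframe_to_seconds(timeframe: str) -> int:
    """Parse strings like '5min' or '1hr' into a number of seconds."""
    match = _PATTERN.fullmatch(timeframe.strip().lower())
    if match is None:
        raise ValueError(f"Unrecognized timeframe: {timeframe!r}")
    count, unit = match.groups()
    return int(count) * _UNIT_SECONDS[unit]
```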