Source code for project_x_py.realtime.core

"""
Async ProjectX Realtime Client for ProjectX Gateway API

Author: @TexasCoding
Date: 2025-08-02

Overview:
    Provides an async Python client for the ProjectX real-time API, which provides
    access to the ProjectX trading platform real-time events via SignalR WebSocket
    connections. Implements dual-hub architecture for user and market data streams.

Key Features:
    - Full async/await support for all operations
    - Asyncio-based connection management
    - Non-blocking event processing
    - Async callbacks for all events
    - Dual-hub SignalR connections (User + Market)
    - Automatic reconnection with exponential backoff
    - JWT token authentication and refresh handling
    - Thread-safe event processing and callback execution
    - Connection health monitoring and statistics

Architecture:
    - Pure event forwarding (no business logic)
    - No data caching (handled by managers)
    - No payload parsing (managers handle ProjectX formats)
    - Minimal stateful operations
    - Mixin-based design for modular functionality

Real-time Hubs:
    - User Hub: Account, position, and order updates
    - Market Hub: Quote, trade, and market depth data

Note:
    This class forms the low-level foundation for real-time data. For most applications,
    the `TradingSuite` is the recommended entry point as it abstracts away the direct
    management of this client, its connections, and its events.

Example Usage:
    The example below shows how to use the client directly. In a real application,
    the `position_manager`, `order_manager`, and `data_manager` would be instances
    of the SDK's manager classes.

    ```python
    # V3: Create async client with ProjectX Gateway URLs
    import asyncio
    from project_x_py import create_realtime_client


    async def main():
        # V3: Use factory function for proper initialization
        client = await create_realtime_client(jwt_token, account_id)

        # V3: Register async managers for event handling
        # await client.add_callback("position_update", position_manager.handle_update)
        # await client.add_callback("order_update", order_manager.handle_update)
        # await client.add_callback("quote_update", data_manager.handle_quote)

        # V3: Connect and check both hub connections
        if await client.connect():
            print(f"User Hub: {client.user_connected}")
            print(f"Market Hub: {client.market_connected}")

            # V3: Subscribe to user and market data
            await client.subscribe_user_updates()
            await client.subscribe_market_data(["MGC", "MNQ"])

            # V3: Process events
            await asyncio.sleep(60)

            # V3: Clean disconnect
            await client.disconnect()


    # asyncio.run(main())
    ```

Event Types (per ProjectX Gateway docs):
    User Hub: GatewayUserAccount, GatewayUserPosition, GatewayUserOrder, GatewayUserTrade
    Market Hub: GatewayQuote, GatewayDepth, GatewayTrade

Integration:
    - AsyncPositionManager handles position events and caching
    - AsyncOrderManager handles order events and tracking
    - AsyncRealtimeDataManager handles market data and caching
    - This client only handles connections and event forwarding
"""

import asyncio
import logging
from collections import defaultdict
from typing import TYPE_CHECKING, Any

from project_x_py.realtime.connection_management import ConnectionManagementMixin
from project_x_py.realtime.event_handling import EventHandlingMixin
from project_x_py.realtime.subscriptions import SubscriptionsMixin

if TYPE_CHECKING:
    from project_x_py.models import ProjectXConfig


[docs] class ProjectXRealtimeClient( ConnectionManagementMixin, EventHandlingMixin, SubscriptionsMixin, ): """ Async real-time client for ProjectX Gateway API WebSocket connections. This class provides an async interface for ProjectX SignalR connections and forwards all events to registered managers. It does NOT cache data or perform business logic - that's handled by the specialized managers. Features: - Async SignalR WebSocket connections to ProjectX Gateway hubs - Event forwarding to registered async managers - Automatic reconnection with exponential backoff - JWT token refresh and reconnection - Connection health monitoring - Async event callbacks - Thread-safe event processing and callback execution - Comprehensive connection statistics and health tracking Architecture: - Pure event forwarding (no business logic) - No data caching (handled by managers) - No payload parsing (managers handle ProjectX formats) - Minimal stateful operations - Mixin-based design for modular functionality Real-time Hubs (per ProjectX Gateway docs): - User Hub: Account, position, and order updates - Market Hub: Quote, trade, and market depth data Connection Management: - Dual-hub SignalR connections with automatic reconnection - JWT token authentication via Authorization headers - Connection health monitoring and error handling - Thread-safe operations with proper lock management Event Processing: - Cross-thread event scheduling for asyncio compatibility - Support for both async and sync callbacks - Error isolation to prevent callback failures - Event statistics and flow monitoring Example: >>> # V3: Create async client with factory function >>> client = await create_realtime_client(jwt_token, account_id) >>> # V3: Register async callbacks for event handling >>> async def handle_position(data): ... print(f"Position: {data.get('contractId')} - {data.get('netPos')}") >>> async def handle_order(data): ... print(f"Order {data.get('id')}: {data.get('status')}") >>> async def handle_quote(data): ... print(f"Quote: {data.get('bid')} x {data.get('ask')}") >>> await client.add_callback("position_update", handle_position) >>> await client.add_callback("order_update", handle_order) >>> await client.add_callback("quote_update", handle_quote) >>> >>> # V3: Connect and subscribe with error handling >>> if await client.connect(): ... await client.subscribe_user_updates() ... await client.subscribe_market_data(["MGC", "MNQ"]) ... else: ... print("Connection failed") Event Types (per ProjectX Gateway docs): User Hub: GatewayUserAccount, GatewayUserPosition, GatewayUserOrder, GatewayUserTrade Market Hub: GatewayQuote, GatewayDepth, GatewayTrade Integration: - AsyncPositionManager handles position events and caching - AsyncOrderManager handles order events and tracking - AsyncRealtimeDataManager handles market data and caching - This client only handles connections and event forwarding """
[docs] def __init__( self, jwt_token: str, account_id: str, user_hub_url: str | None = None, market_hub_url: str | None = None, config: "ProjectXConfig | None" = None, ): """ Initialize async ProjectX real-time client with configurable SignalR connections. Creates a dual-hub SignalR client for real-time ProjectX Gateway communication. Handles both user-specific events (positions, orders) and market data (quotes, trades). Args: jwt_token (str): JWT authentication token from AsyncProjectX.authenticate(). Must be valid and not expired for successful connection. account_id (str): ProjectX account ID for user-specific subscriptions. Used to filter position, order, and trade events. user_hub_url (str, optional): Override URL for user hub endpoint. If provided, takes precedence over config URL. Defaults to None (uses config or default). market_hub_url (str, optional): Override URL for market hub endpoint. If provided, takes precedence over config URL. Defaults to None (uses config or default). config (ProjectXConfig, optional): Configuration object with hub URLs. Provides default URLs if direct URLs not specified. Defaults to None (uses TopStepX defaults). URL Priority: 1. Direct parameters (user_hub_url, market_hub_url) 2. Config URLs (config.user_hub_url, config.market_hub_url) 3. Default TopStepX endpoints Example: >>> # V3: Using factory function (recommended) >>> client = await create_realtime_client( ... jwt_token=client.get_session_token(), ... account_id=str(client.get_account_info().id), ... ) >>> # V3: Using direct instantiation with default endpoints >>> client = ProjectXRealtimeClient(jwt_token=jwt_token, account_id="12345") >>> >>> # V3: Using custom config for different environments >>> from project_x_py.models import ProjectXConfig >>> config = ProjectXConfig( ... user_hub_url="https://gateway.topstepx.com/hubs/user", ... market_hub_url="https://gateway.topstepx.com/hubs/market", ... ) >>> client = ProjectXRealtimeClient( ... jwt_token=jwt_token, account_id="12345", config=config ... ) >>> >>> # V3: Override specific URL for testing >>> client = ProjectXRealtimeClient( ... jwt_token=jwt_token, ... account_id="12345", ... market_hub_url="https://test.topstepx.com/hubs/market", ... ) Note: - JWT token is passed securely via Authorization header - Both hubs must connect successfully for full functionality - SignalR connections are established lazily on connect() """ # Initialize parent mixins super().__init__() self.jwt_token = jwt_token self.account_id = account_id # Determine URLs with priority: params > config > defaults if config: default_user_url = config.user_hub_url default_market_url = config.market_hub_url else: # Default to TopStepX endpoints default_user_url = "https://rtc.topstepx.com/hubs/user" default_market_url = "https://rtc.topstepx.com/hubs/market" final_user_url = user_hub_url or default_user_url final_market_url = market_hub_url or default_market_url # Store URLs without tokens (tokens will be passed in headers) self.user_hub_url = final_user_url self.market_hub_url = final_market_url # Set up base URLs for token refresh if config: # Use config URLs if provided self.base_user_url = config.user_hub_url self.base_market_url = config.market_hub_url elif user_hub_url and market_hub_url: # Use provided URLs self.base_user_url = user_hub_url self.base_market_url = market_hub_url else: # Default to TopStepX endpoints self.base_user_url = "https://rtc.topstepx.com/hubs/user" self.base_market_url = "https://rtc.topstepx.com/hubs/market" # SignalR connection objects self.user_connection: Any | None = None self.market_connection: Any | None = None # Connection state tracking self.user_connected = False self.market_connected = False self.setup_complete = False # Event callbacks (pure forwarding, no caching) - already initialized in mixin if not hasattr(self, "callbacks"): self.callbacks: defaultdict[str, list[Any]] = defaultdict(list) # Basic statistics (no business logic) self.stats = { "events_received": 0, "connection_errors": 0, "last_event_time": None, "connected_time": None, } # Track subscribed contracts for reconnection self._subscribed_contracts: list[str] = [] # Logger self.logger = logging.getLogger(__name__) self.logger.info("AsyncProjectX real-time client initialized") self.logger.info(f"User Hub: {final_user_url}") self.logger.info(f"Market Hub: {final_market_url}") # Async locks for thread-safe operations - check if not already initialized if not hasattr(self, "_callback_lock"): self._callback_lock = asyncio.Lock() if not hasattr(self, "_connection_lock"): self._connection_lock = asyncio.Lock()