Source code for project_x_py.position_manager.core

"""
Core PositionManager class for comprehensive position operations.

Author: @TexasCoding
Date: 2025-08-02

Overview:
    Provides the main PositionManager class that handles all position-related
    operations including tracking, monitoring, analysis, and management.
    Integrates multiple mixins to provide comprehensive position lifecycle
    management with real-time capabilities and risk management.

Key Features:
    - Real-time position tracking via WebSocket integration
    - Portfolio-level position management and analytics
    - Automated P&L calculation and risk metrics
    - Position sizing and risk management tools
    - Event-driven position updates and closure detection
    - Async-safe operations for concurrent access
    - Comprehensive position operations (close, partial close)
    - Statistics, history, and report generation

Note:
    This class is the core implementation of the position manager. For most
    applications, it is recommended to interact with it through the `TradingSuite`
    (`suite.positions`), which handles its lifecycle and integration automatically.
    The example below demonstrates direct, low-level instantiation and usage.

Example Usage:
    ```python
    # V3: Initialize position manager with EventBus and real-time support
    import asyncio
    from project_x_py import ProjectX, create_realtime_client, EventBus
    from project_x_py.position_manager import PositionManager


    async def main():
        async with ProjectX.from_env() as client:
            await client.authenticate()

            # V3: Create dependencies
            event_bus = EventBus()
            realtime_client = await create_realtime_client(
                client.get_session_token(), str(client.get_account_info().id)
            )

            # V3: Initialize position manager
            pm = PositionManager(client, event_bus)
            await pm.initialize(realtime_client)

            # V3: Get current positions with detailed fields
            positions = await pm.get_all_positions()
            for pos in positions:
                print(f"{pos.contractId}: {pos.netPos} @ ${pos.buyAvgPrice}")

            # V3: Calculate P&L with market prices
            prices = {"MGC": 2050.0, "MNQ": 18500.0}
            pnl = await pm.calculate_portfolio_pnl(prices)
            print(f"Total P&L: ${pnl['total_pnl']:.2f}")

            # V3: Risk analysis
            risk = await pm.get_risk_metrics()
            print(f"Portfolio risk: {risk['portfolio_risk']:.2%}")

            # V3: Position operations
            await pm.close_position_direct("MGC")


    asyncio.run(main())
    ```

See Also:
    - `position_manager.analytics.PositionAnalyticsMixin`
    - `position_manager.risk.RiskManagementMixin`
    - `position_manager.monitoring.PositionMonitoringMixin`
    - `position_manager.operations.PositionOperationsMixin`
    - `position_manager.reporting.PositionReportingMixin`
    - `position_manager.tracking.PositionTrackingMixin`
"""

import asyncio
from datetime import datetime
from typing import TYPE_CHECKING, Any, Optional

from project_x_py.client.base import ProjectXBase
from project_x_py.models import Position
from project_x_py.position_manager.analytics import PositionAnalyticsMixin
from project_x_py.position_manager.monitoring import PositionMonitoringMixin
from project_x_py.position_manager.operations import PositionOperationsMixin
from project_x_py.position_manager.reporting import PositionReportingMixin
from project_x_py.position_manager.risk import RiskManagementMixin
from project_x_py.position_manager.tracking import PositionTrackingMixin
from project_x_py.types.config_types import PositionManagerConfig
from project_x_py.utils import (
    LogMessages,
    ProjectXLogger,
    handle_errors,
)

if TYPE_CHECKING:
    from project_x_py.client import ProjectXBase
    from project_x_py.order_manager import OrderManager
    from project_x_py.realtime import ProjectXRealtimeClient


[docs] class PositionManager( PositionTrackingMixin, PositionAnalyticsMixin, RiskManagementMixin, PositionMonitoringMixin, PositionOperationsMixin, PositionReportingMixin, ): """ Async comprehensive position management system for ProjectX trading operations. This class handles all position-related operations including tracking, monitoring, analysis, and management using async/await patterns. It integrates with both the AsyncProjectX client and the async real-time client for live position monitoring. Features: - Complete async position lifecycle management - Real-time position tracking and monitoring via WebSocket - Portfolio-level position management and analytics - Automated P&L calculation and risk metrics - Position sizing and risk management tools - Event-driven position updates (closures detected from size=0) - Async-safe operations for concurrent access - Comprehensive position operations (close, partial close, bulk operations) - Statistics, history, and report generation Real-time Capabilities: - WebSocket-based position updates and closure detection - Immediate position change notifications - Event-driven callbacks for custom monitoring - Automatic position synchronization with order management Risk Management: - Portfolio risk assessment and concentration analysis - Position sizing calculations with configurable risk parameters - Risk warnings and threshold monitoring - Diversification scoring and portfolio health metrics Example Usage: >>> # V3: Create position manager with EventBus integration >>> event_bus = EventBus() >>> position_manager = PositionManager(project_x_client, event_bus) >>> # V3: Initialize with real-time client for WebSocket updates >>> realtime_client = await create_realtime_client( ... client.get_session_token(), str(client.get_account_info().id) ... ) >>> await position_manager.initialize(realtime_client=realtime_client) >>> # V3: Get current positions with actual field names >>> positions = await position_manager.get_all_positions() >>> mgc_position = await position_manager.get_position("MGC") >>> if mgc_position: >>> print(f"Size: {mgc_position.netPos}") >>> print(f"Avg Price: ${mgc_position.buyAvgPrice}") >>> # V3: Portfolio analytics with market prices >>> market_prices = {"MGC": 2050.0, "MNQ": 18500.0} >>> portfolio_pnl = await position_manager.calculate_portfolio_pnl( ... market_prices ... ) >>> risk_metrics = await position_manager.get_risk_metrics() >>> # V3: Position monitoring with alerts >>> await position_manager.add_position_alert("MGC", max_loss=-500.0) >>> await position_manager.start_monitoring(interval_seconds=5) >>> # V3: Position sizing with risk management >>> suggested_size = await position_manager.calculate_position_size( ... "MGC", risk_amount=100.0, entry_price=2045.0, stop_price=2040.0 ... ) """
[docs] def __init__( self, project_x_client: "ProjectXBase", event_bus: Any, config: PositionManagerConfig | None = None, ): """ Initialize the PositionManager with an ProjectX client and optional configuration. Creates a comprehensive position management system with tracking, monitoring, alerts, risk management, and optional real-time/order synchronization. Args: project_x_client (ProjectX): The authenticated ProjectX client instance used for all API operations. Must be properly authenticated before use. event_bus: EventBus instance for unified event handling. Required for all event emissions including position updates, P&L changes, and risk alerts. config: Optional configuration for position management behavior. If not provided, default values will be used for all configuration options. Attributes: project_x (ProjectX): Reference to the ProjectX client logger (logging.Logger): Logger instance for this manager position_lock (asyncio.Lock): Thread-safe lock for position operations realtime_client (ProjectXRealtimeClient | None): Optional real-time client order_manager (OrderManager | None): Optional order manager for sync tracked_positions (dict[str, Position]): Current positions by contract ID position_history (dict[str, list[dict]]): Historical position changes event_bus (Any): EventBus instance for unified event handling position_alerts (dict[str, dict]): Active position alerts by contract stats (dict): Comprehensive tracking statistics risk_settings (dict): Risk management configuration Example: >>> # V3: Initialize with EventBus for unified event handling >>> async with ProjectX.from_env() as client: ... await client.authenticate() ... event_bus = EventBus() ... position_manager = PositionManager(client, event_bus) ... ... # V3: Optional - add order manager for synchronization ... order_manager = OrderManager(client, event_bus) ... await position_manager.initialize( ... realtime_client=realtime_client, order_manager=order_manager ... ) """ # Initialize all mixins PositionTrackingMixin.__init__(self) PositionMonitoringMixin.__init__(self) self.project_x = project_x_client self.event_bus = event_bus # Store the event bus for emitting events self.logger = ProjectXLogger.get_logger(__name__) # Store configuration with defaults self.config = config or {} self._apply_config_defaults() # Async lock for thread safety self.position_lock = asyncio.Lock() # Real-time integration (optional) self.realtime_client: ProjectXRealtimeClient | None = None self._realtime_enabled = False # Order management integration (optional) self.order_manager: OrderManager | None = None self._order_sync_enabled = False # Comprehensive statistics tracking self.stats = { "open_positions": 0, "closed_positions": 0, "total_positions": 0, "total_pnl": 0.0, "realized_pnl": 0.0, "unrealized_pnl": 0.0, "best_position_pnl": 0.0, "worst_position_pnl": 0.0, "avg_position_size": 0.0, "largest_position": 0, "avg_hold_time_minutes": 0.0, "longest_hold_time_minutes": 0.0, "win_rate": 0.0, "profit_factor": 0.0, "sharpe_ratio": 0.0, "max_drawdown": 0.0, "total_risk": 0.0, "max_position_risk": 0.0, "portfolio_correlation": 0.0, "var_95": 0.0, "position_updates": 0, "risk_calculations": 0, "last_position_update": None, # Legacy fields for backward compatibility in other methods "positions_tracked": 0, "positions_partially_closed": 0, "last_update_time": None, "monitoring_started": None, } self.logger.info( LogMessages.MANAGER_INITIALIZED, extra={"manager": "PositionManager"} )
def _apply_config_defaults(self) -> None: """Apply default values for configuration options.""" # Position management settings self.enable_risk_monitoring = self.config.get("enable_risk_monitoring", True) self.auto_stop_loss = self.config.get("auto_stop_loss", False) self.auto_take_profit = self.config.get("auto_take_profit", False) self.max_position_size = self.config.get("max_position_size", 100) self.max_portfolio_risk = self.config.get("max_portfolio_risk", 0.02) self.position_sizing_method = self.config.get("position_sizing_method", "fixed") self.enable_correlation_analysis = self.config.get( "enable_correlation_analysis", True ) self.enable_portfolio_rebalancing = self.config.get( "enable_portfolio_rebalancing", False ) self.rebalance_frequency_minutes = self.config.get( "rebalance_frequency_minutes", 60 ) self.risk_calculation_interval = self.config.get("risk_calculation_interval", 5) # Update risk settings from configuration self.risk_settings = { "max_portfolio_risk": self.max_portfolio_risk, "max_position_risk": self.config.get( "max_position_risk", 0.01 ), # 1% per position "max_correlation": self.config.get( "max_correlation", 0.7 ), # Maximum correlation between positions "alert_threshold": self.config.get( "alert_threshold", 0.005 ), # 0.5% threshold for alerts }
[docs] @handle_errors("initialize position manager", reraise=False, default_return=False) async def initialize( self, realtime_client: Optional["ProjectXRealtimeClient"] = None, order_manager: Optional["OrderManager"] = None, ) -> bool: """ Initialize the PositionManager with optional real-time capabilities and order synchronization. This method sets up advanced features including real-time position tracking via WebSocket and automatic order synchronization. Must be called before using real-time features. Args: realtime_client (ProjectXRealtimeClient, optional): Real-time client instance for WebSocket-based position updates. When provided, enables live position tracking without polling. Defaults to None (polling mode). order_manager (OrderManager, optional): Order manager instance for automatic order synchronization. When provided, orders are automatically updated when positions change. Defaults to None (no order sync). Returns: bool: True if initialization successful, False if any errors occurred Raises: Exception: Logged but not raised - returns False on failure Example: >>> # V3: Initialize with real-time tracking >>> rt_client = await create_realtime_client( ... client.get_session_token(), str(client.get_account_info().id) ... ) >>> success = await position_manager.initialize(realtime_client=rt_client) >>> >>> # V3: Initialize with both real-time and order sync >>> event_bus = EventBus() >>> order_mgr = OrderManager(client, event_bus) >>> success = await position_manager.initialize( ... realtime_client=rt_client, order_manager=order_mgr ... ) Note: - Real-time mode provides instant position updates via WebSocket - Polling mode refreshes positions periodically (see start_monitoring) - Order synchronization helps maintain order/position consistency """ # Set up real-time integration if provided if realtime_client: self.realtime_client = realtime_client await self._setup_realtime_callbacks() self._realtime_enabled = True self.logger.info( LogMessages.MANAGER_INITIALIZED, extra={"manager": "PositionManager", "mode": "realtime"}, ) else: self.logger.info( LogMessages.MANAGER_INITIALIZED, extra={"manager": "PositionManager", "mode": "polling"}, ) # Set up order management integration if provided if order_manager: self.order_manager = order_manager self._order_sync_enabled = True self.logger.info( LogMessages.MANAGER_INITIALIZED, extra={"feature": "order_synchronization", "enabled": True}, ) # Load initial positions await self.refresh_positions() return True
# ================================================================================ # CORE POSITION RETRIEVAL METHODS # ================================================================================
[docs] @handle_errors("get all positions", reraise=False, default_return=[]) async def get_all_positions(self, account_id: int | None = None) -> list[Position]: """ Get all current positions from the API and update tracking. Retrieves all open positions for the specified account, updates the internal tracking cache, and returns the position list. This is the primary method for fetching position data. Args: account_id (int, optional): The account ID to get positions for. If None, uses the default account from authentication. Defaults to None. Returns: list[Position]: List of all current open positions. Each Position object contains id, accountId, contractId, type, size, averagePrice, and creationTimestamp. Empty list if no positions or on error. Side effects: - Updates self.tracked_positions with current data - Updates statistics (positions_tracked, last_update_time) Example: >>> # V3: Get all positions with actual field names >>> positions = await position_manager.get_all_positions() >>> for pos in positions: ... print(f"Contract: {pos.contractId}") ... print(f" Net Position: {pos.netPos}") ... print(f" Buy Avg Price: ${pos.buyAvgPrice:.2f}") ... print(f" Unrealized P&L: ${pos.unrealizedPnl:.2f}") >>> # V3: Get positions for specific account >>> positions = await position_manager.get_all_positions(account_id=12345) Note: In real-time mode, tracked positions are also updated via WebSocket, but this method always fetches fresh data from the API. """ self.logger.info(LogMessages.POSITION_SEARCH, extra={"account_id": account_id}) positions = await self.project_x.search_open_positions(account_id=account_id) # Update tracked positions async with self.position_lock: for position in positions: self.tracked_positions[position.contractId] = position # Update statistics self.stats["positions_tracked"] = len(positions) self.stats["last_update_time"] = datetime.now() self.logger.info( LogMessages.POSITION_UPDATE, extra={"position_count": len(positions)} ) return positions
[docs] @handle_errors("get position", reraise=False, default_return=None) async def get_position( self, contract_id: str, account_id: int | None = None ) -> Position | None: """ Get a specific position by contract ID. Searches for a position matching the given contract ID. In real-time mode, checks the local cache first for better performance before falling back to an API call. Args: contract_id (str): The contract ID to search for (e.g., "MGC", "NQ") account_id (int, optional): The account ID to search within. If None, uses the default account from authentication. Defaults to None. Returns: Position | None: Position object if found, containing all position details (id, size, averagePrice, type, etc.). Returns None if no position exists for the contract. Example: >>> # V3: Check if we have a Gold position >>> mgc_position = await position_manager.get_position("MGC") >>> if mgc_position: ... print(f"MGC position: {mgc_position.netPos} contracts") ... print(f"Buy Avg Price: ${mgc_position.buyAvgPrice:.2f}") ... print(f"Sell Avg Price: ${mgc_position.sellAvgPrice:.2f}") ... print(f"Unrealized P&L: ${mgc_position.unrealizedPnl:.2f}") ... print(f"Realized P&L: ${mgc_position.realizedPnl:.2f}") ... else: ... print("No MGC position found") Performance: - Real-time mode: O(1) cache lookup, falls back to API if miss - Polling mode: Always makes API call via get_all_positions() """ # Try cached data first if real-time enabled if self._realtime_enabled: async with self.position_lock: cached_position = self.tracked_positions.get(contract_id) if cached_position: return cached_position # Fallback to API search positions = await self.get_all_positions(account_id=account_id) for position in positions: if position.contractId == contract_id: return position return None
[docs] @handle_errors("refresh positions", reraise=False, default_return=False) async def refresh_positions(self, account_id: int | None = None) -> bool: """ Refresh all position data from the API. Forces a fresh fetch of all positions from the API, updating the internal tracking cache. Useful for ensuring data is current after external changes or when real-time updates may have been missed. Args: account_id (int, optional): The account ID to refresh positions for. If None, uses the default account from authentication. Defaults to None. Returns: bool: True if refresh was successful, False if any error occurred Side effects: - Updates self.tracked_positions with fresh data - Updates position statistics - Logs refresh results Example: >>> # Manually refresh positions >>> success = await position_manager.refresh_positions() >>> if success: ... print("Positions refreshed successfully") >>> # Refresh specific account >>> await position_manager.refresh_positions(account_id=12345) Note: This method is called automatically during initialization and by the monitoring loop in polling mode. """ self.logger.info(LogMessages.POSITION_REFRESH, extra={"account_id": account_id}) positions = await self.get_all_positions(account_id=account_id) self.logger.info( LogMessages.POSITION_UPDATE, extra={"refreshed_count": len(positions)} ) return True
[docs] async def is_position_open( self, contract_id: str, account_id: int | None = None ) -> bool: """ Check if a position exists for the given contract. Convenience method to quickly check if you have an open position in a specific contract without retrieving the full position details. Args: contract_id (str): The contract ID to check (e.g., "MGC", "NQ") account_id (int, optional): The account ID to check within. If None, uses the default account from authentication. Defaults to None. Returns: bool: True if an open position exists (size != 0), False otherwise Example: >>> # Check before placing an order >>> if await position_manager.is_position_open("MGC"): ... print("Already have MGC position") ... else: ... # Safe to open new position ... await order_manager.place_market_order("MGC", 0, 1) Note: A position with size=0 is considered closed and returns False. """ position = await self.get_position(contract_id, account_id) return position is not None and position.size != 0
[docs] async def cleanup(self) -> None: """ Clean up resources and connections when shutting down. Performs complete cleanup of the AsyncPositionManager, including stopping monitoring tasks, clearing tracked data, and releasing all resources. Should be called when the manager is no longer needed to prevent memory leaks and ensure graceful shutdown. Cleanup operations: 1. Stops position monitoring (cancels async tasks) 2. Clears all tracked positions 3. Clears position history 4. Removes all callbacks 5. Clears all alerts 6. Disconnects order manager integration Example: >>> # Basic cleanup >>> await position_manager.cleanup() >>> # Cleanup in finally block >>> position_manager = AsyncPositionManager(client) >>> try: ... await position_manager.initialize(realtime_client) ... # ... use position manager ... ... finally: ... await position_manager.cleanup() >>> # Context manager pattern (if implemented) >>> async with AsyncPositionManager(client) as pm: ... await pm.initialize(realtime_client) ... # ... automatic cleanup on exit ... Note: - Safe to call multiple times - Logs successful cleanup - Does not close underlying client connections """ await self.stop_monitoring() async with self.position_lock: self.tracked_positions.clear() self.position_history.clear() # EventBus handles all callbacks now self.position_alerts.clear() # Clear order manager integration self.order_manager = None self._order_sync_enabled = False self.logger.info("✅ AsyncPositionManager cleanup completed")