# Source code for project_x_py.utils.error_handler

"""
Centralized error handling utilities for ProjectX SDK.

Author: @TexasCoding
Date: 2025-08-02

Overview:
    Provides consistent error handling patterns, logging, and retry logic
    across the entire SDK. Implements decorators, context managers, and
    utilities for robust error handling in async and sync operations.

Key Features:
    - Decorator-based error handling for functions and methods
    - Automatic retry logic with exponential backoff
    - Rate limit handling with automatic delay
    - Response validation and error context
    - Comprehensive logging and error reporting
    - Support for both async and sync operations

Error Handling Patterns:
    - @handle_errors: General error handling with logging
    - @retry_on_network_error: Automatic retry for network issues
    - @handle_rate_limit: Rate limit handling with automatic delay
    - @validate_response: Response structure validation
    - ErrorContext: Context manager for batch operations

Example Usage:
    ```python
    from project_x_py.utils import (
        handle_errors,
        retry_on_network_error,
        handle_rate_limit,
        validate_response,
        ErrorContext,
    )


    # General error handling
    @handle_errors("fetch market data", reraise=False)
    async def get_market_data():
        # Implementation
        pass


    # Network retry with exponential backoff
    @retry_on_network_error(max_attempts=3, initial_delay=1.0)
    async def api_call():
        # Implementation
        pass


    # Rate limit handling
    @handle_rate_limit(fallback_delay=60.0)
    async def make_api_call():
        # Implementation
        pass


    # Response validation
    @validate_response(required_fields=["id", "status"])
    async def get_order(order_id: str):
        # Implementation
        pass


    # Batch operation error handling
    async with ErrorContext("process orders") as ctx:
        for order in orders:
            try:
                await process_order(order)
            except Exception as e:
                ctx.add_error(order.id, e)
    ```

Error Handling Benefits:
    - Consistent error handling across all SDK operations
    - Automatic retry logic for transient failures
    - Rate limit compliance with automatic delays
    - Comprehensive logging for debugging
    - Graceful degradation with fallback options
    - Type-safe error handling with proper exceptions

Performance Characteristics:
    - Minimal overhead for error handling
    - Efficient retry logic with exponential backoff
    - Memory-efficient error context management
    - Thread-safe operations for concurrent access
    - Optimized for high-frequency trading scenarios

See Also:
    - `utils.error_messages`: Standardized error messages
    - `utils.logging_config`: Logging for error reporting
    - `utils.async_rate_limiter`: Rate limiting utilities
"""

import asyncio
import functools
import logging
import time
from collections import Counter
from collections.abc import Callable
from datetime import datetime
from typing import Any, Literal, TypeVar, cast

import httpx

from project_x_py.exceptions import (
    ProjectXConnectionError,
    ProjectXDataError,
    ProjectXError,
    ProjectXRateLimitError,
    ProjectXServerError,
)

T = TypeVar("T")


[docs] def handle_errors( operation: str, logger: logging.Logger | None = None, reraise: bool = True, default_return: Any = None, ) -> Callable[[Callable[..., T]], Callable[..., T]]: """ Decorator for consistent error handling across the SDK. This decorator catches exceptions, logs them consistently, and optionally re-raises them with additional context. Args: operation: Description of the operation being performed logger: Logger instance to use (defaults to module logger) reraise: Whether to re-raise exceptions after logging default_return: Default value to return if exception occurs and reraise=False Example: @handle_errors("fetch market data") async def get_bars(self, symbol: str): # Implementation pass """ def decorator(func: Callable[..., T]) -> Callable[..., T]: @functools.wraps(func) async def async_wrapper(*args: Any, **kwargs: Any) -> T: nonlocal logger if logger is None: logger = logging.getLogger(func.__module__) try: return await func(*args, **kwargs) # type: ignore[misc,no-any-return] except ProjectXError as e: # Already a ProjectX error, just add context logger.error( f"ProjectX error during {operation}: {e}", extra={ "error_type": type(e).__name__, "error_code": getattr(e, "error_code", None), "operation": operation, "function": func.__name__, }, ) if reraise: raise return cast(T, default_return) except httpx.HTTPError as e: # Convert HTTP errors to ProjectX errors logger.error( f"HTTP error during {operation}: {e}", extra={ "error_type": type(e).__name__, "operation": operation, "function": func.__name__, }, ) if reraise: raise ProjectXConnectionError( f"HTTP error during {operation}: {e}" ) from e return cast(T, default_return) except Exception as e: # Unexpected errors logger.exception( f"Unexpected error during {operation}", extra={ "error_type": type(e).__name__, "operation": operation, "function": func.__name__, }, ) if reraise: raise ProjectXError( f"Unexpected error during {operation}: {e}" ) from e return cast(T, default_return) 
@functools.wraps(func) def sync_wrapper(*args: Any, **kwargs: Any) -> T: nonlocal logger if logger is None: logger = logging.getLogger(func.__module__) try: return func(*args, **kwargs) except ProjectXError as e: logger.error( f"ProjectX error during {operation}: {e}", extra={ "error_type": type(e).__name__, "error_code": getattr(e, "error_code", None), "operation": operation, "function": func.__name__, }, ) if reraise: raise return cast(T, default_return) except Exception as e: logger.exception( f"Unexpected error during {operation}", extra={ "error_type": type(e).__name__, "operation": operation, "function": func.__name__, }, ) if reraise: raise ProjectXError( f"Unexpected error during {operation}: {e}" ) from e return cast(T, default_return) # Return appropriate wrapper based on function type if asyncio.iscoroutinefunction(func): return cast(Callable[..., T], async_wrapper) else: return cast(Callable[..., T], sync_wrapper) return decorator
def retry_on_network_error(
    max_attempts: int = 3,
    backoff_factor: float = 2.0,
    initial_delay: float = 1.0,
    max_delay: float = 60.0,
    retry_on: tuple[type[Exception], ...] = (
        httpx.ConnectError,
        httpx.TimeoutException,
        ProjectXConnectionError,
        ProjectXServerError,
    ),
) -> Callable[[Callable[..., T]], Callable[..., T]]:
    """
    Decorator to retry operations on network errors with exponential backoff.

    The wrapped callable is invoked up to ``max_attempts`` times. After a
    failed attempt ``n`` (0-based) that raised one of ``retry_on``, the
    wrapper waits ``min(initial_delay * backoff_factor**n, max_delay)``
    seconds before trying again. Exceptions not listed in ``retry_on``
    propagate immediately; once attempts are exhausted the last retryable
    exception is re-raised.

    Args:
        max_attempts: Maximum number of attempts (must be at least 1)
        backoff_factor: Multiplier for exponential backoff
        initial_delay: Initial delay between retries in seconds
        max_delay: Maximum delay between retries in seconds
        retry_on: Tuple of exception types to retry on

    Returns:
        A decorator producing an async wrapper (waits with
        ``asyncio.sleep``) for coroutine functions and a sync wrapper
        (waits with ``time.sleep``) for everything else.

    Raises:
        ValueError: If ``max_attempts`` is less than 1. Previously such a
            configuration skipped the call entirely and surfaced as an
            unrelated ``RuntimeError`` at call time.

    Example:
        @retry_on_network_error(max_attempts=5, initial_delay=0.5)
        async def api_call(self):
            # Implementation
            pass
    """
    if max_attempts < 1:
        raise ValueError(f"max_attempts must be at least 1, got {max_attempts}")

    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        logger = logging.getLogger(func.__module__)

        def _next_delay(attempt: int, exc: Exception) -> float | None:
            """Log the failed attempt; return the wait before the next try.

            Returns ``None`` when ``attempt`` was the last permitted one, in
            which case the caller must re-raise.
            """
            log_context: dict[str, Any] = {
                "max_attempts": max_attempts,
                "error_type": type(exc).__name__,
                "function": func.__name__,
            }
            if attempt >= max_attempts - 1:
                logger.error(
                    f"Max retries ({max_attempts}) exceeded for {func.__name__}",
                    extra=log_context,
                )
                return None

            delay = min(initial_delay * (backoff_factor**attempt), max_delay)
            logger.warning(
                f"Retry {attempt + 1}/{max_attempts} for {func.__name__} "
                f"after {type(exc).__name__}, waiting {delay:.1f}s",
                extra={**log_context, "attempt": attempt + 1, "delay": delay},
            )
            return delay

        @functools.wraps(func)
        async def async_wrapper(*args: Any, **kwargs: Any) -> T:
            attempt = 0
            while True:
                try:
                    return await func(*args, **kwargs)  # type: ignore[misc,no-any-return]
                except retry_on as exc:
                    delay = _next_delay(attempt, exc)
                    if delay is None:
                        raise  # attempts exhausted: surface the last error
                await asyncio.sleep(delay)
                attempt += 1

        @functools.wraps(func)
        def sync_wrapper(*args: Any, **kwargs: Any) -> T:
            attempt = 0
            while True:
                try:
                    return func(*args, **kwargs)
                except retry_on as exc:
                    delay = _next_delay(attempt, exc)
                    if delay is None:
                        raise  # attempts exhausted: surface the last error
                # For sync functions, we can't use asyncio.sleep
                time.sleep(delay)
                attempt += 1

        # Return appropriate wrapper based on function type
        if asyncio.iscoroutinefunction(func):
            return cast(Callable[..., T], async_wrapper)
        return cast(Callable[..., T], sync_wrapper)

    return decorator
[docs] def handle_rate_limit( logger: logging.Logger | None = None, fallback_delay: float = 60.0, ) -> Callable[[Callable[..., T]], Callable[..., T]]: """ Decorator to handle rate limit errors with automatic retry. Args: logger: Logger instance to use fallback_delay: Default delay if rate limit reset time is not available Example: @handle_rate_limit(fallback_delay=30.0) async def make_api_call(self): # Implementation pass """ def decorator(func: Callable[..., T]) -> Callable[..., T]: @functools.wraps(func) async def async_wrapper(*args: Any, **kwargs: Any) -> T: nonlocal logger if logger is None: logger = logging.getLogger(func.__module__) try: return await func(*args, **kwargs) # type: ignore[misc,no-any-return] except ProjectXRateLimitError as e: # Check if we have a reset time in the response reset_time = None if hasattr(e, "response_data") and e.response_data: reset_time = e.response_data.get("reset_at") if reset_time: # Calculate delay until reset from datetime import datetime try: reset_dt = datetime.fromisoformat( reset_time.replace("Z", "+00:00") ) now = datetime.now(reset_dt.tzinfo) delay = max((reset_dt - now).total_seconds(), 1.0) except Exception: delay = fallback_delay else: delay = fallback_delay logger.warning( f"Rate limit hit in {func.__name__}, waiting {delay:.1f}s", extra={ "delay": delay, "function": func.__name__, "reset_time": reset_time, }, ) await asyncio.sleep(delay) # Retry once after waiting return await func(*args, **kwargs) # type: ignore[misc,no-any-return] @functools.wraps(func) def sync_wrapper(*args: Any, **kwargs: Any) -> T: # Sync version is simpler - just re-raise # (rate limiting is primarily an async concern) return func(*args, **kwargs) # Return appropriate wrapper based on function type if asyncio.iscoroutinefunction(func): return cast(Callable[..., T], async_wrapper) else: return cast(Callable[..., T], sync_wrapper) return decorator
[docs] def validate_response( required_fields: list[str] | None = None, response_type: type | None = None, ) -> Callable[[Callable[..., T]], Callable[..., T]]: """ Decorator to validate API response structure. Args: required_fields: List of required fields in the response response_type: Expected type of the response Example: @validate_response(required_fields=["id", "status"], response_type=dict) async def get_order(self, order_id: str): # Implementation pass """ def decorator(func: Callable[..., T]) -> Callable[..., T]: @functools.wraps(func) async def async_wrapper(*args: Any, **kwargs: Any) -> T: result = await func(*args, **kwargs) # type: ignore[misc] # Validate type if response_type is not None and not isinstance(result, response_type): raise ProjectXDataError( f"Invalid response type from {func.__name__}: " f"expected {response_type.__name__}, got {type(result).__name__}" ) # Validate required fields if required_fields and isinstance(result, dict): missing_fields = [f for f in required_fields if f not in result] if missing_fields: raise ProjectXDataError( f"Missing required fields in response from {func.__name__}: " f"{', '.join(missing_fields)}" ) return result # type: ignore[no-any-return] @functools.wraps(func) def sync_wrapper(*args: Any, **kwargs: Any) -> T: result = func(*args, **kwargs) # Validate type if response_type is not None and not isinstance(result, response_type): raise ProjectXDataError( f"Invalid response type from {func.__name__}: " f"expected {response_type.__name__}, got {type(result).__name__}" ) # Validate required fields if required_fields and isinstance(result, dict): missing_fields = [f for f in required_fields if f not in result] if missing_fields: raise ProjectXDataError( f"Missing required fields in response from {func.__name__}: " f"{', '.join(missing_fields)}" ) return result # Return appropriate wrapper based on function type if asyncio.iscoroutinefunction(func): return cast(Callable[..., T], async_wrapper) else: return 
cast(Callable[..., T], sync_wrapper) return decorator
# Error context manager for batch operations
[docs] class ErrorContext: """ Context manager for handling errors in batch operations. Collects errors during batch processing and provides summary. Example: async with ErrorContext("process orders") as ctx: for order in orders: try: await process_order(order) except Exception as e: ctx.add_error(order.id, e) if ctx.has_errors: logger.error(f"Failed to process {ctx.error_count} orders") """
[docs] def __init__(self, operation: str, logger: logging.Logger | None = None): self.operation = operation self.logger = logger or logging.getLogger(__name__) self.errors: list[tuple[str, Exception]] = []
[docs] def add_error(self, context: str, error: Exception) -> None: """Add an error to the context.""" self.errors.append((context, error))
@property def has_errors(self) -> bool: """Check if any errors were collected.""" return len(self.errors) > 0 @property def error_count(self) -> int: """Get the number of errors collected.""" return len(self.errors)
[docs] def get_summary(self) -> str: """Get a summary of all errors.""" if not self.errors: return "No errors" error_types: dict[str, int] = {} for _, error in self.errors: error_type = type(error).__name__ error_types[error_type] = error_types.get(error_type, 0) + 1 summary_parts = [f"{count} {etype}" for etype, count in error_types.items()] return f"{self.error_count} errors: {', '.join(summary_parts)}"
def __enter__(self) -> "ErrorContext": return self def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> Literal[False]: if self.has_errors: self.logger.error( f"Errors during {self.operation}: {self.get_summary()}", extra={ "operation": self.operation, "error_count": self.error_count, "errors": [ (ctx, str(e)) for ctx, e in self.errors[:10] ], # First 10 }, ) return False # Don't suppress exceptions async def __aenter__(self) -> "ErrorContext": return self async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> bool: return self.__exit__(exc_type, exc_val, exc_tb)