Utilities
Utility functions and helper tools for trading, analysis, and data processing.
ProjectX Utility Functions
Author: @TexasCoding Date: 2025-08-02
- Overview:
Contains utility functions used throughout the ProjectX client. Provides generic, reusable functionality for data processing, error handling, logging, market analysis, and trading calculations. All utilities are stateless, pure functions designed for cross-module compatibility.
- Key Features:
Generic utility functions for data processing and manipulation
Comprehensive error handling and logging utilities
Market analysis and pattern detection tools
Trading calculations and risk management utilities
Environment configuration and validation helpers
Rate limiting and performance optimization tools
- Utility Categories:
Core Utilities: Rate limiting, data manipulation, environment handling
Error Handling: Decorators, context managers, standardized error messages
Logging: Structured logging, performance monitoring, API call tracking
Market Analysis: Session info, contract validation, pattern detection
Portfolio Analytics: Performance metrics, correlation analysis, drawdown calculation
Trading Calculations: Position sizing, risk management, tick value calculations
- Example Usage:
```python
from project_x_py.utils import (
    # Error handling
    handle_errors,
    retry_on_network_error,
    ErrorContext,
    # Trading calculations
    calculate_tick_value,
    calculate_position_sizing,
    round_to_tick_size,
    # Market utilities
    is_market_hours,
    get_market_session_info,
    validate_contract_id,
    # Portfolio analytics
    calculate_sharpe_ratio,
    calculate_max_drawdown,
    calculate_portfolio_metrics,
    # Pattern detection
    detect_candlestick_patterns,
    detect_chart_patterns,
    # Data utilities
    create_data_snapshot,
    get_polars_last_value,
)


# Use error handling decorators
@handle_errors("fetch market data")
async def get_market_data():
    # Implementation
    pass


# Use trading calculations
tick_value = calculate_tick_value(0.5, 0.1, 1.0)  # 5 ticks
position_size = calculate_position_sizing(50000, 0.02, 2050, 2040)

# Use market utilities
if is_market_hours():
    print("Market is open")

# Use portfolio analytics
sharpe = calculate_sharpe_ratio(returns_data)
drawdown = calculate_max_drawdown(price_data)
```
- Architecture Principles:
Generic and reusable across different contexts
Work with standard data types (DataFrames, numbers, strings)
No domain-specific knowledge or dependencies
Stateless and pure functions
Comprehensive error handling and logging
Note: Technical indicators have been moved to the indicators module for better organization.
See also
utils.async_rate_limiter: Rate limiting for API calls
utils.error_handler: Error handling decorators and context managers
utils.trading_calculations: Trading math and risk management
utils.portfolio_analytics: Performance metrics and analysis
utils.market_utils: Market session and contract utilities
utils.pattern_detection: Technical pattern detection
- class ErrorCode
Bases: object
Standard error codes for categorizing errors.
- API_BAD_REQUEST = 3400
- API_FORBIDDEN = 3403
- API_NOT_FOUND = 3404
- API_RATE_LIMIT = 3429
- API_SERVER_ERROR = 3500
- AUTH_EXPIRED = 1003
- AUTH_INVALID = 1002
- AUTH_PERMISSION = 1004
- AUTH_REQUIRED = 1001
- CONN_FAILED = 2001
- CONN_LOST = 2003
- CONN_SSL = 2004
- CONN_TIMEOUT = 2002
- DATA_INVALID = 4004
- DATA_MISSING = 4003
- DATA_PARSING = 4002
- DATA_VALIDATION = 4001
- INTERNAL_ERROR = 9001
- INVALID_STATE = 9003
- NOT_IMPLEMENTED = 9002
- ORDER_INVALID = 5001
- ORDER_NOT_FOUND = 5002
- ORDER_REJECTED = 5003
- POSITION_INVALID = 5101
- POSITION_NOT_FOUND = 5102
- WS_AUTH = 6002
- WS_CONNECTION = 6001
- WS_MESSAGE = 6004
- WS_SUBSCRIPTION = 6003
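The constants above appear to be grouped by thousands range, with the range matching the name prefix (1xxx for AUTH_, 2xxx for CONN_, and so on). A minimal sketch of how a caller might exploit that grouping follows; `ERROR_CATEGORIES` and `categorize_error_code` are hypothetical names that are not part of the documented API, and the category labels are inferred from the constant names.

```python
# Hypothetical helper: maps the thousands range of an ErrorCode value to a label.
# The labels are inferred from the constant name prefixes (AUTH_, CONN_, ...).
ERROR_CATEGORIES = {
    1: "authentication",
    2: "connection",
    3: "api",
    4: "data",
    5: "trading",  # ORDER_* (50xx) and POSITION_* (51xx)
    6: "websocket",
    9: "internal",
}


def categorize_error_code(code: int) -> str:
    """Return a coarse category for a numeric error code."""
    return ERROR_CATEGORIES.get(code // 1000, "unknown")


print(categorize_error_code(3429))  # value of API_RATE_LIMIT
print(categorize_error_code(5102))  # value of POSITION_NOT_FOUND
```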
- class ErrorContext(operation, logger=None)
Bases: object
Context manager for handling errors in batch operations.
Collects errors during batch processing and provides a summary.
Example
async with ErrorContext("process orders") as ctx:
    for order in orders:
        try:
            await process_order(order)
        except Exception as e:
            ctx.add_error(order.id, e)

if ctx.has_errors:
    logger.error(f"Failed to process {ctx.error_count} orders")
- __init__(operation, logger=None)
- property error_count: int
Get the number of errors collected.
- property has_errors: bool
Check if any errors were collected.
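The collect-then-summarize behavior described for ErrorContext can be sketched with a small async context manager. `BatchErrors` and `process_all` below are hypothetical stand-ins written for illustration; the sketch mirrors only the documented `add_error`, `error_count`, and `has_errors` members and makes no claim about how the library implements them.

```python
import asyncio


class BatchErrors:
    """Hypothetical stand-in for ErrorContext; not the library's implementation."""

    def __init__(self, operation: str) -> None:
        self.operation = operation
        self.errors: list[tuple[object, Exception]] = []

    async def __aenter__(self) -> "BatchErrors":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        # Returning False lets any uncaught exception propagate unchanged.
        return False

    def add_error(self, item_id: object, error: Exception) -> None:
        self.errors.append((item_id, error))

    @property
    def error_count(self) -> int:
        return len(self.errors)

    @property
    def has_errors(self) -> bool:
        return bool(self.errors)


async def process_all(items: list[int]) -> BatchErrors:
    async with BatchErrors("process items") as ctx:
        for item in items:
            try:
                if item < 0:
                    raise ValueError(f"negative item: {item}")
            except ValueError as e:
                # Record the failure and keep going with the rest of the batch.
                ctx.add_error(item, e)
    return ctx


ctx = asyncio.run(process_all([1, -2, 3, -4]))
print(ctx.has_errors, ctx.error_count)
```

The point of the pattern is that one bad item does not abort the batch; the caller inspects the summary once the loop has finished.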
- class ErrorMessages
Bases: object
Standardized error messages for common scenarios.
- ACCOUNT_NOT_FOUND = "Account '{account_name}' not found. Available accounts: {available_accounts}"
- API_INVALID_ENDPOINT = 'Invalid API endpoint: {endpoint}'
- API_INVALID_REQUEST = 'Invalid request: {reason}'
- API_METHOD_NOT_ALLOWED = 'HTTP method {method} not allowed for {endpoint}'
- API_RATE_LIMITED = 'Rate limit exceeded, retry after {retry_after}s'
- API_REQUEST_FAILED = 'API request failed'
- API_RESOURCE_NOT_FOUND = 'Resource not found: {resource}'
- API_SERVER_ERROR = 'Server error: {status_code} - {message}'
- AUTH_FAILED = 'Authentication failed'
- AUTH_INVALID_CREDENTIALS = 'Invalid authentication credentials'
- AUTH_MISSING_CREDENTIALS = 'Missing authentication credentials'
- AUTH_NO_ACCOUNTS = 'No accounts found for user'
- AUTH_PERMISSION_DENIED = 'Permission denied for this operation'
- AUTH_SESSION_EXPIRED = 'Session has expired, please re-authenticate'
- AUTH_TOKEN_EXPIRED = 'Authentication token has expired'
- AUTH_TOKEN_INVALID = 'Invalid authentication token'
- CONFIG_FILE_NOT_FOUND = 'Configuration file not found: {path}'
- CONFIG_INVALID = 'Invalid configuration value for {key}: {value}'
- CONFIG_MISSING = 'Missing configuration: {key}'
- CONFIG_PARSE_ERROR = 'Failed to parse configuration: {reason}'
- CONN_FAILED = 'Failed to connect to ProjectX API'
- CONN_LOST = 'Lost connection to server'
- CONN_REFUSED = 'Connection refused by server'
- CONN_SSL_ERROR = 'SSL/TLS connection error'
- CONN_TIMEOUT = 'Connection timed out'
- DATA_INVALID_TYPE = 'Invalid type for {field}: expected {expected}, got {actual}'
- DATA_INVALID_VALUE = 'Invalid value for {field}: {value}'
- DATA_MISSING_FIELD = 'Missing required field: {field}'
- DATA_PARSE_ERROR = 'Failed to parse {data_type}: {reason}'
- DATA_VALIDATION_FAILED = 'Data validation failed: {errors}'
- INSTRUMENT_INVALID_SYMBOL = 'Invalid symbol format: {symbol}'
- INSTRUMENT_MARKET_CLOSED = 'Market closed for {symbol}'
- INSTRUMENT_NOT_FOUND = 'Instrument not found: {symbol}'
- INSTRUMENT_NOT_TRADEABLE = 'Instrument not tradeable: {symbol}'
- INTERNAL_ERROR = 'Internal error: {reason}'
- INVALID_STATE = 'Invalid state for operation: {state}'
- NOT_IMPLEMENTED = 'Feature not implemented: {feature}'
- OPERATION_FAILED = 'Operation failed: {operation}'
- ORDER_ALREADY_CANCELLED = 'Order already cancelled: {order_id}'
- ORDER_ALREADY_FILLED = 'Order already filled: {order_id}'
- ORDER_CANCEL_FAILED = 'Failed to cancel order {order_id}: {reason}'
- ORDER_FAILED = 'Order placement failed'
- ORDER_INSUFFICIENT_MARGIN = 'Insufficient margin for order'
- ORDER_INVALID_PRICE = 'Invalid order price: {price}'
- ORDER_INVALID_SIDE = 'Invalid order side: {side}'
- ORDER_INVALID_SIZE = 'Invalid order size: {size}'
- ORDER_INVALID_TYPE = 'Invalid order type: {order_type}'
- ORDER_MARKET_CLOSED = 'Market is closed for {instrument}'
- ORDER_MODIFY_FAILED = 'Failed to modify order {order_id}: {reason}'
- ORDER_NOT_FOUND = 'Order not found: {order_id}'
- ORDER_NO_ACCOUNT = 'No account information available'
- ORDER_RISK_EXCEEDED = 'Order exceeds risk limits'
- ORDER_SEARCH_FAILED = 'Order search failed'
- POSITION_ALREADY_CLOSED = 'Position already closed: {position_id}'
- POSITION_INSUFFICIENT_SIZE = 'Insufficient position size for operation'
- POSITION_NOT_FOUND = 'Position not found: {position_id}'
- POSITION_WRONG_SIDE = 'Operation not allowed for {side} position'
- TIMEOUT = 'Operation timed out after {timeout}s'
- WS_AUTHENTICATION_FAILED = 'WebSocket authentication failed'
- WS_CONNECTION_FAILED = 'WebSocket connection failed: {reason}'
- WS_MESSAGE_PARSE_ERROR = 'Failed to parse WebSocket message'
- WS_SUBSCRIPTION_FAILED = 'Failed to subscribe to {channel}: {reason}'
- WS_UNEXPECTED_CLOSE = 'WebSocket closed unexpectedly: {code} - {reason}'
- class LogContext(logger, **context)
Bases: object
Context manager for adding consistent context to log messages.
Example
with LogContext(logger, operation="fetch_orders", user_id=123):
    # All log messages in this block will include the context
    logger.info("Starting order fetch")
- __init__(logger, **context)
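A comparable effect, attaching the same key/value context to every message, can be had from the standard library's `logging.LoggerAdapter`. The sketch below is only an analogy under that assumption: it does not show how LogContext itself is implemented, and the logger name and format string are made up for the example.

```python
import io
import logging

# Capture output in memory so the example is self-contained.
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(
    logging.Formatter("%(message)s operation=%(operation)s user_id=%(user_id)s")
)

logger = logging.getLogger("log_context_sketch")
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.propagate = False

# LoggerAdapter attaches the same `extra` fields to every record it emits.
contextual = logging.LoggerAdapter(
    logger, {"operation": "fetch_orders", "user_id": 123}
)
contextual.info("Starting order fetch")

print(stream.getvalue().strip())
```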
- class LogMessages
Bases: object
Standard log messages for common operations.
- API_ERROR = 'API request failed'
- API_REQUEST = 'Making API request'
- API_RESPONSE = 'Received API response'
- AUTH_FAILED = 'Authentication failed'
- AUTH_REFRESH = 'Refreshing authentication token'
- AUTH_START = 'Starting authentication'
- AUTH_SUCCESS = 'Authentication successful'
- AUTH_TOKEN_PARSE_FAILED = 'Failed to parse authentication token expiry'
- CACHE_CLEAR = 'Clearing cache'
- CACHE_HIT = 'Cache hit'
- CACHE_MISS = 'Cache miss'
- CACHE_UPDATE = 'Updating cache'
- CALLBACK_REGISTERED = 'Callback registered'
- CALLBACK_REMOVED = 'Callback removed'
- CLEANUP_COMPLETE = 'Cleanup completed'
- DATA_ERROR = 'Market data fetch failed'
- DATA_FETCH = 'Fetching market data'
- DATA_RECEIVED = 'Market data received'
- DATA_SUBSCRIBE = 'Subscribing to market data'
- DATA_UNSUBSCRIBE = 'Unsubscribing from market data'
- ERROR_HANDLED = 'Error handled'
- ERROR_MAX_RETRY = 'Maximum retries exceeded'
- ERROR_RETRY = 'Retrying after error'
- ERROR_UNHANDLED = 'Unhandled error'
- MANAGER_INITIALIZED = 'Manager initialized'
- ORDER_CANCEL = 'Cancelling order'
- ORDER_CANCELLED = 'Order cancelled successfully'
- ORDER_CANCEL_ALL = 'Cancelling all orders'
- ORDER_CANCEL_ALL_COMPLETE = 'Cancel all orders complete'
- ORDER_ERROR = 'Order operation failed'
- ORDER_MODIFIED = 'Order modified successfully'
- ORDER_MODIFY = 'Modifying order'
- ORDER_PLACE = 'Placing order'
- ORDER_PLACED = 'Order placed successfully'
- POSITION_CLOSE = 'Closing position'
- POSITION_CLOSED = 'Position closed successfully'
- POSITION_ERROR = 'Position operation failed'
- POSITION_OPEN = 'Opening position'
- POSITION_OPENED = 'Position opened successfully'
- POSITION_REFRESH = 'Refreshing positions'
- POSITION_SEARCH = 'Searching positions'
- POSITION_UPDATE = 'Position updated'
- RATE_LIMIT_HIT = 'Rate limit reached'
- RATE_LIMIT_RESET = 'Rate limit reset'
- RATE_LIMIT_WAIT = 'Waiting for rate limit reset'
- WS_CONNECT = 'Connecting to WebSocket'
- WS_CONNECTED = 'WebSocket connected'
- WS_DISCONNECT = 'Disconnecting from WebSocket'
- WS_DISCONNECTED = 'WebSocket disconnected'
- WS_ERROR = 'WebSocket error'
- WS_RECONNECT = 'Reconnecting WebSocket'
- class ProjectXLogger
Bases: object
Factory for creating configured loggers with consistent settings.
- static get_logger(name, level=None, handler=None)
Get a configured logger instance.
- class RateLimiter(max_requests, window_seconds)
Bases: object
Async rate limiter using a sliding window algorithm.
This rate limiter implements a sliding window algorithm that tracks the exact timestamp of each request. It ensures that, at any point in time, no more than max_requests have been made in the past window_seconds.
- Features:
Safe for concurrent use by coroutines (guarded by asyncio locks)
Accurate sliding window implementation
Automatic cleanup of old request timestamps
Memory-efficient with bounded history
Zero CPU usage while waiting
- Parameters:
Example
>>> # Create a rate limiter for 10 requests per second
>>> limiter = RateLimiter(max_requests=10, window_seconds=1)
>>>
>>> # Use in an async function
>>> async def rate_limited_operation():
...     await limiter.acquire()
...     # Perform operation here
...     return "Success"
>>>
>>> # The limiter will automatically delay if needed
>>> async def bulk_operations():
...     tasks = [rate_limited_operation() for _ in range(50)]
...     results = await asyncio.gather(*tasks)
...     # This will take ~5 seconds (50 requests / 10 per second)
- __init__(max_requests, window_seconds)
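The sliding window idea described above (keep exact request timestamps, evict those older than the window, sleep when the window is full) can be sketched as follows. `SlidingWindowLimiter` and `demo` are hypothetical names; this is one plausible way to write such a limiter and is not taken from the library's source.

```python
import asyncio
import time
from collections import deque


class SlidingWindowLimiter:
    """Hypothetical sliding-window limiter; a sketch, not the library's code."""

    def __init__(self, max_requests: int, window_seconds: float) -> None:
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self._timestamps: deque[float] = deque()
        self._lock = asyncio.Lock()

    async def acquire(self) -> None:
        async with self._lock:
            while True:
                now = time.monotonic()
                # Drop timestamps that have left the window.
                while (
                    self._timestamps
                    and now - self._timestamps[0] >= self.window_seconds
                ):
                    self._timestamps.popleft()
                if len(self._timestamps) < self.max_requests:
                    self._timestamps.append(now)
                    return
                # Sleep until the oldest request exits the window.
                await asyncio.sleep(
                    self.window_seconds - (now - self._timestamps[0])
                )


async def demo() -> float:
    limiter = SlidingWindowLimiter(max_requests=5, window_seconds=0.2)
    start = time.monotonic()
    await asyncio.gather(*(limiter.acquire() for _ in range(12)))
    return time.monotonic() - start


elapsed = asyncio.run(demo())
print(f"12 acquisitions took {elapsed:.2f}s")
```

Because the deque never holds more than `max_requests` timestamps, memory stays bounded, and waiting is done with `asyncio.sleep` rather than a busy loop.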
- calculate_correlation_matrix(data, columns=None, method='pearson')
Calculate correlation matrix for specified columns.
- Parameters:
- Return type: DataFrame
- Returns:
DataFrame with correlation matrix
Example
>>> corr_matrix = calculate_correlation_matrix(
...     ohlcv_data, ["open", "high", "low", "close"]
... )
>>> print(corr_matrix)
- calculate_max_drawdown(data, price_column='close')
Calculate maximum drawdown.
- Parameters:
data (DataFrame) – DataFrame with price data
price_column (str) – Price column name
- Return type:
- Returns:
Dict with drawdown metrics
Example
>>> dd_metrics = calculate_max_drawdown(ohlcv_data)
>>> print(f"Max Drawdown: {dd_metrics['max_drawdown']:.2%}")
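For readers unfamiliar with the metric, maximum drawdown is the largest peak-to-trough decline in a price series. The sketch below computes it over a plain list; `max_drawdown` is a hypothetical helper, and reporting the result as a negative fraction of the peak is an assumption, since the sign convention of the documented function is not stated here.

```python
def max_drawdown(prices: list[float]) -> float:
    """Largest peak-to-trough decline, as a negative fraction of the peak."""
    peak = prices[0]
    worst = 0.0
    for price in prices:
        peak = max(peak, price)  # highest price seen so far
        worst = min(worst, (price - peak) / peak)
    return worst


closes = [100.0, 120.0, 90.0, 110.0, 80.0, 130.0]
print(f"Max Drawdown: {max_drawdown(closes):.2%}")
```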
- calculate_portfolio_metrics(trades, initial_balance=100000.0)
Calculate comprehensive portfolio performance metrics.
- Parameters:
- Return type:
- Returns:
Dict with portfolio metrics
Example
>>> trades = [
...     {"pnl": 500, "size": 1, "timestamp": "2024-01-01"},
...     {"pnl": -200, "size": 2, "timestamp": "2024-01-02"},
... ]
>>> metrics = calculate_portfolio_metrics(trades)
>>> print(f"Total Return: {metrics['total_return']:.2%}")
- calculate_position_sizing(account_balance, risk_per_trade, entry_price, stop_loss_price, tick_value=1.0)
Calculate optimal position size based on risk management.
- Parameters:
- Return type:
- Returns:
Dict with position sizing information
Example
>>> sizing = calculate_position_sizing(50000, 0.02, 2050, 2040, 1.0)
>>> print(f"Position size: {sizing['position_size']} contracts")
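Fixed-fractional sizing is one common way to turn a risk budget into a contract count: risk a fixed fraction of the account, then divide by the loss per contract if the stop is hit. The sketch below illustrates that idea only. `position_size` is a hypothetical helper, it treats `tick_value` as dollars per unit of price movement (an assumption), and the documented function returns a dict whose exact fields and formula are not shown on this page.

```python
import math


def position_size(
    account_balance: float,
    risk_per_trade: float,
    entry_price: float,
    stop_loss_price: float,
    tick_value: float = 1.0,
) -> int:
    """Whole contracts such that a stop-out loses at most the risk budget."""
    risk_amount = account_balance * risk_per_trade  # dollars at risk
    # Assumption: tick_value is dollars per unit of price movement.
    risk_per_contract = abs(entry_price - stop_loss_price) * tick_value
    if risk_per_contract == 0:
        raise ValueError("entry and stop prices must differ")
    return math.floor(risk_amount / risk_per_contract)


print(position_size(50000, 0.02, 2050, 2040, 1.0))
```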
- calculate_position_value(size, price, tick_value, tick_size)
Calculate total dollar value of a position.
- Parameters:
- Returns:
Total position value in dollars
- Return type:
Example
>>> # 5 MGC contracts at $2050
>>> calculate_position_value(5, 2050.0, 1.0, 0.1)
102500.0
- calculate_risk_reward_ratio(entry_price, stop_price, target_price)
Calculate risk/reward ratio for a trade setup.
- Parameters:
- Returns:
Risk/reward ratio (reward / risk)
- Return type:
- Raises:
ValueError – If prices are invalid (e.g., stop/target inversion)
Example
>>> # Long trade: entry=2050, stop=2045, target=2065
>>> calculate_risk_reward_ratio(2050, 2045, 2065)
3.0
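The arithmetic behind the example is reward divided by risk: (2065 - 2050) / (2050 - 2045) = 3.0. A minimal sketch that also rejects inverted setups is shown below; `risk_reward_ratio` is a hypothetical helper, and its validation rule (stop and target must lie on opposite sides of the entry) is an assumption about what "stop/target inversion" means.

```python
def risk_reward_ratio(
    entry_price: float, stop_price: float, target_price: float
) -> float:
    """Reward divided by risk for a long or short setup."""
    risk = entry_price - stop_price
    reward = target_price - entry_price
    # Long trade: both differences positive. Short trade: both negative.
    if risk == 0 or risk * reward <= 0:
        raise ValueError(
            "stop and target must be on opposite sides of the entry price"
        )
    return reward / risk


print(risk_reward_ratio(2050, 2045, 2065))  # long setup
print(risk_reward_ratio(2050, 2055, 2035))  # short setup
```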
- calculate_sharpe_ratio(data, return_column='returns', risk_free_rate=0.02, periods_per_year=252)
Calculate Sharpe ratio.
- Parameters:
- Return type:
- Returns:
Sharpe ratio
Example
>>> # First calculate returns
>>> data = data.with_columns(pl.col("close").pct_change().alias("returns"))
>>> sharpe = calculate_sharpe_ratio(data)
>>> print(f"Sharpe Ratio: {sharpe:.2f}")
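The textbook annualized Sharpe ratio is the mean excess return per period divided by its standard deviation, scaled by the square root of the number of periods per year. The sketch below applies that formula to a plain list of returns. `sharpe_ratio` is a hypothetical helper; using the sample standard deviation and returning 0.0 for zero volatility are assumptions, not documented behavior of the library function.

```python
import math
import statistics


def sharpe_ratio(
    returns: list[float],
    risk_free_rate: float = 0.02,
    periods_per_year: int = 252,
) -> float:
    """Annualized Sharpe ratio from per-period returns."""
    per_period_rf = risk_free_rate / periods_per_year
    excess = [r - per_period_rf for r in returns]
    stdev = statistics.stdev(excess)  # sample standard deviation
    if stdev == 0:
        return 0.0  # assumption: treat zero volatility as no signal
    return statistics.mean(excess) / stdev * math.sqrt(periods_per_year)


daily_returns = [0.010, -0.005, 0.007, 0.002, -0.001]
print(f"Sharpe Ratio: {sharpe_ratio(daily_returns):.2f}")
```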
- calculate_tick_value(price_change, tick_size, tick_value)
Calculate dollar value of a price change.
- Parameters:
- Returns:
Dollar value of the price change
- Return type:
Example
>>> # MGC moves 5 ticks
>>> calculate_tick_value(0.5, 0.1, 1.0)
5.0
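The example is consistent with a simple two-step calculation: convert the price change into a number of ticks, then multiply by the dollar value of one tick. The sketch below shows that arithmetic; `dollar_value_of_move` is a hypothetical name, and the formula is inferred from the example rather than taken from the library source.

```python
def dollar_value_of_move(
    price_change: float, tick_size: float, tick_value: float
) -> float:
    """Number of ticks in the move times the dollar value per tick."""
    ticks = price_change / tick_size
    return ticks * tick_value


# A 0.5 move with 0.1 ticks worth $1.00 each is 5 ticks.
print(dollar_value_of_move(0.5, 0.1, 1.0))
```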
- calculate_volatility_metrics(data, price_column='close', return_column=None, window=20)
Calculate various volatility metrics.
- Parameters:
- Return type:
- Returns:
Dict with volatility metrics
Example
>>> vol_metrics = calculate_volatility_metrics(ohlcv_data)
>>> print(f"Annualized Volatility: {vol_metrics['annualized_volatility']:.2%}")
- configure_sdk_logging(level=20, format_json=False, log_file=None)
Configure logging for the entire SDK.
- convert_timeframe_to_seconds(timeframe)
Convert timeframe string to seconds.
- Parameters:
timeframe (str) – Timeframe (e.g., "1min", "5min", "1hr", "1day")
- Returns:
Timeframe in seconds
- Return type:
Example
>>> convert_timeframe_to_seconds("5min")
300
>>> convert_timeframe_to_seconds("1hr")
3600
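A conversion like this usually amounts to splitting the string into a count and a unit and multiplying. The sketch below handles the spellings listed in the parameter description; `timeframe_to_seconds` and `_UNIT_SECONDS` are hypothetical names, the "sec" unit is an added assumption, and raising ValueError on unknown input is also an assumption, since the documented function's behavior for invalid strings is not stated.

```python
import re

# Hypothetical unit table; the real function may accept more spellings.
_UNIT_SECONDS = {"sec": 1, "min": 60, "hr": 3600, "day": 86400}


def timeframe_to_seconds(timeframe: str) -> int:
    """Parse strings such as '5min' or '1hr' into a number of seconds."""
    match = re.fullmatch(r"(\d+)(sec|min|hr|day)", timeframe.strip().lower())
    if match is None:
        raise ValueError(f"unrecognized timeframe: {timeframe!r}")
    count, unit = match.groups()
    return int(count) * _UNIT_SECONDS[unit]


print(timeframe_to_seconds("5min"))
print(timeframe_to_seconds("1hr"))
```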
- create_data_snapshot(data, description='')
Create a comprehensive snapshot of DataFrame for debugging/analysis.
- Parameters:
data (DataFrame) – Polars DataFrame
description (str) – Optional description
- Returns:
Data snapshot with statistics
- Return type:
Example
>>> snapshot = create_data_snapshot(ohlcv_data, "MGC 5min data")
>>> print(f"Rows: {snapshot['row_count']}")
>>> print(f"Timespan: {snapshot['timespan']}")
- detect_candlestick_patterns(data, open_col='open', high_col='high', low_col='low', close_col='close')
Detect basic candlestick patterns.
- Parameters:
- Return type: DataFrame
- Returns:
DataFrame with pattern detection columns added
Example
>>> patterns = detect_candlestick_patterns(ohlcv_data)
>>> doji_count = patterns.filter(pl.col("doji") == True).height
>>> print(f"Doji patterns found: {doji_count}")
- detect_chart_patterns(data, price_column='close', window=20)
Detect basic chart patterns.
- Parameters:
- Return type:
- Returns:
Dict with detected patterns and their locations
Example
>>> patterns = detect_chart_patterns(ohlcv_data)
>>> print(f"Double tops found: {len(patterns['double_tops'])}")
- extract_symbol_from_contract_id(contract_id)
Extract the base symbol from a full contract ID.
- Parameters:
contract_id (str) – Full contract ID or symbol
- Returns:
Base symbol (e.g., "MGC" from "CON.F.US.MGC.M25"), or None if extraction fails
- Return type:
Example
>>> extract_symbol_from_contract_id("CON.F.US.MGC.M25")
'MGC'
>>> extract_symbol_from_contract_id("MGC")
'MGC'
- format_error_message(template, **kwargs)
Format an error message template with provided values.
- Parameters:
- Return type:
- Returns:
Formatted error message
Example
>>> format_error_message(
...     ErrorMessages.API_RESOURCE_NOT_FOUND, resource="order/123"
... )
'Resource not found: order/123'
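Since the ErrorMessages templates use `{name}` placeholders, filling them is most naturally done with `str.format`. The sketch below shows the idea; `format_message` is a hypothetical helper, and falling back to the raw template when a placeholder is missing is an assumption about a sensible failure mode, not documented behavior.

```python
# Template text copied from ErrorMessages.API_RESOURCE_NOT_FOUND above.
RESOURCE_NOT_FOUND = "Resource not found: {resource}"


def format_message(template: str, **kwargs: object) -> str:
    """Fill a template; return it unformatted if a placeholder is missing."""
    try:
        return template.format(**kwargs)
    except KeyError:
        # Assumption: an incomplete message is preferable to a second error.
        return template


print(format_message(RESOURCE_NOT_FOUND, resource="order/123"))
```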
- get_env_var(name, default=None, required=False)
Get environment variable with optional default and validation.
- Parameters:
- Return type:
- Returns:
Environment variable value
- Raises:
ValueError – If required variable is missing
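The documented contract (return the value, fall back to a default, raise ValueError when a required variable is absent) can be sketched with `os.environ`. `read_env` and the variable name used below are hypothetical and exist only for this illustration.

```python
import os
from typing import Optional


def read_env(
    name: str, default: Optional[str] = None, required: bool = False
) -> Optional[str]:
    """Look up an environment variable with a default and a required check."""
    value = os.environ.get(name, default)
    if required and value is None:
        raise ValueError(f"Required environment variable '{name}' is missing")
    return value


# Hypothetical variable name, used only to demonstrate the default path.
print(read_env("PROJECT_X_SKETCH_UNSET", default="demo"))
```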
- get_market_session_info(timezone='America/Chicago')
Get detailed market session information.
Example
>>> info = get_market_session_info()
>>> print(f"Market open: {info['is_open']}")
>>> print(f"Next session: {info['next_session_start']}")
- get_polars_last_value(df, column)
Get the last value from a polars DataFrame column safely.
- Return type:
- handle_errors(operation, logger=None, reraise=True, default_return=None)
Decorator for consistent error handling across the SDK.
This decorator catches exceptions, logs them consistently, and optionally re-raises them with additional context.
Example
@handle_errors("fetch market data")
async def get_bars(self, symbol: str):
    # Implementation
    pass
- handle_rate_limit(logger=None, fallback_delay=60.0)
Decorator to handle rate limit errors with automatic retry.
Example
@handle_rate_limit(fallback_delay=30.0)
async def make_api_call(self):
    # Implementation
    pass
- is_market_hours(timezone='America/Chicago')
Check if it’s currently market hours (CME futures).
- log_api_call(logger, method, endpoint, status_code=None, duration=None, error=None, **extra)
Log API call with standard format.
- Parameters:
- Return type:
- retry_on_network_error(max_attempts=3, backoff_factor=2.0, initial_delay=1.0, max_delay=60.0, retry_on=(httpx.ConnectError, httpx.TimeoutException, project_x_py.exceptions.ProjectXConnectionError, project_x_py.exceptions.ProjectXServerError))
Decorator to retry operations on network errors with exponential backoff.
- Parameters:
max_attempts (int) – Maximum number of retry attempts
backoff_factor (float) – Multiplier for exponential backoff
initial_delay (float) – Initial delay between retries in seconds
max_delay (float) – Maximum delay between retries in seconds
retry_on (tuple[type[Exception], ...]) – Tuple of exception types to retry on
- Return type: Callable[[Callable[..., TypeVar(T)]], Callable[..., TypeVar(T)]]
Example
@retry_on_network_error(max_attempts=5, initial_delay=0.5)
async def api_call(self):
    # Implementation
    pass
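How the parameters above interact (the delay starts at `initial_delay`, is multiplied by `backoff_factor` after each failure, and is capped at `max_delay`) can be sketched with a small decorator. `retry_async` and `flaky` are hypothetical names, and the built-in `ConnectionError` and `TimeoutError` stand in for the httpx and ProjectX exception types so that the example runs without third-party packages.

```python
import asyncio
import functools


def retry_async(max_attempts=3, backoff_factor=2.0, initial_delay=1.0,
                max_delay=60.0, retry_on=(ConnectionError, TimeoutError)):
    """Hypothetical retry decorator with exponential backoff for coroutines."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            delay = initial_delay
            for attempt in range(1, max_attempts + 1):
                try:
                    return await func(*args, **kwargs)
                except retry_on:
                    if attempt == max_attempts:
                        raise  # out of attempts: surface the last error
                    await asyncio.sleep(delay)
                    # Grow the delay geometrically, up to the cap.
                    delay = min(delay * backoff_factor, max_delay)
        return wrapper
    return decorator


calls = []


@retry_async(max_attempts=5, initial_delay=0.01)
async def flaky():
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError("temporary failure")
    return "ok"


result = asyncio.run(flaky())
print(result, len(calls))
```

Exceptions outside `retry_on` are not caught, so programming errors fail immediately instead of being retried.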
- round_to_tick_size(price, tick_size)
Round price to nearest valid tick.
- Parameters:
- Returns:
Price rounded to nearest tick
- Return type:
- Raises:
ValueError – If tick_size is not positive or price is negative
Example
>>> round_to_tick_size(2050.37, 0.1)
2050.4
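Rounding to a tick grid with binary floats can leave artifacts such as 2050.4000000000001, so one common approach is to do the arithmetic in `decimal.Decimal`. The sketch below takes that approach; `round_to_tick` is a hypothetical helper, and both the use of Decimal and the half-up rounding mode are assumptions rather than a description of the library's implementation.

```python
from decimal import ROUND_HALF_UP, Decimal


def round_to_tick(price: float, tick_size: float) -> float:
    """Round a price to the nearest multiple of tick_size."""
    if tick_size <= 0:
        raise ValueError("tick_size must be positive")
    if price < 0:
        raise ValueError("price must be non-negative")
    # Decimal(str(x)) uses the short decimal repr, avoiding binary float noise.
    p, t = Decimal(str(price)), Decimal(str(tick_size))
    ticks = (p / t).quantize(Decimal("1"), rounding=ROUND_HALF_UP)
    return float(ticks * t)


print(round_to_tick(2050.37, 0.1))
print(round_to_tick(4500.30, 0.25))
```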
- setup_logging(level='INFO', format_string=None, filename=None)
Set up logging configuration for the ProjectX client.
- validate_contract_id(contract_id)
Validate ProjectX contract ID format.
- Parameters:
contract_id (str) – Contract ID to validate
- Returns:
True if valid format
- Return type:
Example
>>> validate_contract_id("CON.F.US.MGC.M25")
True
>>> validate_contract_id("MGC")
True
>>> validate_contract_id("invalid.contract")
False
- validate_response(required_fields=None, response_type=None)
Decorator to validate API response structure.
Example
@validate_response(required_fields=["id", "status"], response_type=dict)
async def get_order(self, order_id: str):
    # Implementation
    pass
Price and Position Utilities
calculate_tick_value, calculate_position_value, round_to_tick_size
Risk Management
calculate_risk_reward_ratio, calculate_position_sizing, calculate_max_drawdown, calculate_sharpe_ratio
Portfolio Analysis
calculate_portfolio_metrics, calculate_correlation_matrix
Contract Utilities
validate_contract_id, extract_symbol_from_contract_id
Market Session
Data Processing
create_data_snapshot, convert_timeframe_to_seconds
Logging and Configuration
setup_logging, get_env_var
Rate Limiting
RateLimiter