Utilities

Utility functions and helper tools for trading, analysis, and data processing.

ProjectX Utility Functions

Author: @TexasCoding Date: 2025-08-02

Overview:

Contains utility functions used throughout the ProjectX client. Provides generic, reusable functionality for data processing, error handling, logging, market analysis, and trading calculations. All utilities are stateless, pure functions designed for cross-module compatibility.

Key Features:
  • Generic utility functions for data processing and manipulation

  • Comprehensive error handling and logging utilities

  • Market analysis and pattern detection tools

  • Trading calculations and risk management utilities

  • Environment configuration and validation helpers

  • Rate limiting and performance optimization tools

Utility Categories:
  • Core Utilities: Rate limiting, data manipulation, environment handling

  • Error Handling: Decorators, context managers, standardized error messages

  • Logging: Structured logging, performance monitoring, API call tracking

  • Market Analysis: Session info, contract validation, pattern detection

  • Portfolio Analytics: Performance metrics, correlation analysis, drawdown calculation

  • Trading Calculations: Position sizing, risk management, tick value calculations

Example Usage:

```python from project_x_py.utils import (

# Error handling handle_errors, retry_on_network_error, ErrorContext, # Trading calculations calculate_tick_value, calculate_position_sizing, round_to_tick_size, # Market utilities is_market_hours, get_market_session_info, validate_contract_id, # Portfolio analytics calculate_sharpe_ratio, calculate_max_drawdown, calculate_portfolio_metrics, # Pattern detection detect_candlestick_patterns, detect_chart_patterns, # Data utilities create_data_snapshot, get_polars_last_value,

)

# Use error handling decorators @handle_errors(“fetch market data”) async def get_market_data():

# Implementation pass

# Use trading calculations tick_value = calculate_tick_value(0.5, 0.1, 1.0) # 5 ticks position_size = calculate_position_sizing(50000, 0.02, 2050, 2040)

# Use market utilities if is_market_hours():

print(“Market is open”)

# Use portfolio analytics sharpe = calculate_sharpe_ratio(returns_data) drawdown = calculate_max_drawdown(price_data) ```

Architecture Principles:
  • Generic and reusable across different contexts

  • Work with standard data types (DataFrames, numbers, strings)

  • No domain-specific knowledge or dependencies

  • Stateless and pure functions

  • Comprehensive error handling and logging

Note: Technical indicators have been moved to the indicators module for better organization.

See also

  • utils.async_rate_limiter: Rate limiting for API calls

  • utils.error_handler: Error handling decorators and context managers

  • utils.trading_calculations: Trading math and risk management

  • utils.portfolio_analytics: Performance metrics and analysis

  • utils.market_utils: Market session and contract utilities

  • utils.pattern_detection: Technical pattern detection

class ErrorCode[source]

Bases: object

Standard error codes for categorizing errors.

API_BAD_REQUEST = 3400
API_FORBIDDEN = 3403
API_NOT_FOUND = 3404
API_RATE_LIMIT = 3429
API_SERVER_ERROR = 3500
AUTH_EXPIRED = 1003
AUTH_INVALID = 1002
AUTH_PERMISSION = 1004
AUTH_REQUIRED = 1001
CONN_FAILED = 2001
CONN_LOST = 2003
CONN_SSL = 2004
CONN_TIMEOUT = 2002
DATA_INVALID = 4004
DATA_MISSING = 4003
DATA_PARSING = 4002
DATA_VALIDATION = 4001
INTERNAL_ERROR = 9001
INVALID_STATE = 9003
NOT_IMPLEMENTED = 9002
ORDER_INVALID = 5001
ORDER_NOT_FOUND = 5002
ORDER_REJECTED = 5003
POSITION_INVALID = 5101
POSITION_NOT_FOUND = 5102
WS_AUTH = 6002
WS_CONNECTION = 6001
WS_MESSAGE = 6004
WS_SUBSCRIPTION = 6003
class ErrorContext(operation, logger=None)[source]

Bases: object

Context manager for handling errors in batch operations.

Collects errors during batch processing and provides summary.

Example

async with ErrorContext(“process orders”) as ctx:
for order in orders:
try:

await process_order(order)

except Exception as e:

ctx.add_error(order.id, e)

if ctx.has_errors:

logger.error(f”Failed to process {ctx.error_count} orders”)

__init__(operation, logger=None)[source]
add_error(context, error)[source]

Add an error to the context.

Return type:

None

property error_count: int

Get the number of errors collected.

get_summary()[source]

Get a summary of all errors.

Return type:

str

property has_errors: bool

Check if any errors were collected.

class ErrorMessages[source]

Bases: object

Standardized error messages for common scenarios.

ACCOUNT_NOT_FOUND = "Account '{account_name}' not found. Available accounts: {available_accounts}"
API_INVALID_ENDPOINT = 'Invalid API endpoint: {endpoint}'
API_INVALID_REQUEST = 'Invalid request: {reason}'
API_METHOD_NOT_ALLOWED = 'HTTP method {method} not allowed for {endpoint}'
API_RATE_LIMITED = 'Rate limit exceeded, retry after {retry_after}s'
API_REQUEST_FAILED = 'API request failed'
API_RESOURCE_NOT_FOUND = 'Resource not found: {resource}'
API_SERVER_ERROR = 'Server error: {status_code} - {message}'
AUTH_FAILED = 'Authentication failed'
AUTH_INVALID_CREDENTIALS = 'Invalid authentication credentials'
AUTH_MISSING_CREDENTIALS = 'Missing authentication credentials'
AUTH_NO_ACCOUNTS = 'No accounts found for user'
AUTH_PERMISSION_DENIED = 'Permission denied for this operation'
AUTH_SESSION_EXPIRED = 'Session has expired, please re-authenticate'
AUTH_TOKEN_EXPIRED = 'Authentication token has expired'
AUTH_TOKEN_INVALID = 'Invalid authentication token'
CONFIG_FILE_NOT_FOUND = 'Configuration file not found: {path}'
CONFIG_INVALID = 'Invalid configuration value for {key}: {value}'
CONFIG_MISSING = 'Missing configuration: {key}'
CONFIG_PARSE_ERROR = 'Failed to parse configuration: {reason}'
CONN_FAILED = 'Failed to connect to ProjectX API'
CONN_LOST = 'Lost connection to server'
CONN_REFUSED = 'Connection refused by server'
CONN_SSL_ERROR = 'SSL/TLS connection error'
CONN_TIMEOUT = 'Connection timed out'
DATA_INVALID_TYPE = 'Invalid type for {field}: expected {expected}, got {actual}'
DATA_INVALID_VALUE = 'Invalid value for {field}: {value}'
DATA_MISSING_FIELD = 'Missing required field: {field}'
DATA_PARSE_ERROR = 'Failed to parse {data_type}: {reason}'
DATA_VALIDATION_FAILED = 'Data validation failed: {errors}'
INSTRUMENT_INVALID_SYMBOL = 'Invalid symbol format: {symbol}'
INSTRUMENT_MARKET_CLOSED = 'Market closed for {symbol}'
INSTRUMENT_NOT_FOUND = 'Instrument not found: {symbol}'
INSTRUMENT_NOT_TRADEABLE = 'Instrument not tradeable: {symbol}'
INTERNAL_ERROR = 'Internal error: {reason}'
INVALID_STATE = 'Invalid state for operation: {state}'
NOT_IMPLEMENTED = 'Feature not implemented: {feature}'
OPERATION_FAILED = 'Operation failed: {operation}'
ORDER_ALREADY_CANCELLED = 'Order already cancelled: {order_id}'
ORDER_ALREADY_FILLED = 'Order already filled: {order_id}'
ORDER_CANCEL_FAILED = 'Failed to cancel order {order_id}: {reason}'
ORDER_FAILED = 'Order placement failed'
ORDER_INSUFFICIENT_MARGIN = 'Insufficient margin for order'
ORDER_INVALID_PRICE = 'Invalid order price: {price}'
ORDER_INVALID_SIDE = 'Invalid order side: {side}'
ORDER_INVALID_SIZE = 'Invalid order size: {size}'
ORDER_INVALID_TYPE = 'Invalid order type: {order_type}'
ORDER_MARKET_CLOSED = 'Market is closed for {instrument}'
ORDER_MODIFY_FAILED = 'Failed to modify order {order_id}: {reason}'
ORDER_NOT_FOUND = 'Order not found: {order_id}'
ORDER_NO_ACCOUNT = 'No account information available'
ORDER_RISK_EXCEEDED = 'Order exceeds risk limits'
ORDER_SEARCH_FAILED = 'Order search failed'
POSITION_ALREADY_CLOSED = 'Position already closed: {position_id}'
POSITION_INSUFFICIENT_SIZE = 'Insufficient position size for operation'
POSITION_NOT_FOUND = 'Position not found: {position_id}'
POSITION_WRONG_SIDE = 'Operation not allowed for {side} position'
TIMEOUT = 'Operation timed out after {timeout}s'
WS_AUTHENTICATION_FAILED = 'WebSocket authentication failed'
WS_CONNECTION_FAILED = 'WebSocket connection failed: {reason}'
WS_MESSAGE_PARSE_ERROR = 'Failed to parse WebSocket message'
WS_SUBSCRIPTION_FAILED = 'Failed to subscribe to {channel}: {reason}'
WS_UNEXPECTED_CLOSE = 'WebSocket closed unexpectedly: {code} - {reason}'
class LogContext(logger, **context)[source]

Bases: object

Context manager for adding consistent context to log messages.

Example

with LogContext(logger, operation=”fetch_orders”, user_id=123):

# All log messages in this block will include the context logger.info(“Starting order fetch”)

__init__(logger, **context)[source]
class LogMessages[source]

Bases: object

Standard log messages for common operations.

API_ERROR = 'API request failed'
API_REQUEST = 'Making API request'
API_RESPONSE = 'Received API response'
AUTH_FAILED = 'Authentication failed'
AUTH_REFRESH = 'Refreshing authentication token'
AUTH_START = 'Starting authentication'
AUTH_SUCCESS = 'Authentication successful'
AUTH_TOKEN_PARSE_FAILED = 'Failed to parse authentication token expiry'
CACHE_CLEAR = 'Clearing cache'
CACHE_HIT = 'Cache hit'
CACHE_MISS = 'Cache miss'
CACHE_UPDATE = 'Updating cache'
CALLBACK_REGISTERED = 'Callback registered'
CALLBACK_REMOVED = 'Callback removed'
CLEANUP_COMPLETE = 'Cleanup completed'
DATA_ERROR = 'Market data fetch failed'
DATA_FETCH = 'Fetching market data'
DATA_RECEIVED = 'Market data received'
DATA_SUBSCRIBE = 'Subscribing to market data'
DATA_UNSUBSCRIBE = 'Unsubscribing from market data'
ERROR_HANDLED = 'Error handled'
ERROR_MAX_RETRY = 'Maximum retries exceeded'
ERROR_RETRY = 'Retrying after error'
ERROR_UNHANDLED = 'Unhandled error'
MANAGER_INITIALIZED = 'Manager initialized'
ORDER_CANCEL = 'Cancelling order'
ORDER_CANCELLED = 'Order cancelled successfully'
ORDER_CANCEL_ALL = 'Cancelling all orders'
ORDER_CANCEL_ALL_COMPLETE = 'Cancel all orders complete'
ORDER_ERROR = 'Order operation failed'
ORDER_MODIFIED = 'Order modified successfully'
ORDER_MODIFY = 'Modifying order'
ORDER_PLACE = 'Placing order'
ORDER_PLACED = 'Order placed successfully'
POSITION_CLOSE = 'Closing position'
POSITION_CLOSED = 'Position closed successfully'
POSITION_ERROR = 'Position operation failed'
POSITION_OPEN = 'Opening position'
POSITION_OPENED = 'Position opened successfully'
POSITION_REFRESH = 'Refreshing positions'
POSITION_SEARCH = 'Searching positions'
POSITION_UPDATE = 'Position updated'
RATE_LIMIT_HIT = 'Rate limit reached'
RATE_LIMIT_RESET = 'Rate limit reset'
RATE_LIMIT_WAIT = 'Waiting for rate limit reset'
WS_CONNECT = 'Connecting to WebSocket'
WS_CONNECTED = 'WebSocket connected'
WS_DISCONNECT = 'Disconnecting from WebSocket'
WS_DISCONNECTED = 'WebSocket disconnected'
WS_ERROR = 'WebSocket error'
WS_RECONNECT = 'Reconnecting WebSocket'
class ProjectXLogger[source]

Bases: object

Factory for creating configured loggers with consistent settings.

static get_logger(name, level=None, handler=None)[source]

Get a configured logger instance.

Parameters:
  • name (str) – Logger name (usually __name__)

  • level (int | None) – Logging level (defaults to INFO)

  • handler (Handler | None) – Custom handler (defaults to console)

Return type:

Logger

Returns:

Configured logger instance

class RateLimiter(max_requests, window_seconds)[source]

Bases: object

Async rate limiter using sliding window algorithm.

This rate limiter implements a sliding window algorithm that tracks the exact timestamp of each request. It ensures that at any point in time, no more than max_requests have been made in the past window_seconds.

Features:
  • Thread-safe using asyncio locks

  • Accurate sliding window implementation

  • Automatic cleanup of old request timestamps

  • Memory-efficient with bounded history

  • Zero CPU usage while waiting

Parameters:
  • max_requests (int) – Maximum number of requests allowed in the window

  • window_seconds (int) – Size of the sliding window in seconds

Example

>>> # Create a rate limiter for 10 requests per second
>>> limiter = RateLimiter(max_requests=10, window_seconds=1)
>>>
>>> # Use in an async function
>>> async def rate_limited_operation():
...     await limiter.acquire()
...     # Perform operation here
...     return "Success"
>>>
>>> # The limiter will automatically delay if needed
>>> async def bulk_operations():
...     tasks = [rate_limited_operation() for _ in range(50)]
...     results = await asyncio.gather(*tasks)
...     # This will take ~5 seconds (50 requests / 10 per second)
__init__(max_requests, window_seconds)[source]
async acquire()[source]

Wait if necessary to stay within rate limits.

Return type:

None

calculate_correlation_matrix(data, columns=None, method='pearson')[source]

Calculate correlation matrix for specified columns.

Parameters:
  • data (DataFrame) – DataFrame with numeric data

  • columns (list[str] | None) – Columns to include (default: all numeric columns)

  • method (str) – Correlation method (“pearson”, “spearman”)

Return type:

DataFrame

Returns:

DataFrame with correlation matrix

Example

>>> corr_matrix = calculate_correlation_matrix(
...     ohlcv_data, ["open", "high", "low", "close"]
... )
>>> print(corr_matrix)
calculate_max_drawdown(data, price_column='close')[source]

Calculate maximum drawdown.

Parameters:
  • data (DataFrame) – DataFrame with price data

  • price_column (str) – Price column name

Return type:

dict[str, Any]

Returns:

Dict with drawdown metrics

Example

>>> dd_metrics = calculate_max_drawdown(ohlcv_data)
>>> print(f"Max Drawdown: {dd_metrics['max_drawdown']:.2%}")
calculate_portfolio_metrics(trades, initial_balance=100000.0)[source]

Calculate comprehensive portfolio performance metrics.

Parameters:
  • trades (list[dict[str, Any]]) – List of trade dictionaries with ‘pnl’, ‘size’, ‘timestamp’ fields

  • initial_balance (float) – Starting portfolio balance

Return type:

dict[str, Any]

Returns:

Dict with portfolio metrics

Example

>>> trades = [
...     {"pnl": 500, "size": 1, "timestamp": "2024-01-01"},
...     {"pnl": -200, "size": 2, "timestamp": "2024-01-02"},
... ]
>>> metrics = calculate_portfolio_metrics(trades)
>>> print(f"Total Return: {metrics['total_return']:.2%}")
calculate_position_sizing(account_balance, risk_per_trade, entry_price, stop_loss_price, tick_value=1.0)[source]

Calculate optimal position size based on risk management.

Parameters:
  • account_balance (float) – Current account balance

  • risk_per_trade (float) – Risk per trade as decimal (e.g., 0.02 for 2%)

  • entry_price (float) – Entry price for the trade

  • stop_loss_price (float) – Stop loss price

  • tick_value (float) – Dollar value per tick

Return type:

dict[str, Any]

Returns:

Dict with position sizing information

Example

>>> sizing = calculate_position_sizing(50000, 0.02, 2050, 2040, 1.0)
>>> print(f"Position size: {sizing['position_size']} contracts")
calculate_position_value(size, price, tick_value, tick_size)[source]

Calculate total dollar value of a position.

Parameters:
  • size (int) – Number of contracts

  • price (float) – Current price

  • tick_value (float) – Dollar value per tick

  • tick_size (float) – Minimum price movement

Returns:

Total position value in dollars

Return type:

float

Example

>>> # 5 MGC contracts at $2050
>>> calculate_position_value(5, 2050.0, 1.0, 0.1)
102500.0
calculate_risk_reward_ratio(entry_price, stop_price, target_price)[source]

Calculate risk/reward ratio for a trade setup.

Parameters:
  • entry_price (float) – Entry price

  • stop_price (float) – Stop loss price

  • target_price (float) – Profit target price

Returns:

Risk/reward ratio (reward / risk)

Return type:

float

Raises:

ValueError – If prices are invalid (e.g., stop/target inversion)

Example

>>> # Long trade: entry=2050, stop=2045, target=2065
>>> calculate_risk_reward_ratio(2050, 2045, 2065)
3.0
calculate_sharpe_ratio(data, return_column='returns', risk_free_rate=0.02, periods_per_year=252)[source]

Calculate Sharpe ratio.

Parameters:
  • data (DataFrame) – DataFrame with returns data

  • return_column (str) – Returns column name

  • risk_free_rate (float) – Annual risk-free rate

  • periods_per_year (int) – Number of periods per year

Return type:

float

Returns:

Sharpe ratio

Example

>>> # First calculate returns
>>> data = data.with_columns(pl.col("close").pct_change().alias("returns"))
>>> sharpe = calculate_sharpe_ratio(data)
>>> print(f"Sharpe Ratio: {sharpe:.2f}")
calculate_tick_value(price_change, tick_size, tick_value)[source]

Calculate dollar value of a price change.

Parameters:
  • price_change (float) – Price difference

  • tick_size (float) – Minimum price movement

  • tick_value (float) – Dollar value per tick

Returns:

Dollar value of the price change

Return type:

float

Example

>>> # MGC moves 5 ticks
>>> calculate_tick_value(0.5, 0.1, 1.0)
5.0
calculate_volatility_metrics(data, price_column='close', return_column=None, window=20)[source]

Calculate various volatility metrics.

Parameters:
  • data (DataFrame) – DataFrame with price data

  • price_column (str) – Price column for calculations

  • return_column (str | None) – Pre-calculated returns column (optional)

  • window (int) – Window for rolling calculations

Return type:

dict[str, Any]

Returns:

Dict with volatility metrics

Example

>>> vol_metrics = calculate_volatility_metrics(ohlcv_data)
>>> print(f"Annualized Volatility: {vol_metrics['annualized_volatility']:.2%}")
configure_sdk_logging(level=20, format_json=False, log_file=None)[source]

Configure logging for the entire SDK.

Parameters:
  • level (int) – Logging level

  • format_json (bool) – Use JSON formatting

  • log_file (str | None) – Optional log file path

Return type:

None

convert_timeframe_to_seconds(timeframe)[source]

Convert timeframe string to seconds.

Parameters:

timeframe (str) – Timeframe (e.g., “1min”, “5min”, “1hr”, “1day”)

Returns:

Timeframe in seconds

Return type:

int

Example

>>> convert_timeframe_to_seconds("5min")
300
>>> convert_timeframe_to_seconds("1hr")
3600
create_data_snapshot(data, description='')[source]

Create a comprehensive snapshot of DataFrame for debugging/analysis.

Parameters:
  • data (DataFrame) – Polars DataFrame

  • description (str) – Optional description

Returns:

Data snapshot with statistics

Return type:

dict

Example

>>> snapshot = create_data_snapshot(ohlcv_data, "MGC 5min data")
>>> print(f"Rows: {snapshot['row_count']}")
>>> print(f"Timespan: {snapshot['timespan']}")
detect_candlestick_patterns(data, open_col='open', high_col='high', low_col='low', close_col='close')[source]

Detect basic candlestick patterns.

Parameters:
  • data (DataFrame) – DataFrame with OHLCV data

  • open_col (str) – Open price column

  • high_col (str) – High price column

  • low_col (str) – Low price column

  • close_col (str) – Close price column

Return type:

DataFrame

Returns:

DataFrame with pattern detection columns added

Example

>>> patterns = detect_candlestick_patterns(ohlcv_data)
>>> doji_count = patterns.filter(pl.col("doji") == True).height
>>> print(f"Doji patterns found: {doji_count}")
detect_chart_patterns(data, price_column='close', window=20)[source]

Detect basic chart patterns.

Parameters:
  • data (DataFrame) – DataFrame with price data

  • price_column (str) – Price column to analyze

  • window (int) – Window size for pattern detection

Return type:

dict[str, Any]

Returns:

Dict with detected patterns and their locations

Example

>>> patterns = detect_chart_patterns(ohlcv_data)
>>> print(f"Double tops found: {len(patterns['double_tops'])}")
extract_symbol_from_contract_id(contract_id)[source]

Extract the base symbol from a full contract ID.

Parameters:

contract_id (str) – Full contract ID or symbol

Returns:

Base symbol (e.g., “MGC” from “CON.F.US.MGC.M25”) None: If extraction fails

Return type:

str

Example

>>> extract_symbol_from_contract_id("CON.F.US.MGC.M25")
'MGC'
>>> extract_symbol_from_contract_id("MGC")
'MGC'
format_error_message(template, **kwargs)[source]

Format an error message template with provided values.

Parameters:
  • template (str) – Error message template with {placeholders}

  • **kwargs (Any) – Values to substitute in template

Return type:

str

Returns:

Formatted error message

Example

>>> format_error_message(
...     ErrorMessages.API_RESOURCE_NOT_FOUND, resource="order/123"
... )
"Resource not found: order/123"
format_price(price, decimals=2)[source]

Format price for display.

Return type:

str

format_volume(volume)[source]

Format volume for display.

Return type:

str

get_env_var(name, default=None, required=False)[source]

Get environment variable with optional default and validation.

Parameters:
  • name (str) – Environment variable name

  • default (Any) – Default value if not found

  • required (bool) – Whether the variable is required

Return type:

str

Returns:

Environment variable value

Raises:

ValueError – If required variable is missing

get_market_session_info(timezone='America/Chicago')[source]

Get detailed market session information.

Parameters:

timezone (str) – Market timezone

Returns:

Market session details

Return type:

dict

Example

>>> info = get_market_session_info()
>>> print(f"Market open: {info['is_open']}")
>>> print(f"Next session: {info['next_session_start']}")
get_polars_last_value(df, column)[source]

Get the last value from a polars DataFrame column safely.

Return type:

Any

get_polars_rows(df)[source]

Get number of rows from polars DataFrame safely.

Return type:

int

handle_errors(operation, logger=None, reraise=True, default_return=None)[source]

Decorator for consistent error handling across the SDK.

This decorator catches exceptions, logs them consistently, and optionally re-raises them with additional context.

Parameters:
  • operation (str) – Description of the operation being performed

  • logger (Logger | None) – Logger instance to use (defaults to module logger)

  • reraise (bool) – Whether to re-raise exceptions after logging

  • default_return (Any) – Default value to return if exception occurs and reraise=False

Return type:

Callable[[Callable[..., TypeVar(T)]], Callable[..., TypeVar(T)]]

Example

@handle_errors(“fetch market data”) async def get_bars(self, symbol: str):

# Implementation pass

handle_rate_limit(logger=None, fallback_delay=60.0)[source]

Decorator to handle rate limit errors with automatic retry.

Parameters:
  • logger (Logger | None) – Logger instance to use

  • fallback_delay (float) – Default delay if rate limit reset time is not available

Return type:

Callable[[Callable[..., TypeVar(T)]], Callable[..., TypeVar(T)]]

Example

@handle_rate_limit(fallback_delay=30.0) async def make_api_call(self):

# Implementation pass

is_market_hours(timezone='America/Chicago')[source]

Check if it’s currently market hours (CME futures).

Parameters:

timezone (str) – Timezone to check (default: CME time)

Returns:

True if market is open

Return type:

bool

log_api_call(logger, method, endpoint, status_code=None, duration=None, error=None, **extra)[source]

Log API call with standard format.

Parameters:
  • logger (Logger) – Logger instance

  • method (str) – HTTP method

  • endpoint (str) – API endpoint

  • status_code (int | None) – Response status code

  • duration (float | None) – Request duration in seconds

  • error (Exception | None) – Exception if call failed

  • **extra (Any) – Additional context

Return type:

None

retry_on_network_error(max_attempts=3, backoff_factor=2.0, initial_delay=1.0, max_delay=60.0, retry_on=(<class 'httpx.ConnectError'>, <class 'httpx.TimeoutException'>, <class 'project_x_py.exceptions.ProjectXConnectionError'>, <class 'project_x_py.exceptions.ProjectXServerError'>))[source]

Decorator to retry operations on network errors with exponential backoff.

Parameters:
  • max_attempts (int) – Maximum number of retry attempts

  • backoff_factor (float) – Multiplier for exponential backoff

  • initial_delay (float) – Initial delay between retries in seconds

  • max_delay (float) – Maximum delay between retries in seconds

  • retry_on (tuple[type[Exception], ...]) – Tuple of exception types to retry on

Return type:

Callable[[Callable[..., TypeVar(T)]], Callable[..., TypeVar(T)]]

Example

@retry_on_network_error(max_attempts=5, initial_delay=0.5) async def api_call(self):

# Implementation pass

round_to_tick_size(price, tick_size)[source]

Round price to nearest valid tick.

Parameters:
  • price (float) – Price to round

  • tick_size (float) – Minimum price movement

Returns:

Price rounded to nearest tick

Return type:

float

Raises:

ValueError – If tick_size is not positive or price is negative

Example

>>> round_to_tick_size(2050.37, 0.1)
2050.4
setup_logging(level='INFO', format_string=None, filename=None)[source]

Set up logging configuration for the ProjectX client.

Parameters:
  • level (str) – Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)

  • format_string (str | None) – Custom format string for log messages

  • filename (str | None) – Optional filename to write logs to

Return type:

Logger

Returns:

Logger instance

validate_contract_id(contract_id)[source]

Validate ProjectX contract ID format.

Parameters:

contract_id (str) – Contract ID to validate

Returns:

True if valid format

Return type:

bool

Example

>>> validate_contract_id("CON.F.US.MGC.M25")
True
>>> validate_contract_id("MGC")
True
>>> validate_contract_id("invalid.contract")
False
validate_response(required_fields=None, response_type=None)[source]

Decorator to validate API response structure.

Parameters:
  • required_fields (list[str] | None) – List of required fields in the response

  • response_type (type | None) – Expected type of the response

Return type:

Callable[[Callable[..., TypeVar(T)]], Callable[..., TypeVar(T)]]

Example

@validate_response(required_fields=[“id”, “status”], response_type=dict) async def get_order(self, order_id: str):

# Implementation pass

Price and Position Utilities

calculate_tick_value(price_change, tick_size, tick_value)[source]

Calculate dollar value of a price change.

Parameters:
  • price_change (float) – Price difference

  • tick_size (float) – Minimum price movement

  • tick_value (float) – Dollar value per tick

Returns:

Dollar value of the price change

Return type:

float

Example

>>> # MGC moves 5 ticks
>>> calculate_tick_value(0.5, 0.1, 1.0)
5.0
calculate_position_value(size, price, tick_value, tick_size)[source]

Calculate total dollar value of a position.

Parameters:
  • size (int) – Number of contracts

  • price (float) – Current price

  • tick_value (float) – Dollar value per tick

  • tick_size (float) – Minimum price movement

Returns:

Total position value in dollars

Return type:

float

Example

>>> # 5 MGC contracts at $2050
>>> calculate_position_value(5, 2050.0, 1.0, 0.1)
102500.0
round_to_tick_size(price, tick_size)[source]

Round price to nearest valid tick.

Parameters:
  • price (float) – Price to round

  • tick_size (float) – Minimum price movement

Returns:

Price rounded to nearest tick

Return type:

float

Raises:

ValueError – If tick_size is not positive or price is negative

Example

>>> round_to_tick_size(2050.37, 0.1)
2050.4
format_price(price, decimals=2)[source]

Format price for display.

Return type:

str

format_volume(volume)[source]

Format volume for display.

Return type:

str

Risk Management

calculate_risk_reward_ratio(entry_price, stop_price, target_price)[source]

Calculate risk/reward ratio for a trade setup.

Parameters:
  • entry_price (float) – Entry price

  • stop_price (float) – Stop loss price

  • target_price (float) – Profit target price

Returns:

Risk/reward ratio (reward / risk)

Return type:

float

Raises:

ValueError – If prices are invalid (e.g., stop/target inversion)

Example

>>> # Long trade: entry=2050, stop=2045, target=2065
>>> calculate_risk_reward_ratio(2050, 2045, 2065)
3.0
calculate_position_sizing(account_balance, risk_per_trade, entry_price, stop_loss_price, tick_value=1.0)[source]

Calculate optimal position size based on risk management.

Parameters:
  • account_balance (float) – Current account balance

  • risk_per_trade (float) – Risk per trade as decimal (e.g., 0.02 for 2%)

  • entry_price (float) – Entry price for the trade

  • stop_loss_price (float) – Stop loss price

  • tick_value (float) – Dollar value per tick

Return type:

dict[str, Any]

Returns:

Dict with position sizing information

Example

>>> sizing = calculate_position_sizing(50000, 0.02, 2050, 2040, 1.0)
>>> print(f"Position size: {sizing['position_size']} contracts")
calculate_max_drawdown(data, price_column='close')[source]

Calculate maximum drawdown.

Parameters:
  • data (DataFrame) – DataFrame with price data

  • price_column (str) – Price column name

Return type:

dict[str, Any]

Returns:

Dict with drawdown metrics

Example

>>> dd_metrics = calculate_max_drawdown(ohlcv_data)
>>> print(f"Max Drawdown: {dd_metrics['max_drawdown']:.2%}")
calculate_sharpe_ratio(data, return_column='returns', risk_free_rate=0.02, periods_per_year=252)[source]

Calculate Sharpe ratio.

Parameters:
  • data (DataFrame) – DataFrame with returns data

  • return_column (str) – Returns column name

  • risk_free_rate (float) – Annual risk-free rate

  • periods_per_year (int) – Number of periods per year

Return type:

float

Returns:

Sharpe ratio

Example

>>> # First calculate returns
>>> data = data.with_columns(pl.col("close").pct_change().alias("returns"))
>>> sharpe = calculate_sharpe_ratio(data)
>>> print(f"Sharpe Ratio: {sharpe:.2f}")

Portfolio Analysis

calculate_portfolio_metrics(trades, initial_balance=100000.0)[source]

Calculate comprehensive portfolio performance metrics.

Parameters:
  • trades (list[dict[str, Any]]) – List of trade dictionaries with ‘pnl’, ‘size’, ‘timestamp’ fields

  • initial_balance (float) – Starting portfolio balance

Return type:

dict[str, Any]

Returns:

Dict with portfolio metrics

Example

>>> trades = [
...     {"pnl": 500, "size": 1, "timestamp": "2024-01-01"},
...     {"pnl": -200, "size": 2, "timestamp": "2024-01-02"},
... ]
>>> metrics = calculate_portfolio_metrics(trades)
>>> print(f"Total Return: {metrics['total_return']:.2%}")
calculate_correlation_matrix(data, columns=None, method='pearson')[source]

Calculate correlation matrix for specified columns.

Parameters:
  • data (DataFrame) – DataFrame with numeric data

  • columns (list[str] | None) – Columns to include (default: all numeric columns)

  • method (str) – Correlation method (“pearson”, “spearman”)

Return type:

DataFrame

Returns:

DataFrame with correlation matrix

Example

>>> corr_matrix = calculate_correlation_matrix(
...     ohlcv_data, ["open", "high", "low", "close"]
... )
>>> print(corr_matrix)

Contract Utilities

validate_contract_id(contract_id)[source]

Validate ProjectX contract ID format.

Parameters:

contract_id (str) – Contract ID to validate

Returns:

True if valid format

Return type:

bool

Example

>>> validate_contract_id("CON.F.US.MGC.M25")
True
>>> validate_contract_id("MGC")
True
>>> validate_contract_id("invalid.contract")
False
extract_symbol_from_contract_id(contract_id)[source]

Extract the base symbol from a full contract ID.

Parameters:

contract_id (str) – Full contract ID or symbol

Returns:

Base symbol (e.g., “MGC” from “CON.F.US.MGC.M25”) None: If extraction fails

Return type:

str

Example

>>> extract_symbol_from_contract_id("CON.F.US.MGC.M25")
'MGC'
>>> extract_symbol_from_contract_id("MGC")
'MGC'

Market Session

get_market_session_info(timezone='America/Chicago')[source]

Get detailed market session information.

Parameters:

timezone (str) – Market timezone

Returns:

Market session details

Return type:

dict

Example

>>> info = get_market_session_info()
>>> print(f"Market open: {info['is_open']}")
>>> print(f"Next session: {info['next_session_start']}")
is_market_hours(timezone='America/Chicago')[source]

Check if it’s currently market hours (CME futures).

Parameters:

timezone (str) – Timezone to check (default: CME time)

Returns:

True if market is open

Return type:

bool

Data Processing

create_data_snapshot(data, description='')[source]

Create a comprehensive snapshot of DataFrame for debugging/analysis.

Parameters:
  • data (DataFrame) – Polars DataFrame

  • description (str) – Optional description

Returns:

Data snapshot with statistics

Return type:

dict

Example

>>> snapshot = create_data_snapshot(ohlcv_data, "MGC 5min data")
>>> print(f"Rows: {snapshot['row_count']}")
>>> print(f"Timespan: {snapshot['timespan']}")
convert_timeframe_to_seconds(timeframe)[source]

Convert timeframe string to seconds.

Parameters:

timeframe (str) – Timeframe (e.g., “1min”, “5min”, “1hr”, “1day”)

Returns:

Timeframe in seconds

Return type:

int

Example

>>> convert_timeframe_to_seconds("5min")
300
>>> convert_timeframe_to_seconds("1hr")
3600

Logging and Configuration

setup_logging(level='INFO', format_string=None, filename=None)[source]

Set up logging configuration for the ProjectX client.

Parameters:
  • level (str) – Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)

  • format_string (str | None) – Custom format string for log messages

  • filename (str | None) – Optional filename to write logs to

Return type:

Logger

Returns:

Logger instance

get_env_var(name, default=None, required=False)[source]

Get environment variable with optional default and validation.

Parameters:
  • name (str) – Environment variable name

  • default (Any) – Default value if not found

  • required (bool) – Whether the variable is required

Return type:

str

Returns:

Environment variable value

Raises:

ValueError – If required variable is missing

Rate Limiting

class RateLimiter(max_requests, window_seconds)[source]

Bases: object

Async rate limiter using sliding window algorithm.

This rate limiter implements a sliding window algorithm that tracks the exact timestamp of each request. It ensures that at any point in time, no more than max_requests have been made in the past window_seconds.

Features:
  • Thread-safe using asyncio locks

  • Accurate sliding window implementation

  • Automatic cleanup of old request timestamps

  • Memory-efficient with bounded history

  • Zero CPU usage while waiting

Parameters:
  • max_requests (int) – Maximum number of requests allowed in the window

  • window_seconds (int) – Size of the sliding window in seconds

Example

>>> # Create a rate limiter for 10 requests per second
>>> limiter = RateLimiter(max_requests=10, window_seconds=1)
>>>
>>> # Use in an async function
>>> async def rate_limited_operation():
...     await limiter.acquire()
...     # Perform operation here
...     return "Success"
>>>
>>> # The limiter will automatically delay if needed
>>> async def bulk_operations():
...     tasks = [rate_limited_operation() for _ in range(50)]
...     results = await asyncio.gather(*tasks)
...     # This will take ~5 seconds (50 requests / 10 per second)
__init__(max_requests, window_seconds)[source]
async acquire()[source]

Wait if necessary to stay within rate limits.

Return type:

None