Source code for project_x_py.indicators.fvg

"""
ProjectX Indicators - Fair Value Gap (FVG) Indicator

Author: @TexasCoding
Date: 2025-08-02

Overview:
    Implements the Fair Value Gap (FVG) indicator for identifying price imbalances
    and potential support/resistance zones. Detects gaps in price action with
    optional mitigation logic, helping traders spot areas likely to be revisited.

Key Features:
    - Identifies bullish/bearish fair value gaps from OHLC data
    - Supports configurable gap size, mitigation threshold, and custom columns
    - Returns gap boundary, size, and mitigation status for each bar
    - Callable as a class or via TA-Lib-style convenience function

Example Usage:
    ```python
    from project_x_py.indicators import FVG

    fvg = FVG()
    data_with_fvg = fvg.calculate(ohlcv_data, min_gap_size=0.001)
    gaps = data_with_fvg.filter(pl.col("fvg_bullish"))
    ```

See Also:
    - `project_x_py.indicators.order_block`
    - `project_x_py.indicators.base.BaseIndicator`
"""

from typing import Any

import polars as pl

from project_x_py.indicators.base import BaseIndicator


class FVG(BaseIndicator):
    """
    Fair Value Gap (FVG) indicator for identifying price imbalances.

    Fair Value Gaps are areas in price action where the market has moved so quickly
    that it has left a "gap" or imbalance. These areas often act as support/resistance
    zones as price tends to return to fill these gaps.

    The indicator identifies both bullish and bearish FVGs based on specific price
    action patterns and can optionally track whether gaps have been mitigated (filled).
    """

    def __init__(self) -> None:
        super().__init__(
            name="FVG",
            description="Fair Value Gap - identifies price imbalance areas that may act as support/resistance",
        )

    def calculate(
        self,
        data: pl.DataFrame,
        **kwargs: Any,
    ) -> pl.DataFrame:
        """
        Calculate Fair Value Gaps (FVG) using three-candle pattern.

        A bullish FVG occurs when:
        - Candle 3's low > Candle 1's high (creating an unfilled gap)
        - This gap represents an area of imbalance

        A bearish FVG occurs when:
        - Candle 3's high < Candle 1's low (creating an unfilled gap)
        - This gap represents an area of imbalance

        Args:
            data: DataFrame with OHLC data
            **kwargs: Additional parameters:
                high_column: High price column (default: "high")
                low_column: Low price column (default: "low")
                close_column: Close price column (default: "close")
                min_gap_size: Minimum gap size (in price units) to consider valid (default: 0.0)
                min_gap_percent: Minimum gap size as percentage (default: 0.0)
                check_mitigation: Whether to check if gaps have been mitigated (default: False)
                mitigation_threshold: Percentage of gap that needs to be filled to consider it mitigated (default: 0.5)

        Returns:
            DataFrame with FVG columns added:
            - fvg_bullish: Boolean indicating bullish FVG
            - fvg_bearish: Boolean indicating bearish FVG
            - fvg_bullish_start: Start of bullish gap (candle 1 high)
            - fvg_bullish_end: End of bullish gap (candle 3 low)
            - fvg_bearish_start: Start of bearish gap (candle 1 low)
            - fvg_bearish_end: End of bearish gap (candle 3 high)
            - fvg_gap_size: Size of the gap
            - fvg_gap_percent: Gap size as percentage
            - fvg_mitigated: Boolean indicating if gap has been mitigated (if check_mitigation=True)

        Example:
            >>> fvg = FVG()
            >>> data_with_fvg = fvg.calculate(ohlcv_data, min_gap_size=0.001)
            >>> bullish_gaps = data_with_fvg.filter(pl.col("fvg_bullish"))
        """
        # Extract parameters from kwargs with defaults
        high_column = kwargs.get("high_column", "high")
        low_column = kwargs.get("low_column", "low")
        close_column = kwargs.get("close_column", "close")
        min_gap_size = kwargs.get("min_gap_size", 0.0)
        min_gap_percent = kwargs.get("min_gap_percent", 0.0)
        check_mitigation = kwargs.get("check_mitigation", False)
        mitigation_threshold = kwargs.get("mitigation_threshold", 0.5)

        required_cols: list[str] = [high_column, low_column, close_column]
        self.validate_data(data, required_cols)
        self.validate_data_length(data, 3)  # Need at least 3 candles for pattern

        # Get values for three-candle pattern
        result = data.with_columns(
            [
                # Candle 1 (two bars ago)
                pl.col(high_column).shift(2).alias("candle1_high"),
                pl.col(low_column).shift(2).alias("candle1_low"),
                # Candle 2 (previous bar) - not used in gap detection but kept for potential future use
                pl.col(high_column).shift(1).alias("candle2_high"),
                pl.col(low_column).shift(1).alias("candle2_low"),
                # Candle 3 is current bar
            ]
        )

        # Identify FVGs using three-candle pattern
        result = result.with_columns(
            [
                # Bullish FVG: current low > candle 1 high
                (pl.col(low_column) > pl.col("candle1_high")).alias("fvg_bullish_raw"),
                # Bearish FVG: current high < candle 1 low
                (pl.col(high_column) < pl.col("candle1_low")).alias("fvg_bearish_raw"),
            ]
        )

        # Calculate gap boundaries and size
        result = result.with_columns(
            [
                # Bullish gap: from candle 1 high to current low
                pl.when(pl.col("fvg_bullish_raw"))
                .then(pl.col("candle1_high"))
                .otherwise(None)
                .alias("fvg_bullish_start"),
                pl.when(pl.col("fvg_bullish_raw"))
                .then(pl.col(low_column))
                .otherwise(None)
                .alias("fvg_bullish_end"),
                # Bearish gap: from candle 1 low to current high
                pl.when(pl.col("fvg_bearish_raw"))
                .then(pl.col("candle1_low"))
                .otherwise(None)
                .alias("fvg_bearish_start"),
                pl.when(pl.col("fvg_bearish_raw"))
                .then(pl.col(high_column))
                .otherwise(None)
                .alias("fvg_bearish_end"),
            ]
        )

        # Calculate gap size and percentage
        result = result.with_columns(
            [
                pl.when(pl.col("fvg_bullish_raw"))
                .then((pl.col("fvg_bullish_end") - pl.col("fvg_bullish_start")).abs())
                .when(pl.col("fvg_bearish_raw"))
                .then((pl.col("fvg_bearish_start") - pl.col("fvg_bearish_end")).abs())
                .otherwise(None)
                .alias("fvg_gap_size"),
                # Calculate gap as percentage of price
                pl.when(pl.col("fvg_bullish_raw"))
                .then(
                    (pl.col("fvg_bullish_end") - pl.col("fvg_bullish_start")).abs()
                    / pl.col("fvg_bullish_start")
                    * 100
                )
                .when(pl.col("fvg_bearish_raw"))
                .then(
                    (pl.col("fvg_bearish_start") - pl.col("fvg_bearish_end")).abs()
                    / pl.col("fvg_bearish_start")
                    * 100
                )
                .otherwise(None)
                .alias("fvg_gap_percent"),
            ]
        )

        # Apply minimum gap size and percentage filters
        result = result.with_columns(
            [
                (
                    pl.col("fvg_bullish_raw")
                    & (pl.col("fvg_gap_size") >= min_gap_size)
                    & (pl.col("fvg_gap_percent") >= min_gap_percent)
                ).alias("fvg_bullish"),
                (
                    pl.col("fvg_bearish_raw")
                    & (pl.col("fvg_gap_size") >= min_gap_size)
                    & (pl.col("fvg_gap_percent") >= min_gap_percent)
                ).alias("fvg_bearish"),
            ]
        )

        # Check for mitigation if requested
        if check_mitigation:
            # Add row index for tracking
            result = result.with_row_index("_row_idx")

            # Find gap indices
            gap_indices = result.filter(
                pl.col("fvg_bullish") | pl.col("fvg_bearish")
            ).select(
                "_row_idx",
                "fvg_bullish",
                "fvg_bullish_start",
                "fvg_bullish_end",
                "fvg_bearish_start",
                "fvg_bearish_end",
            )

            # Initialize mitigation column
            mitigated = pl.Series("fvg_mitigated", [False] * len(result))

            # Check each gap for mitigation
            for row in gap_indices.iter_rows(named=True):
                gap_idx = row["_row_idx"]
                is_bullish = row["fvg_bullish"]

                # Look at subsequent candles for mitigation
                future_data = result.filter(pl.col("_row_idx") > gap_idx)

                if is_bullish and row["fvg_bullish_start"] is not None:
                    gap_start = row["fvg_bullish_start"]
                    gap_end = row["fvg_bullish_end"]
                    gap_size = gap_end - gap_start
                    mitigation_amount = gap_size * mitigation_threshold
                    # Bullish gap is mitigated when price retraces into the gap
                    mitigation_level = gap_start + mitigation_amount
                    mitigated_rows = future_data.filter(
                        pl.col(low_column) <= mitigation_level
                    )
                elif not is_bullish and row["fvg_bearish_start"] is not None:
                    gap_start = row["fvg_bearish_start"]
                    gap_end = row["fvg_bearish_end"]
                    gap_size = gap_start - gap_end
                    mitigation_amount = gap_size * mitigation_threshold
                    # Bearish gap is mitigated when price retraces into the gap
                    mitigation_level = gap_start - mitigation_amount
                    mitigated_rows = future_data.filter(
                        pl.col(high_column) >= mitigation_level
                    )
                else:
                    continue

                if len(mitigated_rows) > 0:
                    mitigated[gap_idx] = True

            result = result.with_columns(mitigated)
            result = result.drop("_row_idx")

            # Update gap columns to exclude mitigated gaps if requested
            result = result.with_columns(
                [
                    (pl.col("fvg_bullish") & ~pl.col("fvg_mitigated")).alias(
                        "fvg_bullish"
                    ),
                    (pl.col("fvg_bearish") & ~pl.col("fvg_mitigated")).alias(
                        "fvg_bearish"
                    ),
                ]
            )

        # Clean up intermediate columns
        columns_to_drop: list[str] = [
            "candle1_high",
            "candle1_low",
            "candle2_high",
            "candle2_low",
            "fvg_bullish_raw",
            "fvg_bearish_raw",
        ]

        result = result.drop(columns_to_drop)

        return result


[docs] def calculate_fvg( data: pl.DataFrame, high_column: str = "high", low_column: str = "low", close_column: str = "close", min_gap_size: float = 0.0, min_gap_percent: float = 0.0, check_mitigation: bool = False, mitigation_threshold: float = 0.5, ) -> pl.DataFrame: """ Calculate Fair Value Gaps using three-candle pattern (convenience function). See FVG.calculate() for detailed documentation. Args: data: DataFrame with OHLC data high_column: High price column low_column: Low price column close_column: Close price column min_gap_size: Minimum gap size to consider valid min_gap_percent: Minimum gap size as percentage check_mitigation: Whether to check if gaps have been mitigated mitigation_threshold: Percentage of gap that needs to be filled to consider it mitigated Returns: DataFrame with FVG columns added """ indicator = FVG() return indicator.calculate( data, high_column=high_column, low_column=low_column, close_column=close_column, min_gap_size=min_gap_size, min_gap_percent=min_gap_percent, check_mitigation=check_mitigation, mitigation_threshold=mitigation_threshold, )