Skip to content

场景四 — 传统策略迁移与自定义策略

完整可运行示例: examples/16_traditional_strategy/traditional_strategy_demo.py 覆盖动量策略、均值回归、趋势跟踪三种经典策略的信号生成与回测对比。

本文档展示如何在 AXON 框架中实现和运行传统量化策略(不依赖 RL),以及如何将传统信号与 AI 模块结合,构建混合模式交易系统。


1. 纯 Python 策略(不依赖 RL)

AXON 的交易环境 (TradingEnv) 和回测引擎 (BacktestEngine) 完全支持纯规则策略。你可以像使用普通 Python 类一样实现策略,无需了解 RL 的任何细节。

1.1 SimpleMomentumStrategy 完整代码

"""
SimpleMomentumStrategy - 纯 Python 动量策略示例

本策略不依赖任何 RL 模块,直接基于价格数据计算动量信号,
通过 AXON 的 BacktestEngine 进行回测验证。
"""

from dataclasses import dataclass
from typing import List, Optional, Dict
from decimal import Decimal
import numpy as np

from axon_quant import (
    BacktestEngine,
    MarketBar,
    Order,
    OrderId,
    OrderType,
    Side,
    TimeInForce,
    Symbol,
    ExchangeId,
)


@dataclass
class Signal:
    """策略信号结构体。"""
    action: str           # "buy" / "sell" / "hold"
    confidence: float     # 0.0 ~ 1.0
    target_position: float  # 目标仓位比例 0.0 ~ 1.0
    reason: str           # 信号原因说明


class SimpleMomentumStrategy:
    """
    简单动量策略:基于双均线交叉 + 成交量确认。

    策略逻辑:
    1. 计算短期均线 (MA5) 和长期均线 (MA20)
    2. 当 MA5 上穿 MA20 且成交量放大 1.5 倍时,产生买入信号
    3. 当 MA5 下穿 MA20 且成交量放大 1.5 倍时,产生卖出信号
    4. 其他情况持仓不动

    参数:
        short_window: 短期均线窗口,默认 5
        long_window: 长期均线窗口,默认 20
        volume_threshold: 成交量放大阈值,默认 1.5
        position_size: 每次交易仓位比例,默认 0.5 (50%)
    """

    def __init__(
        self,
        short_window: int = 5,
        long_window: int = 20,
        volume_threshold: float = 1.5,
        position_size: float = 0.5,
    ):
        self.short_window = short_window
        self.long_window = long_window
        self.volume_threshold = volume_threshold
        self.position_size = position_size

        # 状态跟踪
        self.price_history: List[float] = []
        self.volume_history: List[float] = []
        self.current_position: float = 0.0  # 当前仓位比例 -1.0 ~ 1.0
        self.signals: List[Dict] = []

    def on_bar(self, bar: MarketBar) -> Optional[Order]:
        """
        每根 K 线到达时调用,返回交易订单或 None。

        Args:
            bar: 当前 K 线数据,包含 open/high/low/close/volume

        Returns:
            Order 对象(有信号时)或 None(无信号时)
        """
        # 记录历史数据
        self.price_history.append(bar.close)
        self.volume_history.append(bar.volume)

        # 数据不足时不交易
        if len(self.price_history) < self.long_window:
            return None

        # 保持窗口长度,防止内存无限增长
        if len(self.price_history) > self.long_window * 2:
            self.price_history = self.price_history[-self.long_window * 2:]
            self.volume_history = self.volume_history[-self.long_window * 2:]

        # 计算均线
        ma_short = np.mean(self.price_history[-self.short_window:])
        ma_long = np.mean(self.price_history[-self.long_window:])

        # 计算成交量均值
        avg_volume = np.mean(self.volume_history[-self.long_window:])
        volume_ratio = bar.volume / avg_volume if avg_volume > 0 else 1.0

        # 计算上一期的均线(用于判断交叉)
        if len(self.price_history) >= self.long_window + 1:
            prev_ma_short = np.mean(self.price_history[-(self.short_window + 1):-1])
            prev_ma_long = np.mean(self.price_history[-(self.long_window + 1):-1])
        else:
            prev_ma_short = ma_short
            prev_ma_long = ma_long

        # 判断均线交叉
        golden_cross = prev_ma_short <= prev_ma_long and ma_short > ma_long
        death_cross = prev_ma_short >= prev_ma_long and ma_short < ma_long

        # 成交量确认
        volume_confirmed = volume_ratio >= self.volume_threshold

        # 生成信号
        order = None
        if golden_cross and volume_confirmed and self.current_position <= 0:
            # 金叉 + 放量:买入
            signal = Signal(
                action="buy",
                confidence=min(volume_ratio / 2.0, 1.0),
                target_position=self.position_size,
                reason=f"MA{self.short_window} 上穿 MA{self.long_window}, "
                       f"成交量放大 {volume_ratio:.2f} 倍",
            )
            order = self._create_order(signal, bar)
            self.current_position = self.position_size

        elif death_cross and volume_confirmed and self.current_position >= 0:
            # 死叉 + 放量:卖出
            signal = Signal(
                action="sell",
                confidence=min(volume_ratio / 2.0, 1.0),
                target_position=-self.position_size,
                reason=f"MA{self.short_window} 下穿 MA{self.long_window}, "
                       f"成交量放大 {volume_ratio:.2f} 倍",
            )
            order = self._create_order(signal, bar)
            self.current_position = -self.position_size

        # 记录信号日志
        if order:
            self.signals.append({
                "timestamp": bar.timestamp,
                "action": signal.action,
                "confidence": signal.confidence,
                "reason": signal.reason,
                "price": bar.close,
            })

        return order

    def _create_order(self, signal: Signal, bar: MarketBar) -> Order:
        """根据信号创建订单对象。"""
        side = Side.Buy if signal.action == "buy" else Side.Sell

        # 计算订单数量(简化:按目标仓位比例计算)
        notional = 10000.0 * abs(signal.target_position)  # 假设本金 10000 USDT
        quantity = Decimal(str(notional / bar.close))

        return Order(
            client_order_id=OrderId.new(),
            symbol=Symbol("BTCUSDT"),
            side=side,
            order_type=OrderType.Market,
            price=None,
            quantity=quantity,
            time_in_force=TimeInForce.Gtc,
            exchange=ExchangeId.Binance,
            meta={
                "strategy": "SimpleMomentum",
                "signal_confidence": str(signal.confidence),
                "signal_reason": signal.reason,
            },
        )

    def get_stats(self) -> Dict:
        """获取策略统计信息。"""
        buy_signals = [s for s in self.signals if s["action"] == "buy"]
        sell_signals = [s for s in self.signals if s["action"] == "sell"]

        return {
            "total_signals": len(self.signals),
            "buy_signals": len(buy_signals),
            "sell_signals": len(sell_signals),
            "avg_confidence": np.mean([s["confidence"] for s in self.signals]) if self.signals else 0.0,
        }


# ==================== 回测执行 ====================

def run_backtest():
    """
    使用 AXON BacktestEngine 对 SimpleMomentumStrategy 进行回测。
    """
    # 准备历史 K 线数据(实际使用时应从 axon-data 加载)
    bars = load_historical_bars("BTCUSDT", "1h", start="2024-01-01", end="2024-06-01")

    # 创建策略实例
    strategy = SimpleMomentumStrategy(
        short_window=5,
        long_window=20,
        volume_threshold=1.5,
        position_size=0.5,
    )

    # 创建回测引擎
    engine = BacktestEngine(
        initial_capital=10000.0,
        transaction_cost=0.001,  # 10 bps
        slippage=0.0005,         # 5 bps
    )

    # 运行回测
    for bar in bars:
        order = strategy.on_bar(bar)
        if order:
            engine.submit_order(order)
        engine.on_bar(bar)

    # 输出结果
    stats = strategy.get_stats()
    print(f"信号总数: {stats['total_signals']}")
    print(f"买入信号: {stats['buy_signals']}")
    print(f"卖出信号: {stats['sell_signals']}")
    print(f"平均置信度: {stats['avg_confidence']:.2%}")
    print(f"最终净值: {engine.portfolio_value:.2f} USDT")
    print(f"收益率: {(engine.portfolio_value / 10000.0 - 1) * 100:.2f}%")

2. AI 辅助策略:传统信号触发 LLM 复核

混合模式的核心思想是:传统策略负责快速发现机会,LLM 负责深度验证。这样既保留了传统策略的低延迟,又获得了 AI 的语义理解能力。

2.1 混合模式架构

┌─────────────────────────────────────────────────────────────┐
│                    HybridStrategy                           │
│                                                             │
│  ┌─────────────────┐      ┌─────────────────────────────┐  │
│  │  Traditional    │      │  LLM Verification           │  │
│  │  Signal Layer   │ ───► │  (News / Sentiment / Macro)│  │
│  │                 │      │                             │  │
│  │  - MA Cross     │      │  - "BTC 突破 2 月新高,      │  │
│  │  - Bollinger    │      │    但美联储即将加息,        │  │
│  │  - RSI Diverge  │      │    是否应跟进?"             │  │
│  └─────────────────┘      └─────────────────────────────┘  │
│           │                           │                     │
│           └───────────┬───────────────┘                     │
│                       ▼                                     │
│              ┌─────────────────┐                           │
│              │  Decision Gate  │                           │
│              │  (信号 + LLM)   │                           │
│              └─────────────────┘                           │
└─────────────────────────────────────────────────────────────┘

2.2 混合策略完整代码

"""
HybridStrategy - 传统信号 + LLM 复核的混合策略

工作流程:
1. 传统技术指标产生初始信号
2. LLM 分析当前市场环境(新闻、情绪、宏观)
3. 只有当传统信号和 LLM 意见一致时才执行交易
4. LLM 同时生成交易理由,存入审计日志
"""

import asyncio
from typing import Optional, List
from decimal import Decimal

from axon_quant import (
    MarketBar, Order, OrderId, OrderType, Side, TimeInForce,
    Symbol, ExchangeId,
    LLMBackend, Message, ToolDefinition,
    DecisionRecorder, DecisionRecord,
)


class HybridStrategy:
    """
    混合策略:传统动量信号 + LLM 市场复核。

    参数:
        llm_backend: LLM 后端实例 (OpenAI / DeepSeek / 本地模型)
        strategy: 底层传统策略实例
        consensus_threshold: 共识阈值,LLM 置信度需超过此值才执行
        max_llm_latency_ms: LLM 最大等待时间,超时则仅按传统信号执行
    """

    def __init__(
        self,
        llm_backend: LLMBackend,
        strategy: SimpleMomentumStrategy,
        consensus_threshold: float = 0.7,
        max_llm_latency_ms: float = 2000.0,
    ):
        self.llm = llm_backend
        self.strategy = strategy
        self.consensus_threshold = consensus_threshold
        self.max_llm_latency_ms = max_llm_latency_ms
        self.recorder = None  # 可选:决策记录器

        # 统计
        self.traditional_signals = 0
        self.llm_approved = 0
        self.llm_rejected = 0
        self.llm_timeouts = 0

    async def on_bar(self, bar: MarketBar) -> Optional[Order]:
        """
        每根 K 线到达时的完整决策流程。

        步骤:
        1. 传统策略产生信号
        2. 构造 LLM 提示词(包含价格数据 + 信号信息)
        3. 异步调用 LLM 进行复核
        4. 综合决策:传统信号 + LLM 意见
        5. 记录决策理由
        """
        # 步骤 1: 传统策略信号
        raw_order = self.strategy.on_bar(bar)
        if raw_order is None:
            return None

        self.traditional_signals += 1

        # 提取信号信息
        signal_info = self.strategy.signals[-1]

        # 步骤 2: 构造 LLM 提示词
        prompt = self._build_prompt(bar, signal_info)

        # 步骤 3: 异步调用 LLM(带超时)
        try:
            llm_response = await asyncio.wait_for(
                self._query_llm(prompt),
                timeout=self.max_llm_latency_ms / 1000.0,
            )
            llm_approved = llm_response.get("approve", False)
            llm_confidence = llm_response.get("confidence", 0.0)
            llm_reason = llm_response.get("reason", "")

        except asyncio.TimeoutError:
            # LLM 超时:保守策略,仅按传统信号执行(但降低仓位)
            self.llm_timeouts += 1
            print(f"[警告] LLM 复核超时,降低仓位至 30% 执行")
            raw_order.quantity = raw_order.quantity * Decimal("0.3")
            self._record_decision(bar, signal_info, "timeout", 0.0, "LLM 超时,保守执行")
            return raw_order

        # 步骤 4: 综合决策
        if llm_approved and llm_confidence >= self.consensus_threshold:
            # 双重确认通过:全额执行
            self.llm_approved += 1
            print(f"[通过] 传统信号 + LLM 一致 (置信度 {llm_confidence:.1%})")
            self._record_decision(bar, signal_info, "approved", llm_confidence, llm_reason)
            return raw_order
        else:
            # LLM 反对或置信度不足:放弃交易
            self.llm_rejected += 1
            print(f"[放弃] LLM 反对 ({llm_reason}), 不执行交易")
            self._record_decision(bar, signal_info, "rejected", llm_confidence, llm_reason)
            return None

    def _build_prompt(self, bar: MarketBar, signal: dict) -> str:
        """
        构造 LLM 复核提示词。

        包含信息:
        - 当前价格和技术指标
        - 传统策略的信号和理由
        - 要求 LLM 从宏观/新闻角度评估
        """
        prices = self.strategy.price_history[-20:]
        price_change_24h = (prices[-1] - prices[-24]) / prices[-24] * 100 if len(prices) >= 24 else 0

        prompt = f"""你是一位资深加密货币交易员,请对以下交易信号进行复核评估。

## 市场数据
- 交易对: BTCUSDT
- 当前价格: {bar.close:.2f} USDT
- 24h 涨跌: {price_change_24h:+.2f}%
- 成交量: {bar.volume:.2f}

## 技术信号
- 信号类型: {signal['action'].upper()}
- 信号置信度: {signal['confidence']:.1%}
- 信号理由: {signal['reason']}

## 你的任务
1. 从宏观经济、市场情绪、新闻事件角度分析当前是否适合执行该交易
2. 给出明确的通过/拒绝建议
3. 提供你的分析理由(100 字以内)

请以 JSON 格式回复:
{{
    "approve": true/false,
    "confidence": 0.0-1.0,
    "reason": "你的分析理由"
}}
"""
        return prompt

    async def _query_llm(self, prompt: str) -> dict:
        """
        调用 LLM 后端获取复核意见。

        实际实现中,这里会调用 axon-llm 的 complete() 或 complete_with_tools()。
        """
        messages = [
            Message(role="system", content="你是一位专业的加密货币交易分析师。"),
            Message(role="user", content=prompt),
        ]

        response = await self.llm.complete(messages)

        # 解析 LLM 返回的 JSON
        import json
        try:
            result = json.loads(response.content)
            return result
        except json.JSONDecodeError:
            # 如果 LLM 没有返回合法 JSON,保守处理为拒绝
            return {"approve": False, "confidence": 0.0, "reason": "LLM 返回格式错误"}

    def _record_decision(self, bar: MarketBar, signal: dict, outcome: str, llm_conf: float, reason: str):
        """记录决策到审计系统(可选)。"""
        if self.recorder:
            self.recorder.record_async(DecisionRecord(
                decision_id=f"hybrid_{bar.timestamp.isoformat()}",
                timestamp=bar.timestamp.timestamp(),
                observation={"price": bar.close, "volume": bar.volume},
                action=signal,
                model_version="hybrid_v1",
                metadata={
                    "outcome": outcome,
                    "llm_confidence": llm_conf,
                    "llm_reason": reason,
                },
            ))

    def get_stats(self) -> dict:
        """获取混合策略统计。"""
        return {
            "traditional_signals": self.traditional_signals,
            "llm_approved": self.llm_approved,
            "llm_rejected": self.llm_rejected,
            "llm_timeouts": self.llm_timeouts,
            "approval_rate": self.llm_approved / max(self.traditional_signals, 1),
        }


# ==================== 使用示例 ====================

async def run_hybrid_strategy():
    """运行混合策略示例。"""
    from axon_quant import OpenAIBackend

    # 创建 LLM 后端
    llm = OpenAIBackend(
        api_key="YOUR_API_KEY",
        model="deepseek-chat",
        base_url="https://api.deepseek.com",
    )

    # 创建传统策略
    traditional = SimpleMomentumStrategy(
        short_window=5,
        long_window=20,
        volume_threshold=1.5,
        position_size=0.5,
    )

    # 创建混合策略
    hybrid = HybridStrategy(
        llm_backend=llm,
        strategy=traditional,
        consensus_threshold=0.7,
        max_llm_latency_ms=2000,
    )

    # 模拟 K 线流
    bars = load_historical_bars("BTCUSDT", "1h")

    for bar in bars:
        order = await hybrid.on_bar(bar)
        if order:
            print(f"执行订单: {order.side} {order.quantity} @ {bar.close}")

    # 输出统计
    stats = hybrid.get_stats()
    print(f"\n混合策略统计:")
    print(f"  传统信号数: {stats['traditional_signals']}")
    print(f"  LLM 通过: {stats['llm_approved']}")
    print(f"  LLM 拒绝: {stats['llm_rejected']}")
    print(f"  LLM 超时: {stats['llm_timeouts']}")
    print(f"  通过率: {stats['approval_rate']:.1%}")

3. 业内常见策略模式

以下展示三种经典策略模式在 AXON 中的完整实现,每种都包含与 AI 模块的集成方式。

3.1 趋势跟踪:均线突破 + LLM 新闻验证

"""
TrendFollowingStrategy - 趋势跟踪策略

核心逻辑:
- 入场: 价格突破 N 周期高点 + 均线多头排列
- 出场: 价格跌破 N 周期低点 或 均线空头排列
- LLM 增强: 突破时查询相关新闻,验证突破有效性
"""

import numpy as np
from typing import Optional, List
from decimal import Decimal

from axon_quant import (
    MarketBar, Order, OrderId, OrderType, Side, TimeInForce,
    Symbol, ExchangeId,
)


class TrendFollowingStrategy:
    """
    趋势跟踪策略:均线突破 + LLM 新闻验证。

    参数:
        lookback: 突破回看周期,默认 20
        ma_fast: 快速均线周期,默认 10
        ma_slow: 慢速均线周期,默认 30
        atr_period: ATR 周期(用于止损),默认 14
        risk_per_trade: 每笔交易风险比例,默认 1%
    """

    def __init__(
        self,
        lookback: int = 20,
        ma_fast: int = 10,
        ma_slow: int = 30,
        atr_period: int = 14,
        risk_per_trade: float = 0.01,
    ):
        self.lookback = lookback
        self.ma_fast = ma_fast
        self.ma_slow = ma_slow
        self.atr_period = atr_period
        self.risk_per_trade = risk_per_trade

        self.prices: List[float] = []
        self.highs: List[float] = []
        self.lows: List[float] = []
        self.position: Optional[dict] = None

    def on_bar(self, bar: MarketBar) -> Optional[Order]:
        """主决策逻辑。"""
        self.prices.append(bar.close)
        self.highs.append(bar.high)
        self.lows.append(bar.low)

        if len(self.prices) < self.ma_slow:
            return None

        # 计算指标
        ma_fast = np.mean(self.prices[-self.ma_fast:])
        ma_slow = np.mean(self.prices[-self.ma_slow:])
        atr = self._calculate_atr()

        # 计算突破水平
        highest_high = max(self.highs[-self.lookback:])
        lowest_low = min(self.lows[-self.lookback:])

        order = None

        # 多头入场: 突破高点 + 均线多头排列
        if bar.close > highest_high and ma_fast > ma_slow and self.position is None:
            stop_loss = bar.close - 2.0 * atr
            take_profit = bar.close + 3.0 * atr  # 1:1.5 盈亏比

            order = self._create_order(
                side=Side.Buy,
                price=bar.close,
                stop_loss=stop_loss,
                take_profit=take_profit,
                reason=f"突破 {self.lookback} 周期高点 {highest_high:.2f}, "
                       f"MA{self.ma_fast}({ma_fast:.2f}) > MA{self.ma_slow}({ma_slow:.2f})",
            )
            self.position = {"side": "long", "entry": bar.close, "sl": stop_loss, "tp": take_profit}

        # 空头入场: 跌破低点 + 均线空头排列
        elif bar.close < lowest_low and ma_fast < ma_slow and self.position is None:
            stop_loss = bar.close + 2.0 * atr
            take_profit = bar.close - 3.0 * atr

            order = self._create_order(
                side=Side.Sell,
                price=bar.close,
                stop_loss=stop_loss,
                take_profit=take_profit,
                reason=f"跌破 {self.lookback} 周期低点 {lowest_low:.2f}, "
                       f"MA{self.ma_fast}({ma_fast:.2f}) < MA{self.ma_slow}({ma_slow:.2f})",
            )
            self.position = {"side": "short", "entry": bar.close, "sl": stop_loss, "tp": take_profit}

        # 出场逻辑
        elif self.position:
            if self.position["side"] == "long":
                if bar.close <= self.position["sl"] or bar.close >= self.position["tp"]:
                    order = self._create_order(
                        side=Side.Sell,
                        price=bar.close,
                        reason="多头止损/止盈",
                    )
                    self.position = None
            else:
                if bar.close >= self.position["sl"] or bar.close <= self.position["tp"]:
                    order = self._create_order(
                        side=Side.Buy,
                        price=bar.close,
                        reason="空头止损/止盈",
                    )
                    self.position = None

        return order

    def _calculate_atr(self) -> float:
        """计算平均真实波幅 (ATR)。"""
        if len(self.prices) < self.atr_period + 1:
            return 0.0

        tr_values = []
        for i in range(-self.atr_period, 0):
            high = self.highs[i]
            low = self.lows[i]
            prev_close = self.prices[i - 1]
            tr = max(high - low, abs(high - prev_close), abs(low - prev_close))
            tr_values.append(tr)

        return np.mean(tr_values)

    def _create_order(
        self,
        side: Side,
        price: float,
        stop_loss: float = 0.0,
        take_profit: float = 0.0,
        reason: str = "",
    ) -> Order:
        """创建订单。"""
        # 根据风险计算仓位大小
        # 简化: 固定名义金额
        notional = 1000.0
        quantity = Decimal(str(notional / price))

        return Order(
            client_order_id=OrderId.new(),
            symbol=Symbol("BTCUSDT"),
            side=side,
            order_type=OrderType.Market,
            price=None,
            quantity=quantity,
            time_in_force=TimeInForce.Gtc,
            exchange=ExchangeId.Binance,
            meta={
                "strategy": "TrendFollowing",
                "reason": reason,
                "stop_loss": str(stop_loss),
                "take_profit": str(take_profit),
            },
        )


# ==================== LLM 新闻验证集成 ====================

async def verify_with_llm(strategy: TrendFollowingStrategy, llm_backend, bar: MarketBar) -> bool:
    """
    在趋势突破时,使用 LLM 验证新闻面是否支持该方向。

    返回 True 表示 LLM 支持该交易方向,False 表示反对或不确定。
    """
    recent_high = max(strategy.highs[-strategy.lookback:])
    recent_low = min(strategy.lows[-strategy.lookback:])

    is_breakout_up = bar.close > recent_high
    direction = "上涨" if is_breakout_up else "下跌"

    prompt = f"""BTC 刚刚{'突破' if is_breakup_up else '跌破'}{strategy.lookback} 周期的{'高点' if is_breakout_up else '低点'}
价格 {bar.close:.2f} USDT。

请分析:
1. 最近 24 小时是否有重大新闻支持 BTC {direction}?
2. 宏观经济环境(美联储政策、美元指数等)是否支持该方向?
3. 市场情绪(恐惧/贪婪指数、资金费率)如何?

如果基本面支持该方向,回复 "APPROVE",否则回复 "REJECT"。
简要说明理由(50 字以内)。"""

    messages = [Message(role="user", content=prompt)]
    response = await llm_backend.complete(messages)

    content = response.content.upper()
    approved = "APPROVE" in content and "REJECT" not in content

    print(f"[LLM 验证] {'通过' if approved else '拒绝'} - {response.content[:100]}")
    return approved

3.2 均值回归:布林带 + RL 仓位优化

"""
MeanReversionStrategy - 均值回归策略

核心逻辑:
- 入场: 价格触及布林带上/下轨 + RSI 超买/超卖
- 仓位: 不固定,使用 RL 模型根据市场状态动态优化仓位大小
- 出场: 价格回归中轨 或 止损
"""

import numpy as np
from typing import Optional, Tuple
from decimal import Decimal

from axon_quant import (
    MarketBar, Order, OrderId, OrderType, Side, TimeInForce,
    Symbol, ExchangeId,
    TradingEnv, Observation, Action,
)


class MeanReversionStrategy:
    """
    均值回归策略:布林带 + RL 仓位优化。

    参数:
        bb_period: 布林带周期,默认 20
        bb_std: 布林带标准差倍数,默认 2.0
        rsi_period: RSI 周期,默认 14
        rsi_overbought: RSI 超买阈值,默认 70
        rsi_oversold: RSI 超卖阈值,默认 30
    """

    def __init__(
        self,
        bb_period: int = 20,
        bb_std: float = 2.0,
        rsi_period: int = 14,
        rsi_overbought: float = 70.0,
        rsi_oversold: float = 30.0,
    ):
        self.bb_period = bb_period
        self.bb_std = bb_std
        self.rsi_period = rsi_period
        self.rsi_overbought = rsi_overbought
        self.rsi_oversold = rsi_oversold

        self.prices: list[float] = []
        self.position: Optional[str] = None

        # RL 仓位优化器(可选)
        self.rl_position_optimizer = None

    def on_bar(self, bar: MarketBar) -> Optional[Order]:
        """主决策逻辑。"""
        self.prices.append(bar.close)

        if len(self.prices) < self.bb_period:
            return None

        # 计算布林带
        middle, upper, lower = self._calculate_bollinger()

        # 计算 RSI
        rsi = self._calculate_rsi()

        # 计算价格相对于布林带的位置
        bb_position = (bar.close - lower) / (upper - lower) if upper != lower else 0.5

        order = None

        # 空头信号: 触及上轨 + RSI 超买
        if bb_position >= 0.95 and rsi >= self.rsi_overbought and self.position is None:
            position_size = self._optimize_position_size(bar, "short")
            order = self._create_order(
                side=Side.Sell,
                quantity=position_size,
                reason=f"触及布林带上轨 {upper:.2f}, RSI={rsi:.1f} 超买, "
                       f"仓位={position_size:.2%}",
            )
            self.position = "short"

        # 多头信号: 触及下轨 + RSI 超卖
        elif bb_position <= 0.05 and rsi <= self.rsi_oversold and self.position is None:
            position_size = self._optimize_position_size(bar, "long")
            order = self._create_order(
                side=Side.Buy,
                quantity=position_size,
                reason=f"触及布林带下轨 {lower:.2f}, RSI={rsi:.1f} 超卖, "
                       f"仓位={position_size:.2%}",
            )
            self.position = "long"

        # 出场: 价格回归中轨附近
        elif self.position:
            if abs(bar.close - middle) / middle < 0.005:
                order = self._create_order(
                    side=Side.Sell if self.position == "long" else Side.Buy,
                    quantity=0.0,  # 平仓
                    reason=f"价格回归中轨 {middle:.2f}, 平仓",
                )
                self.position = None

        return order

    def _calculate_bollinger(self) -> Tuple[float, float, float]:
        """计算布林带三条线。"""
        window = self.prices[-self.bb_period:]
        middle = np.mean(window)
        std = np.std(window)
        upper = middle + self.bb_std * std
        lower = middle - self.bb_std * std
        return middle, upper, lower

    def _calculate_rsi(self) -> float:
        """计算 RSI 指标。"""
        if len(self.prices) < self.rsi_period + 1:
            return 50.0

        deltas = np.diff(self.prices[-(self.rsi_period + 1):])
        gains = np.where(deltas > 0, deltas, 0)
        losses = np.where(deltas < 0, -deltas, 0)

        avg_gain = np.mean(gains)
        avg_loss = np.mean(losses)

        if avg_loss == 0:
            return 100.0

        rs = avg_gain / avg_loss
        rsi = 100.0 - (100.0 / (1.0 + rs))
        return rsi

    def _optimize_position_size(self, bar: MarketBar, direction: str) -> Decimal:
        """
        使用 RL 模型优化仓位大小。

        如果没有 RL 模型,回退到固定仓位。
        """
        if self.rl_position_optimizer is None:
            # 默认固定仓位 30%
            return Decimal("0.3")

        # 构造观测
        obs = Observation(
            market_features=[
                bar.close, bar.volume, self._calculate_rsi(),
                *self.prices[-5:],  # 最近 5 个价格
            ],
            technical_indicators=[],
            portfolio_state=PortfolioState(),
            time_features=[],
        )

        # RL 模型输出动作(仓位比例 0.0 ~ 1.0)
        action = self.rl_position_optimizer.predict(obs)
        position_ratio = abs(action.target_position)

        # 限制最大仓位
        position_ratio = min(position_ratio, 0.5)

        return Decimal(str(position_ratio))

    def _create_order(self, side: Side, quantity: Decimal, reason: str) -> Order:
        """创建订单。"""
        return Order(
            client_order_id=OrderId.new(),
            symbol=Symbol("BTCUSDT"),
            side=side,
            order_type=OrderType.Market,
            price=None,
            quantity=quantity,
            time_in_force=TimeInForce.Gtc,
            exchange=ExchangeId.Binance,
            meta={
                "strategy": "MeanReversion",
                "reason": reason,
            },
        )


# ==================== RL 仓位优化器训练 ====================

def train_rl_position_optimizer():
    """
    训练一个 RL 模型来优化均值回归策略的仓位大小。

    观测空间: 价格、成交量、RSI、布林带位置、持仓状态
    动作空间: 连续值 [-1, 1],映射到仓位比例 [0, 50%]
    奖励: 交易收益 - 风险惩罚 (回撤、波动率)
    """
    from axon_quant import (
        TradingEnv, EnvConfig, DefaultObservationSpace,
        SharpeReward, PnLReward, MultiObjectiveReward,
        ActionSpace,
    )

    # 配置环境
    config = EnvConfig(
        initial_capital=10000.0,
        transaction_cost=0.001,
        slippage=0.0005,
        max_position_ratio=0.5,  # RL 最大允许 50% 仓位
        max_steps=1000,
        symbol="BTCUSDT",
        return_window=252,
    )

    # 观测空间
    obs_space = DefaultObservationSpace.new(
        window_size=10,
        features=[
            FeatureConfig(name="close", source=FeatureSource.Close, normalizer=NormalizerType.ZScore),
            FeatureConfig(name="volume", source=FeatureSource.Volume, normalizer=NormalizerType.ZScore),
            FeatureConfig(name="rsi", source=FeatureSource.RSI(14), normalizer=NormalizerType.MinMax),
        ],
    )

    # 奖励函数:多目标(收益 + 夏普比率)
    reward_fn = MultiObjectiveReward([
        PnLReward(relative=True, scale=1.0),
        SharpeReward(risk_free_rate=0.02, window=20),
    ])

    # 创建环境
    env = TradingEnv.new(
        config=config,
        action_space=ActionSpace.continuous(low=-1.0, high=1.0),
        observation_space=obs_space,
        reward_fn=reward_fn,
        market_data=load_training_bars(),
    )

    # 使用 PPO 训练
    from stable_baselines3 import PPO
    model = PPO("MlpPolicy", env, verbose=1)
    model.learn(total_timesteps=100_000)

    return model

3.3 套利策略:协整检验 + Ensemble 多对组合

"""
StatArbStrategy - 统计套利策略

核心逻辑:
- 选择: 通过协整检验找到价格序列稳定的配对
- 信号: 价差 Z-Score 超过阈值时开仓
- 仓位: 使用 DynamicWeightedEnsemble 管理多对组合的权重
- 出场: 价差回归均值 或 止损
"""

import numpy as np
from typing import List, Dict, Tuple, Optional
from decimal import Decimal
from dataclasses import dataclass

from axon_quant import (
    MarketBar, Order, OrderId, OrderType, Side, TimeInForce,
    Symbol, ExchangeId,
    DynamicWeightedEnsemble, ModelPerformance,
)


@dataclass
class Pair:
    """交易对结构体。"""
    symbol_a: str
    symbol_b: str
    hedge_ratio: float      # 对冲比例 (A / B)
    zscore_threshold: float # Z-Score 阈值,默认 2.0
    lookback: int           # 回看周期


class StatArbStrategy:
    """
    统计套利策略:协整配对 + Ensemble 权重管理。

    参数:
        pairs: 交易对列表
        max_pairs: 同时持有的最大配对数
        ensemble: 动态权重集成器(管理多对组合的权重)
    """

    def __init__(
        self,
        pairs: List[Pair],
        max_pairs: int = 5,
        ensemble: Optional[DynamicWeightedEnsemble] = None,
    ):
        self.pairs = pairs
        self.max_pairs = max_pairs
        self.ensemble = ensemble

        # 每个配对的价格历史
        self.price_history: Dict[str, List[float]] = {p.symbol_a: [] for p in pairs}
        self.price_history.update({p.symbol_b: [] for p in pairs})

        # 当前持仓
        self.positions: Dict[str, dict] = {}  # pair_key -> position_info

        # 各配对的表现记录(用于 Ensemble 权重更新)
        self.pair_returns: Dict[str, List[float]] = {self._pair_key(p): [] for p in pairs}

    def on_bar(self, bars: Dict[str, MarketBar]) -> List[Order]:
        """
        每根 K 线到达时处理所有配对。

        Args:
            bars: symbol -> MarketBar 的映射

        Returns:
            订单列表(可能包含多个配对的订单)
        """
        # 更新价格历史
        for symbol, bar in bars.items():
            if symbol in self.price_history:
                self.price_history[symbol].append(bar.close)
                # 保持历史长度
                max_lookback = max(p.lookback for p in self.pairs) * 2
                if len(self.price_history[symbol]) > max_lookback:
                    self.price_history[symbol] = self.price_history[symbol][-max_lookback:]

        orders = []
        pair_signals = []

        for pair in self.pairs:
            if len(self.price_history[pair.symbol_a]) < pair.lookback:
                continue

            signal = self._evaluate_pair(pair, bars)
            if signal:
                pair_signals.append((pair, signal))

        # 使用 Ensemble 选择最优配对(如果配置了)
        if self.ensemble and len(pair_signals) > self.max_pairs:
            pair_signals = self._select_pairs_with_ensemble(pair_signals)

        # 生成订单
        for pair, signal in pair_signals:
            pair_orders = self._create_pair_orders(pair, signal, bars)
            orders.extend(pair_orders)

        return orders

    def _evaluate_pair(self, pair: Pair, bars: Dict[str, MarketBar]) -> Optional[dict]:
        """
        评估单个配对,返回信号或 None。

        计算步骤:
        1. 计算价差 spread = price_a - hedge_ratio * price_b
        2. 计算价差的 Z-Score
        3. Z-Score 超过阈值时产生信号
        """
        prices_a = self.price_history[pair.symbol_a][-pair.lookback:]
        prices_b = self.price_history[pair.symbol_b][-pair.lookback:]

        # 计算价差序列
        spreads = [a - pair.hedge_ratio * b for a, b in zip(prices_a, prices_b)]

        # 计算当前价差和 Z-Score
        current_spread = spreads[-1]
        mean_spread = np.mean(spreads)
        std_spread = np.std(spreads)

        if std_spread == 0:
            return None

        zscore = (current_spread - mean_spread) / std_spread

        # 检查是否已有持仓
        pair_key = self._pair_key(pair)
        has_position = pair_key in self.positions

        # 开仓信号
        if not has_position:
            if zscore > pair.zscore_threshold:
                # 价差过高:做空 A,做多 B
                return {
                    "action": "open",
                    "direction": "short_spread",
                    "zscore": zscore,
                    "spread": current_spread,
                }
            elif zscore < -pair.zscore_threshold:
                # 价差过低:做多 A,做空 B
                return {
                    "action": "open",
                    "direction": "long_spread",
                    "zscore": zscore,
                    "spread": current_spread,
                }

        # 出场信号:价差回归均值
        elif has_position:
            position = self.positions[pair_key]
            if abs(zscore) < 0.5:  # 回归阈值
                return {
                    "action": "close",
                    "direction": position["direction"],
                    "zscore": zscore,
                    "spread": current_spread,
                }

        return None

    def _select_pairs_with_ensemble(
        self,
        pair_signals: List[Tuple[Pair, dict]],
    ) -> List[Tuple[Pair, dict]]:
        """
        使用 DynamicWeightedEnsemble 选择最优配对。

        思路:将每个配对视为一个"模型",根据其近期夏普比率动态分配资金权重。
        """
        # 构造观测(简化:使用各配对的 Z-Score 绝对值作为特征)
        observations = []
        for pair, signal in pair_signals:
            obs = Observation(
                market_features=[abs(signal["zscore"]), pair.hedge_ratio],
                technical_indicators=[],
                portfolio_state=PortfolioState(),
                time_features=[],
            )
            observations.append((pair, signal, obs))

        # 获取当前权重
        weights = self.ensemble.get_weights()
        weight_map = {w.model_name: w.weight for w in weights}

        # 按权重排序,选择 top N
        scored = []
        for pair, signal, obs in observations:
            pair_key = self._pair_key(pair)
            weight = weight_map.get(pair_key, 1.0 / len(pair_signals))
            scored.append((pair, signal, weight))

        scored.sort(key=lambda x: x[2], reverse=True)
        return [(p, s) for p, s, _ in scored[:self.max_pairs]]

    def _create_pair_orders(
        self,
        pair: Pair,
        signal: dict,
        bars: Dict[str, MarketBar],
    ) -> List[Order]:
        """为配对交易创建双向订单。"""
        orders = []
        pair_key = self._pair_key(pair)

        # 固定名义金额
        notional = 1000.0

        if signal["action"] == "open":
            if signal["direction"] == "short_spread":
                # 做空 A,做多 B
                qty_a = Decimal(str(notional / bars[pair.symbol_a].close))
                qty_b = Decimal(str(notional / bars[pair.symbol_b].close))

                orders.append(self._make_order(pair.symbol_a, Side.Sell, qty_a, signal))
                orders.append(self._make_order(pair.symbol_b, Side.Buy, qty_b, signal))

                self.positions[pair_key] = {
                    "direction": "short_spread",
                    "entry_spread": signal["spread"],
                    "qty_a": qty_a,
                    "qty_b": qty_b,
                }
            else:
                # 做多 A,做空 B
                qty_a = Decimal(str(notional / bars[pair.symbol_a].close))
                qty_b = Decimal(str(notional / bars[pair.symbol_b].close))

                orders.append(self._make_order(pair.symbol_a, Side.Buy, qty_a, signal))
                orders.append(self._make_order(pair.symbol_b, Side.Sell, qty_b, signal))

                self.positions[pair_key] = {
                    "direction": "long_spread",
                    "entry_spread": signal["spread"],
                    "qty_a": qty_a,
                    "qty_b": qty_b,
                }

        elif signal["action"] == "close":
            # 平仓:反向操作
            position = self.positions[pair_key]

            if position["direction"] == "short_spread":
                orders.append(self._make_order(pair.symbol_a, Side.Buy, position["qty_a"], signal))
                orders.append(self._make_order(pair.symbol_b, Side.Sell, position["qty_b"], signal))
            else:
                orders.append(self._make_order(pair.symbol_a, Side.Sell, position["qty_a"], signal))
                orders.append(self._make_order(pair.symbol_b, Side.Buy, position["qty_b"], signal))

            # 计算收益并记录
            pnl = self._calculate_pair_pnl(pair, position, signal["spread"])
            self.pair_returns[pair_key].append(pnl)

            del self.positions[pair_key]

        return orders

    def _make_order(self, symbol: str, side: Side, quantity: Decimal, signal: dict) -> Order:
        """创建单个订单。"""
        return Order(
            client_order_id=OrderId.new(),
            symbol=Symbol(symbol),
            side=side,
            order_type=OrderType.Market,
            price=None,
            quantity=quantity,
            time_in_force=TimeInForce.Gtc,
            exchange=ExchangeId.Binance,
            meta={
                "strategy": "StatArb",
                "action": signal["action"],
                "zscore": str(signal["zscore"]),
                "spread": str(signal["spread"]),
            },
        )

    def _calculate_pair_pnl(self, pair: Pair, position: dict, exit_spread: float) -> float:
        """计算配对交易的收益。"""
        entry_spread = position["entry_spread"]
        if position["direction"] == "short_spread":
            return entry_spread - exit_spread
        else:
            return exit_spread - entry_spread

    def _pair_key(self, pair: Pair) -> str:
        """生成配对的唯一标识。"""
        return f"{pair.symbol_a}_{pair.symbol_b}"

    def update_ensemble_weights(self):
        """
        定期更新 Ensemble 权重。

        根据各配对的近期夏普比率,调整资金分配。
        """
        if self.ensemble is None:
            return

        performances = []
        for pair in self.pairs:
            pair_key = self._pair_key(pair)
            returns = self.pair_returns[pair_key][-30:]  # 最近 30 期

            if len(returns) < 5:
                continue

            sharpe = np.mean(returns) / (np.std(returns) + 1e-6)
            max_dd = min(0, min(np.minimum.accumulate(np.cumsum(returns))))

            perf = ModelPerformance(
                model_name=pair_key,
                accuracy=0.5,
                sharpe_ratio=sharpe,
                max_drawdown=max_dd,
                total_return=sum(returns),
                sample_count=len(returns),
                last_evaluated=int(time.time()),
            )
            performances.append(perf)

        for perf in performances:
            self.ensemble.update_performance(perf)


# ==================== 协整检验(配对选择)====================

def find_cointegrated_pairs(prices_df: pd.DataFrame, significance: float = 0.05) -> List[Pair]:
    """
    从价格数据框中找到协整的配对。

    参数:
        prices_df: DataFrame,列为 symbol,行为时间序列价格
        significance: 显著性水平,默认 0.05

    返回:
        协整配对列表
    """
    from statsmodels.tsa.stattools import coint

    symbols = prices_df.columns.tolist()
    pairs = []

    for i, sym_a in enumerate(symbols):
        for sym_b in symbols[i + 1:]:
            # 执行协整检验 (Engle-Granger)
            score, pvalue, _ = coint(prices_df[sym_a], prices_df[sym_b])

            if pvalue < significance:
                # 计算对冲比例(OLS 回归)
                hedge_ratio = np.polyfit(prices_df[sym_b], prices_df[sym_a], 1)[0]

                pair = Pair(
                    symbol_a=sym_a,
                    symbol_b=sym_b,
                    hedge_ratio=hedge_ratio,
                    zscore_threshold=2.0,
                    lookback=20,
                )
                pairs.append(pair)
                print(f"发现协整配对: {sym_a}/{sym_b}, p-value={pvalue:.4f}, 对冲比={hedge_ratio:.4f}")

    return pairs

4. 策略与 AI 模块集成指南

4.1 集成架构图

┌─────────────────────────────────────────────────────────────────┐
│                        Strategy Layer                            │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────────┐ │
│  │ Traditional │  │  Hybrid     │  │  AI-Native              │ │
│  │ (Rule-based)│  │ (Rule + AI) │  │  (RL / LLM)             │ │
│  └──────┬──────┘  └──────┬──────┘  └──────────┬──────────────┘ │
│         │                │                     │                │
│         └────────────────┼─────────────────────┘                │
│                          ▼                                      │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │                    AXON Core Services                     │  │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌────────────┐  │  │
│  │  │ Backtest │ │  Risk    │ │  Exchange│ │  Tracker   │  │  │
│  │  │ Engine   │ │  Engine  │ │  Adapter │ │  (MLflow)  │  │  │
│  │  └──────────┘ └──────────┘ └──────────┘ └────────────┘  │  │
│  └──────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────┘

4.2 集成最佳实践

4.2.1 传统策略接入 RL 环境

from axon_quant import TradingEnv, EnvConfig

# 传统策略可以包装成 Gymnasium 兼容的接口
def run_strategy_in_env(strategy, env: TradingEnv):
    """
    在 TradingEnv 中运行传统策略,利用环境的标准化接口。

    好处:
    - 统一的回测和评估标准
    - 自动记录 metrics 到 Tracker
    - 与 RL 策略公平对比
    """
    obs = env.reset()
    done = False

    while not done:
        # 传统策略直接读取环境状态做决策
        bar = env.current_bar()
        action = strategy.on_bar(bar)

        # 将订单转换为环境的动作格式
        env_action = convert_order_to_action(action)

        obs, reward, done, info = env.step(env_action)

    return env.get_statistics()

4.2.2 策略组合与 Ensemble

from axon_quant import EnsembleManager, SoftVoteStrategy

# 将多个策略(传统 + AI)组合成一个 Ensemble
def create_multi_strategy_ensemble():
    """
    组合不同类型的策略,利用 Ensemble 降低单一策略失效风险。
    """
    manager = EnsembleManager(strategy=SoftVoteStrategy())

    # 注册传统策略(包装为 Policy 接口)
    manager.register_model(MomentumPolicy("momentum", 5, 20))
    manager.register_model(MeanReversionPolicy("mean_rev", 20, 2.0))

    # 注册 RL 策略
    manager.register_model(RLPolicy("ppo", "models/ppo.pt"))
    manager.register_model(RLPolicy("sac", "models/sac.pt"))

    # 注册 LLM 策略
    manager.register_model(LLMPolicy("llm_trend", llm_backend, "trend"))

    return manager

4.2.3 特征共享

from axon_quant import DefaultObservationSpace, FeatureConfig

# 传统策略和 RL 策略共享同一套特征工程
def build_shared_observation_space():
    """
    统一特征定义,确保传统策略和 RL 模型看到相同的市场表示。
    """
    features = [
        FeatureConfig(name="close", source=FeatureSource.Close, normalizer=NormalizerType.ZScore),
        FeatureConfig(name="volume", source=FeatureSource.Volume, normalizer=NormalizerType.ZScore),
        FeatureConfig(name="rsi_14", source=FeatureSource.RSI(14), normalizer=NormalizerType.MinMax),
        FeatureConfig(name="macd", source=FeatureSource.MACD(12, 26, 9), normalizer=NormalizerType.ZScore),
        FeatureConfig(name="bb_position", source=FeatureSource.BollingerPosition(20, 2.0), normalizer=NormalizerType.MinMax),
    ]

    return DefaultObservationSpace.new(window_size=20, features=features)

4.3 性能对比与选择建议

策略类型 延迟 可解释性 适应性 最佳场景
纯规则 < 1ms 极高 明确的市场模式
规则 + LLM 10ms ~ 2s 需要新闻验证的场景
规则 + RL < 5ms 仓位/风险管理优化
纯 RL < 5ms 极高 复杂多变量环境
Ensemble < 10ms 极高 生产环境首选

5. 总结

AXON 框架对传统策略提供了完整的支持:

  1. 零 RL 依赖:纯 Python 策略可直接使用 BacktestEngineExchangeAdapter
  2. 渐进式增强:传统信号 + LLM 复核,在保持低延迟的同时获得 AI 能力
  3. 统一特征工程DefaultObservationSpace 让传统策略和 RL 模型共享同一套特征
  4. Ensemble 集成DynamicWeightedEnsemble 可同时管理传统策略和 AI 模型
  5. 完整生命周期:从回测、优化到生产部署,传统策略与 AI 策略走同一套流程

建议从简单的规则策略开始,逐步引入 LLM 复核和 RL 优化,最终通过 Ensemble 实现多策略协同。