跳转至

第 5 章:策略开发实战 (Strategy Implementation)

在理解了事件驱动引擎的原理后,我们来动手构建一个真正可用的交易策略。本章将详细拆解策略代码的结构,重点介绍策略生命周期 (Lifecycle)交易接口 (Trading API) 以及如何实现复杂的风控逻辑

5.1 策略类结构与继承

一个标准的 akquant 策略通常继承自 akquant.Strategy 基类,并重写以下几个核心回调方法 (Callbacks):

  1. __init__: 构造函数。定义策略参数和内部变量。
  2. on_start: 初始化钩子。回测开始前触发,常用于订阅数据、设置风控参数。
  3. on_bar: 事件处理钩子。每根 K 线走完时触发,这是策略逻辑的核心入口。
  4. on_stop: 结束钩子。回测结束时触发,常用于清理资源或统计结果。

5.1.1 示例代码:带止损的双均线策略

我们以一个增强版的双均线策略为例,增加了固定比例止损逻辑。

"""
第 4 章:构建第一个策略 (Strategy).

本示例详细展示了一个完整策略的结构,重点介绍:
1. **策略生命周期**:`__init__`, `on_start`, `on_bar`, `on_stop`
2. **数据获取**:使用 `get_history` 获取过去 N 天的数据
3. **交易接口**:使用 `buy`, `sell` 和 `order_target_percent`
4. **日志记录**:使用 `self.log` 记录关键信息

策略逻辑 (双均线改进版):
- 计算 5日均线 (MA5) 和 20日均线 (MA20)
- 金叉 (MA5 > MA20) 且无持仓 -> 买入
- 死叉 (MA5 < MA20) 且有持仓 -> 卖出
- 增加风控:如果亏损超过 5%,强制止损
"""

import akquant as aq
import numpy as np
import pandas as pd
from akquant import Bar, Strategy


# 模拟数据生成 (与第3章相同,方便复现)
def generate_mock_data(length: int = 500) -> pd.DataFrame:
    """生成模拟数据."""
    np.random.seed(42)
    dates = pd.date_range(start="2022-01-01", periods=length, freq="D")
    prices = 100 + np.cumsum(np.random.randn(length))
    df = pd.DataFrame(
        {
            "date": dates,
            "open": prices,
            "high": prices + 1,
            "low": prices - 1,
            "close": prices,
            "volume": 100000,
            "symbol": "MOCK",
        }
    )
    return df


class MyFirstStrategy(Strategy):
    """第一个策略示例."""

    # --------------------------------------------------------------------------
    # 1. 初始化 (Initialization)
    # --------------------------------------------------------------------------
    def __init__(
        self, short_window: int = 5, long_window: int = 20, stop_loss_pct: float = 0.05
    ) -> None:
        """策略初始化函数. 在这里定义策略的参数和内部变量.

        注意:此时回测引擎尚未启动,无法访问 context。
        """
        super().__init__()

        # 策略参数
        self.short_window = short_window
        self.long_window = long_window
        self.stop_loss_pct = stop_loss_pct

        # 内部状态变量
        self.entry_price = 0.0  # 记录开仓价格

        # 设置预热期 (Warmup Period)
        # 引擎会在正式回测前预加载数据,确保 get_history 能获取到足够的数据
        self.warmup_period = long_window

    # --------------------------------------------------------------------------
    # 2. 启动回调 (On Start)
    # --------------------------------------------------------------------------
    def on_start(self) -> None:
        """回测开始时触发. 此时引擎已就绪,可以进行一些初始化操作."""
        self.log("策略启动!")
        self.log(
            f"参数设置: MA{self.short_window} vs MA{self.long_window}, "
            f"止损={self.stop_loss_pct:.1%}"
        )

    # --------------------------------------------------------------------------
    # 3. Bar 数据回调 (On Bar) - 核心逻辑
    # --------------------------------------------------------------------------
    def on_bar(self, bar: Bar) -> None:
        """每根 K 线走完时触发."""
        symbol = bar.symbol

        # 3.1 获取历史数据
        # count=21 表示获取过去 21 根 Bar (包含当前这根)
        closes = self.get_history(
            count=self.long_window + 1, symbol=symbol, field="close"
        )

        # 再次检查数据长度 (防御性编程)
        if len(closes) < self.long_window + 1:
            return

        # 3.2 计算技术指标
        # 使用切片 [:-1] 排除当前 Bar,只用截止到昨天的数据计算信号 (避免未来函数)
        # 这里的逻辑假设我们在今天收盘后计算信号,明天开盘交易
        history_closes = closes[:-1]
        ma_short = history_closes[-self.short_window :].mean()
        ma_long = history_closes[-self.long_window :].mean()

        # 3.3 获取账户信息
        current_pos = self.get_position(symbol)

        # 3.4 交易逻辑

        # 情况 A: 持仓中 -> 检查止损或死叉
        if current_pos > 0:
            # 计算浮动盈亏比例
            pnl_pct = (bar.close - self.entry_price) / self.entry_price

            # 止损检查
            if pnl_pct < -self.stop_loss_pct:
                self.log(f"触发止损! 当前亏损: {pnl_pct:.2%}")
                self.close_position(symbol)  # 清仓
                return

            # 死叉卖出
            if ma_short < ma_long:
                self.log(
                    f"死叉卖出 (MA{self.short_window}={ma_short:.2f} < "
                    f"MA{self.long_window}={ma_long:.2f})"
                )
                self.close_position(symbol)  # 清仓

        # 情况 B: 空仓中 -> 检查金叉
        elif current_pos == 0:
            if ma_short > ma_long:
                self.log(
                    f"金叉买入 (MA{self.short_window}={ma_short:.2f} > "
                    f"MA{self.long_window}={ma_long:.2f})"
                )

                # 使用 order_target_percent 买入 95% 的资金
                self.order_target_percent(0.95, symbol)

                # 记录开仓价格 (近似值,实际成交价要等订单成交后才知道,这里暂用
                # 收盘价代替)
                self.entry_price = bar.close

    # --------------------------------------------------------------------------
    # 4. 结束回调 (On Stop)
    # --------------------------------------------------------------------------
    def on_stop(self) -> None:
        """回测结束时触发. 常用于统计结果或资源释放."""
        self.log("策略停止。")


if __name__ == "__main__":
    df = generate_mock_data()

    print("开始运行第 4 章示例策略...")
    result = aq.run_backtest(
        strategy=MyFirstStrategy,
        data=df,
        initial_cash=100_000,
        commission_rate=0.0003,  # 万三手续费
    )

    # 打印最终资金
    metrics = result.metrics_df
    end_value = (
        metrics.loc["end_market_value", "value"]
        if "end_market_value" in metrics.index
        else 0.0
    )
    print(f"回测结束,最终权益: {float(str(end_value)):.2f}")

5.2 深入理解生命周期 (Lifecycle Management)

5.2.1 __init__ vs on_start

  • __init__: 此时策略实例刚被创建,回测引擎尚未完全启动,你无法访问 self.ctx (Context) 或账户信息。只能做一些纯 Python 层面的变量初始化(如 self.ma_window = 20)。
  • on_start: 此时引擎已就绪 (Ready State)。你可以安全地调用 self.log(), self.get_position() 等依赖引擎上下文的 API。

5.2.2 on_bar 的执行流 (Execution Flow)

on_bar 是策略的心脏。它的标准执行流程如下:

  1. 数据获取 (Data Ingestion):使用 self.get_history() 获取所需的历史数据窗口。
  2. 信号计算 (Signal Generation):基于历史数据计算技术指标 (如 MA, RSI)。
  3. 状态检查 (State Inspection):获取当前持仓 (self.get_position) 和账户资金。
  4. 决策逻辑 (Decision Making):根据指标和持仓状态,判断是否买入或卖出。
  5. 订单执行 (Order Routing):调用下单函数 (self.buy, self.sell 等)。

5.3 交易接口详解 (Trading API)

akquant 提供了多种便捷的下单接口,底层会自动处理报单验证和资金冻结。

5.3.1 order_target_percent(symbol, target)

这是最常用的接口,实现了目标仓位管理。它会自动计算需要买入或卖出的数量,使持仓达到目标比例。

  • target=0.5: 买入直到持仓占总资产的 50%。
  • target=0.0: 清仓卖出所有持仓。
  • target=-0.5: (期货/融券) 卖空直到空头仓位占 50%。

5.3.2 buy(symbol, quantity) / sell(symbol, quantity)

最基础的原子接口,直接指定买卖数量。

  • quantity: 必须为正数。
  • 对于股票,数量通常需要是 100 的倍数 (手数),引擎会自动向下取整。

5.3.3 close_position(symbol)

一键平仓。无论当前持有多头还是空头,都会发出相反方向的市价单将其平掉。

5.4 高级策略模式 (Advanced Patterns)

在实际开发中,简单的双均线往往不够用。我们需要更复杂的策略模式。

5.4.1 多因子选股 (Multi-Factor Selection)

在多标的回测中(例如全市场选股),我们需要遍历所有标的,计算因子得分,然后构建组合。

设计模式

  1. 每日定时任务:使用 schedule_function 或在 on_bar 中检查是否是每日收盘。
  2. 横截面计算:获取所有标的当日收盘价。
  3. 排序与筛选:根据因子值排序,选出 Top N。
  4. 调仓:卖出不在 Top N 的标的,买入新进入 Top N 的标的。
def on_bar(self, bar):
    # 仅在每日收盘前执行 (假设日线数据)
    # 遍历所有关注的标的
    scores = {}
    for symbol in self.universe:
        # 计算因子...
        score = ...
        scores[symbol] = score

    # 排序选股
    target_symbols = sorted(scores, key=scores.get, reverse=True)[:10]

    # 调仓逻辑...

5.4.2 状态机策略 (Finite State Machine)

对于复杂的择时策略,使用状态机可以清晰地管理逻辑。

  • State 0 (空仓): 等待入场信号。
  • State 1 (持有): 监控止损/止盈。
  • State 2 (加仓): 盈利加仓。
  • State 3 (冷却): 止损后暂停交易一段时间。
class FSMStrategy(Strategy):
    def __init__(self):
        self.state = "EMPTY"
        self.cooldown_counter = 0

    def on_bar(self, bar):
        if self.state == "EMPTY":
            if self.signal_buy():
                self.buy(bar.symbol, 100)
                self.state = "HOLDING"

        elif self.state == "HOLDING":
            if self.check_stop_loss():
                self.close_position(bar.symbol)
                self.state = "COOLDOWN"
                self.cooldown_counter = 5

        elif self.state == "COOLDOWN":
            self.cooldown_counter -= 1
            if self.cooldown_counter <= 0:
                self.state = "EMPTY"

5.5 自定义指标开发 (Custom Indicators)

虽然 akquant 内置了常用的 TA-Lib 指标,但在实战中,我们经常需要开发私有指标。

5.5.1 继承 Indicator 基类

所有的指标都应继承自 akquant.Indicator,并实现 update 方法。这种设计支持增量计算 (Incremental Calculation),避免了每次重算整个历史序列的浪费。

class MyMomentum(Indicator):
    def __init__(self, period=10):
        super().__init__()
        self.period = period
        self.history = []

    def update(self, value):
        self.history.append(value)
        if len(self.history) > self.period:
            self.history.pop(0)

        if len(self.history) < self.period:
            return float('nan')

        return self.history[-1] - self.history[0]

5.5.2 在策略中使用

def __init__(self):
    self.my_mom = MyMomentum(period=10)

def on_bar(self, bar):
    mom_value = self.my_mom.update(bar.close)
    if not math.isnan(mom_value) and mom_value > 0:
        # Do something...

5.6 高级风控管理 (Risk Management)

风控是量化交易的生命线。除了基本的止损,我们还需要更高级的仓位管理技术。

5.6.1 凯利公式 (Kelly Criterion)

凯利公式用于计算在胜率和赔率已知的情况下,最优的下注比例。

\[ f^* = \frac{bp - q}{b} = \frac{p(b+1) - 1}{b} \]

其中:

  • \(f^*\):最优仓位比例。
  • \(b\):赔率(盈亏比)。
  • \(p\):胜率。
  • \(q\):败率 (\(1-p\))。

实战应用: 通常使用半凯利 (Half-Kelly),即只使用凯利公式计算出仓位的一半,以应对参数估计的不确定性。

5.6.2 波动率目标 (Volatility Targeting)

这是对冲基金最常用的风控手段。目标是保持组合的年化波动率恒定(例如 15%)。

\[ Weight_t = \frac{\text{Target Vol}}{\text{Realized Vol}_t} \]
  • 当市场波动率低时,加杠杆,提高资金利用率。
  • 当市场波动率高时,降仓位,控制风险暴露。
# 示例:波动率目标仓位管理
current_vol = np.std(returns[-20:]) * np.sqrt(252) # 年化波动率
target_vol = 0.15 # 目标 15% 波动率

leverage = target_vol / current_vol
# 限制最大杠杆
leverage = min(leverage, 1.5)

self.order_target_percent(symbol, leverage)

5.7 事件回调处理 (Event Handling)

除了 on_barakquant 还提供了丰富的事件回调,让你能精确控制交易流程。

5.7.1 on_order_status

当订单状态发生变化时(如从 Pending 变为 Filled,或被交易所 Rejected)触发。

def on_order_status(self, order: Order):
    if order.status == OrderStatus.FILLED:
        self.log(f"订单成交: {order.symbol} {order.filled_qty} @ {order.avg_price}")
    elif order.status == OrderStatus.REJECTED:
        self.log(f"订单被拒: {order.reason}", level="ERROR")
        # 可以在这里实现重试逻辑

5.7.2 on_trade

当发生实际成交时触发。与 on_order_status 的区别在于,一笔大单可能会分多次成交,每次成交都会触发 on_trade,而 on_order_status 通常只在状态跃迁时触发。


本章小结

5.6.3 止损逻辑 (Stop-Loss)

本章示例展示了一个简单的固定比例止损

# 计算浮动盈亏比例
pnl_pct = (bar.close - self.entry_price) / self.entry_price

# 止损检查
if pnl_pct < -self.stop_loss_pct:
    self.log(f"触发止损! 当前亏损: {pnl_pct:.2%}")
    self.close_position(symbol) # 清仓

5.8 调试与日志 (Debugging & Logging)

策略开发中最痛苦的莫过于逻辑不符合预期。akquant 提供了完善的日志系统。

  • self.log(msg): 会自动打上当前回测时间的标签 [2023-01-05 15:00:00] msg
  • 断点调试: 由于 akquant 是纯 Python/Rust 混合,你完全可以在 PyCharm/VSCode 中打断点调试 on_bar 逻辑。

小结:通过本章的学习,你已经掌握了编写一个完整策略所需的所有基础知识,包括生命周期管理、交易接口、高级模式和风控逻辑。下一章,我们将进入实战环节,探讨中国 A 股特有的交易规则(T+1、涨跌停)在回测中如何处理。