第 5 章:策略开发实战 (Strategy Implementation)¶
在理解了事件驱动引擎的原理后,我们来动手构建一个真正可用的交易策略。本章将详细拆解策略代码的结构,重点介绍策略生命周期 (Lifecycle)、交易接口 (Trading API) 以及如何实现复杂的风控逻辑。
本章实践入口¶
快速运行与验收¶
验收要点:
- 脚本可完成策略初始化、信号生成、下单与回测统计输出。
- 日志中可观察到订单状态变化与关键风控触发信息。
- 调整均线参数后,回测结果会出现可解释的变化。
5.1 策略类结构与继承¶
一个标准的 AKQuant 策略通常继承自 AKQuant.Strategy 基类,并重写以下几个核心回调方法 (Callbacks):
__init__: 构造函数。定义策略参数和内部变量。on_start: 初始化钩子。回测开始前触发,常用于订阅数据、设置风控参数。on_bar: 事件处理钩子。每根 K 线走完时触发,这是策略逻辑的核心入口。on_stop: 结束钩子。回测结束时触发,常用于清理资源或统计结果。
5.1.1 示例代码:带止损的双均线策略¶
我们以一个增强版的双均线策略为例,增加了固定比例止损逻辑。
"""
第 5 章:构建第一个策略 (Strategy).
本示例详细展示了一个完整策略的结构,重点介绍:
1. **策略生命周期**:`__init__`, `on_start`, `on_bar`, `on_stop`
2. **数据获取**:使用 `get_history` 获取过去 N 天的数据
3. **交易接口**:使用 `buy`, `sell` 和 `order_target_percent`
4. **日志记录**:使用 `self.log` 记录关键信息
策略逻辑 (双均线改进版):
- 计算 5日均线 (MA5) 和 20日均线 (MA20)
- 金叉 (MA5 > MA20) 且无持仓 -> 买入
- 死叉 (MA5 < MA20) 且有持仓 -> 卖出
- 增加风控:如果亏损超过 5%,强制止损
"""
import akquant as aq
import numpy as np
import pandas as pd
from akquant import Bar, Strategy
# 模拟数据生成 (与第3章相同,方便复现)
def generate_mock_data(length: int = 500) -> pd.DataFrame:
"""生成模拟数据."""
np.random.seed(42)
dates = pd.date_range(start="2022-01-01", periods=length, freq="D")
prices = 100 + np.cumsum(np.random.randn(length))
df = pd.DataFrame(
{
"date": dates,
"open": prices,
"high": prices + 1,
"low": prices - 1,
"close": prices,
"volume": 100000,
"symbol": "MOCK",
}
)
return df
class MyFirstStrategy(Strategy):
"""第一个策略示例."""
# --------------------------------------------------------------------------
# 1. 初始化 (Initialization)
# --------------------------------------------------------------------------
def __init__(
self, short_window: int = 5, long_window: int = 20, stop_loss_pct: float = 0.05
) -> None:
"""策略初始化函数. 在这里定义策略的参数和内部变量.
注意:此时回测引擎尚未启动,无法访问 context。
"""
super().__init__()
# 策略参数
self.short_window = short_window
self.long_window = long_window
self.stop_loss_pct = stop_loss_pct
# 内部状态变量
self.entry_price = 0.0 # 记录开仓价格
# 设置预热期 (Warmup Period)
# 引擎会在正式回测前预加载数据,确保 get_history 能获取到足够的数据
self.warmup_period = long_window
# --------------------------------------------------------------------------
# 2. 启动回调 (On Start)
# --------------------------------------------------------------------------
def on_start(self) -> None:
"""回测开始时触发. 此时引擎已就绪,可以进行一些初始化操作."""
self.log("策略启动!")
self.log(
f"参数设置: MA{self.short_window} vs MA{self.long_window}, "
f"止损={self.stop_loss_pct:.1%}"
)
# --------------------------------------------------------------------------
# 3. Bar 数据回调 (On Bar) - 核心逻辑
# --------------------------------------------------------------------------
def on_bar(self, bar: Bar) -> None:
"""每根 K 线走完时触发."""
symbol = bar.symbol
# 3.1 获取历史数据
# count=21 表示获取过去 21 根 Bar (包含当前这根)
closes = self.get_history(
count=self.long_window + 1, symbol=symbol, field="close"
)
# 再次检查数据长度 (防御性编程)
if len(closes) < self.long_window + 1:
return
# 3.2 计算技术指标
# 使用切片 [:-1] 排除当前 Bar,只用截止到昨天的数据计算信号 (避免未来函数)
# 这里的逻辑假设我们在今天收盘后计算信号,明天开盘交易
history_closes = closes[:-1]
ma_short = history_closes[-self.short_window :].mean()
ma_long = history_closes[-self.long_window :].mean()
# 3.3 获取账户信息
current_pos = self.get_position(symbol)
# 3.4 交易逻辑
# 情况 A: 持仓中 -> 检查止损或死叉
if current_pos > 0:
# 计算浮动盈亏比例
pnl_pct = (bar.close - self.entry_price) / self.entry_price
# 止损检查
if pnl_pct < -self.stop_loss_pct:
self.log(f"触发止损! 当前亏损: {pnl_pct:.2%}")
self.close_position(symbol) # 清仓
return
# 死叉卖出
if ma_short < ma_long:
self.log(
f"死叉卖出 (MA{self.short_window}={ma_short:.2f} < "
f"MA{self.long_window}={ma_long:.2f})"
)
self.close_position(symbol) # 清仓
# 情况 B: 空仓中 -> 检查金叉
elif current_pos == 0:
if ma_short > ma_long:
self.log(
f"金叉买入 (MA{self.short_window}={ma_short:.2f} > "
f"MA{self.long_window}={ma_long:.2f})"
)
# 使用 order_target_percent 买入 95% 的资金
self.order_target_percent(0.95, symbol)
# 记录开仓价格 (近似值,实际成交价要等订单成交后才知道,这里暂用
# 收盘价代替)
self.entry_price = bar.close
# --------------------------------------------------------------------------
# 4. 结束回调 (On Stop)
# --------------------------------------------------------------------------
def on_stop(self) -> None:
"""回测结束时触发. 常用于统计结果或资源释放."""
self.log("策略停止。")
if __name__ == "__main__":
df = generate_mock_data()
print("开始运行第 5 章示例策略...")
result = aq.run_backtest(
strategy=MyFirstStrategy,
data=df,
initial_cash=100_000,
commission_rate=0.0003, # 万三手续费
)
# 打印最终资金
metrics = result.metrics_df
end_value = (
metrics.loc["end_market_value", "value"]
if "end_market_value" in metrics.index
else 0.0
)
print(f"回测结束,最终权益: {float(str(end_value)):.2f}")
5.2 深入理解生命周期 (Lifecycle Management)¶
5.2.1 __init__ vs on_start¶
__init__: 此时策略实例刚被创建,回测引擎尚未完全启动,你无法访问self.ctx(Context) 或账户信息。只能做一些纯 Python 层面的变量初始化(如self.ma_window = 20)。on_start: 此时引擎已就绪 (Ready State)。你可以安全地调用self.log(),self.get_position()等依赖引擎上下文的 API。
5.2.2 on_bar 的执行流 (Execution Flow)¶
on_bar 是策略的心脏。它的标准执行流程如下:
- 数据获取 (Data Ingestion):使用
self.get_history()获取所需的历史数据窗口。 - 信号计算 (Signal Generation):基于历史数据计算技术指标 (如 MA, RSI)。
- 状态检查 (State Inspection):获取当前持仓 (
self.get_position) 和账户资金。 - 决策逻辑 (Decision Making):根据指标和持仓状态,判断是否买入或卖出。
- 订单执行 (Order Routing):调用下单函数 (
self.buy,self.sell等)。
5.2.3 类风格与函数式入口边界¶
AKQuant 同时支持两种策略入口:
- 类风格:
strategy=MyStrategy,适合中长期维护、复杂状态管理。 - 函数式:
strategy=on_bar+initialize=...,适合快速原型、脚本化调试。
函数式入口示例:
import akquant as aq
def initialize(ctx):
ctx.counter = 0
def on_bar(ctx, bar):
ctx.counter += 1
if ctx.get_position(bar.symbol) == 0:
ctx.buy(bar.symbol, 1)
else:
ctx.sell(bar.symbol, 1)
result = aq.run_backtest(
data=data_feed,
strategy=on_bar,
initialize=initialize,
symbols="TEST",
)
当你需要 on_start/on_stop/on_order/on_trade 等完整生命周期并封装为可复用组件时,优先使用类风格;当你需要快速验证交易逻辑和参数时,函数式入口更轻量。
5.3 交易接口详解 (Trading API)¶
AKQuant 提供了多种便捷的下单接口,底层会自动处理报单验证和资金冻结。
5.3.1 order_target_percent(symbol, target)¶
这是最常用的接口,实现了目标仓位管理。它会自动计算需要买入或卖出的数量,使持仓达到目标比例。
target=0.5: 买入直到持仓占总资产的 50%。target=0.0: 清仓卖出所有持仓。target=-0.5: (期货/融券) 卖空直到空头仓位占 50%。
5.3.2 buy(symbol, quantity) / sell(symbol, quantity)¶
最基础的原子接口,直接指定买卖数量。
quantity: 必须为正数。- 对于股票,数量通常需要是 100 的倍数 (手数),引擎会自动向下取整。
5.3.3 close_position(symbol)¶
一键平仓。无论当前持有多头还是空头,都会发出相反方向的市价单将其平掉。
5.3.4 信用账户参数与账户快照¶
当你在股票场景做融资/融券回测时,需要在 RiskConfig 中显式启用信用账户模式:
from akquant.config import RiskConfig
risk_config = RiskConfig(
account_mode="margin",
enable_short_sell=True,
initial_margin_ratio=0.5,
maintenance_margin_ratio=0.3,
financing_rate_annual=0.08,
borrow_rate_annual=0.10,
allow_force_liquidation=True,
liquidation_priority="short_first",
)
策略内可通过 get_account() 读取信用账户专有字段:
borrowed_cash: 融资负债short_market_value: 空头市值maintenance_ratio: 维持担保比例accrued_interest/daily_interest: 累计与当日计息
snap = self.get_account()
print(
snap["account_mode"],
snap["borrowed_cash"],
snap["short_market_value"],
snap["maintenance_ratio"],
snap["accrued_interest"],
snap["daily_interest"],
)
5.4 高级策略模式 (Advanced Patterns)¶
在实际开发中,简单的双均线往往不够用。我们需要更复杂的策略模式。
5.4.1 多因子选股 (Multi-Factor Selection)¶
在多标的回测中(例如全市场选股),我们需要遍历所有标的,计算因子得分,然后构建组合。
设计模式:
- 每日定时任务:使用
schedule_function或在on_bar中检查是否是每日收盘。 - 横截面计算:获取所有标的当日收盘价。
- 排序与筛选:根据因子值排序,选出 Top N。
- 调仓:卖出不在 Top N 的标的,买入新进入 Top N 的标的。
def on_bar(self, bar):
# 仅在每日收盘前执行 (假设日线数据)
# 遍历所有关注的标的
scores = {}
for symbol in self.universe:
# 计算因子...
score = ...
scores[symbol] = score
# 排序选股
target_symbols = sorted(scores, key=scores.get, reverse=True)[:10]
# 调仓逻辑...
5.4.2 状态机策略 (Finite State Machine)¶
对于复杂的择时策略,使用状态机可以清晰地管理逻辑。
- State 0 (空仓): 等待入场信号。
- State 1 (持有): 监控止损/止盈。
- State 2 (加仓): 盈利加仓。
- State 3 (冷却): 止损后暂停交易一段时间。
class FSMStrategy(Strategy):
def __init__(self):
self.state = "EMPTY"
self.cooldown_counter = 0
def on_bar(self, bar):
if self.state == "EMPTY":
if self.signal_buy():
self.buy(bar.symbol, 100)
self.state = "HOLDING"
elif self.state == "HOLDING":
if self.check_stop_loss():
self.close_position(bar.symbol)
self.state = "COOLDOWN"
self.cooldown_counter = 5
elif self.state == "COOLDOWN":
self.cooldown_counter -= 1
if self.cooldown_counter <= 0:
self.state = "EMPTY"
5.5 自定义指标开发 (Custom Indicators)¶
AKQuant 目前已经提供了 AKQuant.talib 兼容层,并支持 python/rust 双后端;但在实战中,我们仍会频繁遇到需要开发私有指标或策略专用信号的场景。
5.5.1 继承 Indicator 基类¶
所有的指标都应继承自 AKQuant.Indicator,并实现 update 方法。这种设计支持增量计算 (Incremental Calculation),避免了每次重算整个历史序列的浪费。
class MyMomentum(Indicator):
def __init__(self, period=10):
super().__init__()
self.period = period
self.history = []
def update(self, value):
self.history.append(value)
if len(self.history) > self.period:
self.history.pop(0)
if len(self.history) < self.period:
return float('nan')
return self.history[-1] - self.history[0]
5.5.2 在策略中使用¶
def __init__(self):
self.my_mom = MyMomentum(period=10)
def on_bar(self, bar):
mom_value = self.my_mom.update(bar.close)
if not math.isnan(mom_value) and mom_value > 0:
# Do something...
5.5.3 使用 AKQuant.talib 双后端¶
当策略从 TA-Lib 迁移时,建议先保持函数签名不变,再通过 backend 参数切换执行后端。
backend="auto"默认走rust。- 需要与历史策略逐步对齐时,建议显式使用
backend="python"。 - 如需全局覆盖
auto,可设置环境变量AKQUANT_TALIB_AUTO_BACKEND=python|rust。
from akquant import talib as ta
close = df["close"].to_numpy()
high = df["high"].to_numpy()
low = df["low"].to_numpy()
# python 后端(兼容基线)
rsi_py = ta.RSI(close, timeperiod=14, backend="python")
# rust 后端(高性能)
rsi_rs = ta.RSI(close, timeperiod=14, backend="rust")
adx_rs = ta.ADX(high, low, close, timeperiod=14, backend="rust")
slowk_rs, slowd_rs = ta.STOCH(
high,
low,
close,
fastk_period=5,
slowk_period=3,
slowd_period=3,
backend="rust",
)
当前 rust backend 已覆盖:
- 单输出:
SMA/EMA/RSI/ATR/ROC/WILLR/CCI/ADX/MFI/OBV/TRIX/MOM/DEMA/TEMA/KAMA/NATR/SAR - 多输出:
MACD/BBANDS/STOCH
批次 B/C 的常见调用示例:
volume = df["volume"].to_numpy()
mfi_rs = ta.MFI(high, low, close, volume, timeperiod=14, backend="rust")
obv_rs = ta.OBV(close, volume, backend="rust")
trix_rs = ta.TRIX(close, timeperiod=15, backend="rust")
mom_rs = ta.MOM(close, period=10, backend="rust")
dema_rs = ta.DEMA(close, timeperiod=20, backend="rust")
tema_rs = ta.TEMA(close, timeperiod=20, backend="rust")
kama_rs = ta.KAMA(close, period=10, backend="rust")
natr_rs = ta.NATR(high, low, close, timeperiod=14, backend="rust")
sar_rs = ta.SAR(high, low, acceleration=0.02, maximum=0.2, backend="rust")
在策略里使用时,建议显式处理 warmup 区段:
import numpy as np
signal = ta.TEMA(close, timeperiod=20, backend="rust")
last_signal = signal[-1]
if np.isnan(last_signal):
return
在工程实践中,推荐流程是:
- 先用
backend="python"与原策略对齐结果; - 对齐完成后切
backend="auto"(默认rust)或显式backend="rust"做性能提速; - 用固定数据集回归验证 warmup 与输出形态(单值或 tuple)一致。
- 对支持
period别名的指标优先沿用旧参数命名,降低迁移成本。
5.5.4 指标选型与组合模板¶
实战里不建议“单指标决策”,更推荐“趋势 + 动量 + 波动率/风险”组合。
| 场景 | 推荐组合 | 起步参数(可回测微调) | 说明 |
|---|---|---|---|
| 趋势跟随 | EMA + ADX + NATR |
EMA(20/60), ADX(14), NATR(14) |
用 ADX 过滤震荡,用 NATR 控制仓位 |
| 均值回归 | BBANDS + RSI |
BBANDS(20,2,2), RSI(14) |
价格触带 + RSI 极值联合触发 |
| 量价确认 | OBV + MFI + ROC |
MFI(14), ROC(10) |
方向信号由价给出,量能决定是否放行 |
| 跟踪止损 | SAR + ATR |
SAR(0.02,0.2), ATR(14) |
用 SAR 跟踪趋势,用 ATR 定义止损宽度 |
组合模板示例(趋势跟随):
ema_fast = ta.EMA(close, timeperiod=20, backend="rust")
ema_slow = ta.EMA(close, timeperiod=60, backend="rust")
adx = ta.ADX(high, low, close, timeperiod=14, backend="rust")
natr = ta.NATR(high, low, close, timeperiod=14, backend="rust")
if np.isnan(ema_fast[-1]) or np.isnan(adx[-1]) or np.isnan(natr[-1]):
return
trend_up = ema_fast[-1] > ema_slow[-1]
trend_strong = adx[-1] >= 20
risk_ok = natr[-1] < 4.0
if trend_up and trend_strong and risk_ok:
self.buy(symbol, 100)
组合模板示例(均值回归):
upper, middle, lower = ta.BBANDS(close, timeperiod=20, backend="rust")
rsi = ta.RSI(close, timeperiod=14, backend="rust")
if np.isnan(lower[-1]) or np.isnan(rsi[-1]):
return
long_signal = close[-1] < lower[-1] and rsi[-1] < 30
exit_signal = close[-1] > middle[-1]
延伸阅读:
- 指标组合实战手册
- 可运行示例:45_talib_indicator_playbook_demo.py
- 可选真实数据模式:python examples/45_talib_indicator_playbook_demo.py --data-source akshare --symbol sh600000 --start-date 20240101 --end-date 20260301
5.6 高级风控管理 (Risk Management)¶
风控是量化交易的生命线。除了基本的止损,我们还需要更高级的仓位管理技术。
提示:本节讨论的是策略层的风控(如根据波动率动态调整仓位)。如果你需要引擎层的硬性风控(如限制单股持仓不超过 10%、总杠杆不超过 1.5倍),请参考 4.8 风控引擎。
5.6.1 凯利公式 (Kelly Criterion)¶
凯利公式用于计算在胜率和赔率已知的情况下,最优的下注比例。
其中:
- \(f^*\):最优仓位比例。
- \(b\):赔率(盈亏比)。
- \(p\):胜率。
- \(q\):败率 (\(1-p\))。
实战应用: 通常使用半凯利 (Half-Kelly),即只使用凯利公式计算出仓位的一半,以应对参数估计的不确定性。
5.6.2 波动率目标 (Volatility Targeting)¶
这是对冲基金最常用的风控手段。目标是保持组合的年化波动率恒定(例如 15%)。
- 当市场波动率低时,加杠杆,提高资金利用率。
- 当市场波动率高时,降仓位,控制风险暴露。
# 示例:波动率目标仓位管理
current_vol = np.std(returns[-20:]) * np.sqrt(252) # 年化波动率
target_vol = 0.15 # 目标 15% 波动率
leverage = target_vol / current_vol
# 限制最大杠杆
leverage = min(leverage, 1.5)
self.order_target_percent(symbol, leverage)
5.6.3 止损逻辑 (Stop-Loss)¶
本章示例展示了一个简单的固定比例止损:
# 计算浮动盈亏比例
pnl_pct = (bar.close - self.entry_price) / self.entry_price
# 止损检查
if pnl_pct < -self.stop_loss_pct:
self.log(f"触发止损! 当前亏损: {pnl_pct:.2%}")
self.close_position(symbol) # 清仓
5.7 事件回调处理 (Event Handling)¶
除了 on_bar,AKQuant 还提供了丰富的事件回调,让你能精确控制交易流程。
5.7.1 on_order¶
当订单状态发生变化时(如从 Submitted 到 Filled,或被风控/交易所 Rejected)触发。
def on_order(self, order):
if str(order.status) == "Filled":
self.log(f"订单成交: {order.symbol} {order.filled_quantity} @ {order.average_filled_price}")
elif str(order.status) == "Rejected":
self.log(f"订单被拒: {order.reject_reason}", level="ERROR")
# 可以在这里实现重试逻辑
5.7.2 on_trade¶
当发生实际成交时触发。与 on_order 的区别在于,一笔大单可能会分多次成交,每次成交都会触发 on_trade,而 on_order 更关注订单状态流转。
5.7.3 on_event:策略外的统一事件流¶
如果你需要把回测事件推送到日志系统、监控看板或告警服务,可在 run_backtest 入口直接传入 on_event,统一消费事件流。
events = []
result = aq.run_backtest(
data=data_feed,
strategy=MyStrategy,
symbols="AAPL",
on_event=events.append,
)
该方式不要求改动策略类内部代码,适合将“交易逻辑”和“可观测性管道”分层维护。
5.8 调试与日志 (Debugging & Logging)¶
策略开发中最痛苦的莫过于逻辑不符合预期。AKQuant 提供了完善的日志系统。
self.log(msg): 会自动打上当前回测时间的标签[2023-01-05 15:00:00] msg。- 断点调试: 由于
AKQuant是纯 Python/Rust 混合,你完全可以在 PyCharm/VSCode 中打断点调试on_bar逻辑。
本章小结¶
- 生命周期回调是策略执行的主骨架,
on_bar是核心决策入口。 - 下单接口与风控规则必须协同设计,才能避免策略“能跑但不可交易”。
- 事件回调与日志体系是策略调试和可观测性的基础设施。
课后练习¶
- 给双均线策略新增一个成交量过滤条件并对比结果。
- 在
on_order中增加拒单分类统计,输出拒单原因分布。 - 增加一个策略级最大回撤止损并验证触发逻辑。
常见错误与排查¶
- 无交易发生:检查信号条件是否过严或数据窗口不足。
- 订单被拒绝:核对现金、持仓、最小交易单位和交易方向约束。
- 回测结果异常抖动:确认是否存在未来函数或未对齐的数据字段。