跳转至

第 3 章:金融数据获取与处理

本章实践入口

快速运行与验收

python examples/textbook/ch03_data.py

验收要点:

  1. 脚本可成功拉取并处理一段历史行情数据。
  2. 输出中能看到数据行数、时间范围或字段结构等基本信息。
  3. 数据可被后续回测脚本直接复用。

3.1 AkShare:量化投资的开源数据基石

在量化投资中,数据质量决定了策略的上限 (Garbage In, Garbage Out)。对于中国市场,AkShare 是目前最流行的开源金融数据接口库。它提供了从股票、期货、期权、基金到宏观经济的全维度数据。

3.1.1 安装与验证

在第 1 章中我们已经安装了 akshare。可以通过以下命令验证版本:

import akshare as ak
print(ak.__version__)

3.2 金融时间序列数据 (OHLCV)

量化回测中最基础的数据单元是 K线 (Candlestick),通常包含以下字段,简称为 OHLCV

  • Open: 开盘价
  • High: 最高价
  • Low: 最低价
  • Close: 收盘价
  • Volume: 成交量

3.2.1 复权 (Adjustment)

股票价格会受到分红配股拆细等除权除息行为的影响,导致价格出现断层。为了保证回测的连续性,必须对价格进行复权处理。

  • 前复权 (Forward Adjustment):以当前价格为基准,向前推算历史价格。回测推荐使用前复权,因为它保留了当前的真实价格水平,方便计算买入股数。
  • 后复权 (Backward Adjustment):以历史上市首日价格为基准,向后推算当前价格。适合计算长周期的收益率。
  • 不复权 (No Adjustment):原始价格。除权日会出现巨大的价格跳空,严禁直接用于策略回测

复权因子计算 (Adjustment Factor)

前复权价格的计算公式如下:

\[ P_{adj} = P_{raw} \times \frac{P_{today}}{P_{ex-right}} \]

其中 \(P_{adj}\) 是复权后价格,\(P_{raw}\) 是原始价格。对于分红(每股分红 \(D\)),除权价 \(P_{ex-right} = P_{close} - D\)。这意味着历史价格会相应调低,使得收益率曲线平滑连接。

3.2.2 数据频率 (Data Frequency)

量化回测通常使用不同频率的数据:

  • Tick 数据:逐笔成交数据(包含每一笔成交的时间、价格、量)。数据量极大,适合高频策略 (HFT)。
  • Bar 数据 (OHLCV):将一段时间内的 Tick 聚合为一个数据点(如 1分钟 Bar、日线 Bar)。这是最常用的格式。
  • Daily 数据:日线数据。包含开高低收及成交量。适合中低频策略(如趋势跟踪、多因子选股)。

akquant 核心引擎基于 Bar 数据驱动,支持任意周期的 Bar(1分钟、5分钟、日线等)。

akshare 中获取前复权数据非常简单:

import akshare as ak

# 获取浦发银行 (600000) 的日线数据,前复权
df = ak.stock_zh_a_hist(symbol="600000", period="daily", start_date="20200101", end_date="20231231", adjust="qfq")
print(df.head())

3.3 数据治理与 ETL 流程

在金融工程中,数据被视为核心资产。构建高质量的数据库需要严格遵循 ETL (Extract, Transform, Load) 流程。

3.3.1 数据清洗 (Data Cleaning)

原始数据通常包含噪音、缺失值甚至错误。常见的数据治理问题包括:

  1. 缺失值 (Missing Data)
    • 原因:停牌、数据源故障、非交易日。
    • 处理:前向填充 (Forward Fill)、插值法或直接剔除。
  2. 异常值 (Outliers)
    • 原因:乌龙指、数据录入错误。
    • 处理:使用 MAD (绝对中位差) 或 3\(\sigma\) 原则识别并修正。
  3. 幸存者偏差 (Survivorship Bias)
    • 定义:如果在回测中只包含当前存在的股票,而忽略了历史上已退市的股票,会导致回测结果虚高(因为退市股票通常表现很差)。
    • 对策:必须维护包含所有历史退市股票的“全集数据库”。
  4. 前视偏差 (Look-ahead Bias)
    • 定义:在 \(T\) 时刻做决策时,使用了 \(T+1\) 时刻才能获得的数据(如使用当天的收盘价来决定当天的开盘买入)。
    • 对策:严格的时间戳对齐,使用 Point-in-Time (PIT) 数据库。

3.3.2 数据存储 (Storage)

对于高频或海量数据,CSV 并非最佳选择。推荐使用更高效的二进制格式:

  • Parquet / Feather:列式存储,读取速度快,压缩率高,Pandas 完美支持。
  • HDF5:适合大规模数值矩阵存储。
  • KDB+ / DolphinDB:专业的时序数据库 (Time Series Database),适合机构级应用。

3.3.3 标准化字段定义

为了适配 akquant 引擎,所有数据必须被映射到以下标准字段:

字段名 类型 说明
date pd.Timestamp 交易日期/时间
symbol str 标的代码 (如 sh600000)
open float 开盘价
high float 最高价
low float 最低价
close float 收盘价
volume float 成交量

3.3.4 ETL 脚本示例

下面的代码演示了完整的 ETL 流程:从 AkShare 提取数据,清洗为标准格式,并保存为 Parquet 文件。

创建文件 examples/textbook/ch03_data.py

"""
第 3 章:金融数据获取与处理.

本示例演示了量化交易中数据工程的核心步骤:
1. 获取数据:从 AKShare 获取 A 股历史日线数据
2. 数据清洗:将原始数据转换为 AKQuant 所需的标准格式
3. 数据存储:将清洗后的数据保存为高性能的 Parquet 格式
4. 数据读取:从本地加载数据进行验证

这些步骤是构建任何量化策略的基石。
"""

import os
from pathlib import Path

import akshare as ak
import pandas as pd


def fetch_and_clean_data(symbol: str, start_date: str, end_date: str) -> pd.DataFrame:
    """
    获取 A 股日线数据并清洗为 AKQuant 标准格式.

    :param symbol: 股票代码 (如 "600000")
    :param start_date: 开始日期 (如 "20230101")
    :param end_date: 结束日期 (如 "20231231")
    :return: 清洗后的 DataFrame
    """
    print(f"正在获取 {symbol} 的历史数据 ({start_date}-{end_date})...")

    # 1. 获取数据
    # 使用 stock_zh_a_hist 接口获取历史行情
    # adjust="qfq" 表示前复权,这是回测中最常用的复权方式
    try:
        df = ak.stock_zh_a_hist(
            symbol=symbol,
            period="daily",
            start_date=start_date,
            end_date=end_date,
            adjust="qfq",
        )
    except Exception as e:
        print(f"数据获取失败: {e}")
        return pd.DataFrame()

    if df.empty:
        print("警告: 获取到的数据为空")
        from typing import cast

        return cast(pd.DataFrame, df)

    # 2. 重命名列 (AKShare 中文列名 -> AKQuant 标准英文列名)
    # 标准列名: date, open, high, low, close, volume
    rename_map = {
        "日期": "date",
        "开盘": "open",
        "最高": "high",
        "最低": "low",
        "收盘": "close",
        "成交量": "volume",
    }
    df = df.rename(columns=rename_map)

    # 3. 格式转换与清洗
    # 转换日期列为 pandas datetime 类型
    df["date"] = pd.to_datetime(df["date"])

    # 筛选必要的列,去除多余字段 (如涨跌幅、换手率等,除非策略需要)
    required_cols = ["date", "open", "high", "low", "close", "volume"]
    df = df[required_cols]

    # 添加 symbol 列 (多标的回测时必需)
    df["symbol"] = symbol

    # 处理缺失值 (简单的丢弃策略)
    df = df.dropna()

    # 按日期升序排序
    df = df.sort_values("date").reset_index(drop=True)

    print(f"数据清洗完成,共 {len(df)} 条记录")
    return df  # type: ignore


def save_to_parquet(df: pd.DataFrame, file_path: str) -> None:
    """
    将 DataFrame 保存为 Parquet 格式.

    Parquet 是一种高性能列式存储格式,读写速度远快于 CSV。
    """
    # 确保父目录存在
    Path(file_path).parent.mkdir(parents=True, exist_ok=True)

    df.to_parquet(file_path)
    print(f"数据已保存至: {file_path}")


def load_from_parquet(file_path: str) -> pd.DataFrame:
    """从 Parquet 文件读取数据."""
    if not os.path.exists(file_path):
        print(f"文件不存在: {file_path}")
        return pd.DataFrame()

    df = pd.read_parquet(file_path)
    print(f"从本地加载数据成功,共 {len(df)} 条记录")
    return df


if __name__ == "__main__":
    # 配置参数
    SYMBOL = "600000"  # 浦发银行
    START_DATE = "20230101"
    END_DATE = "20231231"
    DATA_DIR = "data"  # 数据存储目录

    # 1. 获取并清洗数据
    df_clean = fetch_and_clean_data(SYMBOL, START_DATE, END_DATE)

    if not df_clean.empty:
        # 打印前 5 行预览
        print("\n数据预览 (Head):")
        print(df_clean.head())

        # 打印数据类型信息
        print("\n数据信息 (Info):")
        df_clean.info()

        # 2. 保存数据
        file_path = f"{DATA_DIR}/{SYMBOL}.parquet"
        save_to_parquet(df_clean, file_path)

        # 3. 验证读取
        print("\n正在验证读取...")
        df_loaded = load_from_parquet(file_path)

        # 简单验证
        assert len(df_clean) == len(df_loaded)
        print("验证通过:读取的数据与原始数据一致")

运行结果

python examples/textbook/ch03_data.py

你将在控制台看到数据清洗前后的对比,并在 data/ 目录下找到生成的 .parquet 文件。

3.4 数据库设计 (Database Design)

随着数据量的增长,单纯的文件存储(CSV/Parquet)将难以满足查询需求。我们需要引入专业的数据库。

3.4.1 关系型数据库 (Relational DB)

  • 代表:PostgreSQL, MySQL。
  • 适用:资产基础信息(如股票代码、上市日期、行业分类)、交易账户信息(如资金流水、订单记录)。
  • 特点:支持复杂关联查询 (JOIN),事务一致性 (ACID) 强。

3.4.2 时序数据库 (Time-Series DB)

  • 代表:ClickHouse, InfluxDB, DolphinDB。
  • 适用:行情数据 (Tick/Bar)、高频因子数据。
  • 特点
    • 写入快:每秒可写入百万级数据点。
    • 压缩率高:列式存储,针对时间序列优化的压缩算法(如 Delta Encoding)。
    • 聚合快:计算“某股票过去一年的平均成交量”仅需毫秒级。

ClickHouse 建表示例

CREATE TABLE kline_1m (
    date Date,
    datetime DateTime,
    symbol String,
    open Float32,
    high Float32,
    low Float32,
    close Float32,
    volume Float64
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (symbol, datetime);

3.5 特征存储 (Feature Store)

在机器学习项目中,特征工程往往是最耗时的。为了避免重复计算,我们需要构建特征存储 (Feature Store)

  • 离线存储 (Offline Store):存储历史特征(如过去 10 年的 5日均线),用于模型训练。通常基于数仓 (Hive) 或对象存储 (S3)。
  • 在线存储 (Online Store):存储最新特征(如当天的 5日均线),用于实盘预测。通常基于 Redis,要求低延迟读取。
  • 一致性:保证训练和推理使用完全相同的特征计算逻辑。

3.6 实时数据流 (Real-time Stream)

在实盘交易中,我们需要处理实时推送的数据流。

  • WebSocket:建立持久连接,服务端主动推送数据。比轮询 (Polling) HTTP 接口效率高得多。
  • 消息队列 (Kafka/RabbitMQ):在数据源和策略引擎之间引入缓冲层。防止行情爆发时(如开盘瞬间)数据积压导致系统崩溃。

akquant 的实盘网关模块内置了 WebSocket 客户端,并自动处理断线重连和心跳保活。


本章小结

  1. 数据质量决定策略上限,清洗和对齐是回测前置条件。
  2. OHLCV、复权、停牌处理是 A 股数据工程的三大关键点。
  3. 从离线存储到实时流式处理,需要统一字段与时间语义。

课后练习

  1. 下载同一标的的日线与分钟线,比较字段与时间粒度差异。
  2. 对一段数据分别做前复权与不复权,观察价格序列变化。
  3. 编写一个最小校验脚本,自动检查缺失值与重复时间戳。

常见错误与排查

  1. 时间列错位:确认时区和交易日历是否一致。
  2. 价格跳变异常:检查是否遗漏复权处理。
  3. 回测读不到数据:核对本地缓存路径与字段命名是否与引擎约定一致。