量化交易系统:历史行情 API 批量拉取与回测数据清洗

用户头像Fxdund
2026-05-25 发布

做量化交易的人都知道,回测系统的核心不是策略有多花哨,而是数据有多可靠
如果历史行情数据本身就有问题,那么再完美的回测结果也只是“垃圾进,垃圾出”。
量化交易系统.png

本文从实战出发,聊聊如何通过 API 批量拉取历史行情数据,并做一套严谨的回测数据清洗流程。这些坑,我都踩过。

一、为什么历史行情数据这么难搞?

很多人以为历史行情就是“股票代码+日期+开高低收+成交量”。真上手才发现,问题一大堆:

  • 不同数据源格式不同,有的前复权、有的后复权、有的不复权
  • 停牌日、除权除息日、涨跌停板数据容易被忽略
  • API 限流、断点续传、数据缺失需要处理
  • 国内 A 股、美股、期货的数据格式和规则差异巨大

一个合格的量化回测系统,必须能从源头保证数据的完整性、一致性、无偏性

二、批量拉取的工程设计

2.1 基础思路

不要一次性拉全部历史数据,更不要写死日期。合理的设计应该是:

配置股票池 → 判断本地已有数据 → 只拉缺失区间 → 合并去重 → 校验一致性

2.2 代码示例:带断点续传的批量拉取

下面使用 iTick API 获取历史日线数据(前复权),并实现本地缓存与断点续传。

import requests
import pandas as pd
import time
from pathlib import Path

API_TOKEN = "your_token_here"  # 替换为实际 Token
BASE_URL = "//api.itick.org"

def build_headers():
    """构造请求头,包含 API Token 验证"""
    return {
        "token": API_TOKEN,
        "Content-Type": "application/json"
    }
def fetch_stock_history(stock_code, region="HK", k_type=8, start_date="20000101",
                        end_date="20231231", cache_dir="./data/raw"):
    """
    带缓存的批量拉取,自动断点续传

    参数说明:
        stock_code : 股票代码(港股示例:00700)
        region     : 市场代码(HK/US/SZ/SH 等)
        k_type     : K线类型(8:日线,9:周线,10:月线)
        start_date : 开始日期(格式 YYYYMMDD)
        end_date   : 结束日期(格式 YYYYMMDD)
        cache_dir  : 本地缓存目录
    """
    Path(cache_dir).mkdir(parents=True, exist_ok=True)
    cache_file = Path(cache_dir) / f"{stock_code}.parquet"

    # 已有数据则加载,仅拉取缺失区间
    if cache_file.exists():
        df_old = pd.read_parquet(cache_file)
        df_old['trade_date'] = pd.to_datetime(df_old['trade_date'])
        last_date = df_old['trade_date'].max()
        start_date = (last_date + pd.Timedelta(days=1)).strftime('%Y%m%d')
        if start_date > end_date:
            return df_old
        print(f"{stock_code}: 本地已有数据至 {last_date.date()},开始增量拉取...")
    else:
        df_old = pd.DataFrame()

    # 将日期范围转换为时间戳(iTick kType 模式下需通过 et 参数控制截止)
    start_ts = int(pd.Timestamp(start_date).timestamp())
    end_ts = int(pd.Timestamp(end_date).timestamp())

    all_data = []
    current_end_ts = end_ts
    batch_days = 100  # 每批最多拉取约 100 个交易日

    while True:
        # 计算当前批次的起始截止区间(基于天数回推)
        batch_start_ts = max(start_ts, current_end_ts - batch_days * 86400)

        params = {
            "region": region,
            "code": stock_code,
            "kType": k_type,
            "limit": 500,      # 每次最多返回 500 根 K 线
            "et": current_end_ts
        }

        try:
            url = f"{BASE_URL}/stock/kline"
            resp = requests.get(url, headers=build_headers(), params=params, timeout=15)
            if resp.status_code != 200:
                print(f"拉取失败: {stock_code}, 状态码 {resp.status_code}")
                time.sleep(2)
                continue

            data = resp.json()
            if data.get("code") == 0 and data.get("data"):
                batch_data = data["data"]
                all_data.extend(batch_data)
                print(f"{stock_code}: 拉取到 {len(batch_data)} 条数据")

                # 判断是否还有更早的数据
                earliest_ts = batch_data[-1].get("t", 0) if batch_data else 0
                if earliest_ts <= start_ts or len(batch_data) < 500:
                    break
                current_end_ts = earliest_ts - 86400  # 继续拉取更早数据
            else:
                print(f"拉取失败: {stock_code}, 错误信息: {data.get('msg')}")
                break

            time.sleep(0.5)  # 限流控制

        except Exception as e:
            print(f"拉取异常: {stock_code}, 错误: {e}")
            time.sleep(5)
            continue

    if not all_data:
        return df_old

    # 数据转换与合并
    df_new = pd.DataFrame(all_data)
    # 将时间戳转换为日期
    df_new['trade_date'] = pd.to_datetime(df_new['t'], unit='s')
    # 重命名字段为统一格式
    df_new = df_new.rename(columns={
        'o': 'open', 'h': 'high', 'l': 'low',
        'c': 'close', 'v': 'volume'
    })
    df_new = df_new[['trade_date', 'open', 'high', 'low', 'close', 'volume']]

    df_combined = pd.concat([df_old, df_new], ignore_index=True) if not df_old.empty else df_new
    df_combined = df_combined.drop_duplicates(subset=['trade_date']).sort_values('trade_date')

    df_combined.to_parquet(cache_file, index=False)
    print(f"{stock_code}: 数据保存至 {cache_file}, 共计 {len(df_combined)} 条")

    return df_combined

这个函数做了几件关键的事:

  • 检查本地缓存(Parquet 格式),只拉取缺失区间
  • 通过 limit 和分批区间控制拉取量,支持大量历史数据的自动分页
  • 异常重试与限流睡眠
  • 时间戳自动转换为标准化日期字段

2.3 多股票并发拉取

单线程循环拉取效率较低,可使用线程池实现并发,但仍需控制并发数以避免 API 限流:

from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch_batch(stock_list, region="HK", max_workers=3):
    """
    批量拉取多只股票的历史数据

    max_workers: 并发数建议 ≤ 5,防止被限流
    """
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(fetch_stock_history, code, region): code
            for code in stock_list
        }
        for future in as_completed(futures):
            code = futures[future]
            try:
                results[code] = future.result()
                print(f"{code}: 拉取完成")
            except Exception as e:
                print(f"{code}: 拉取失败, 错误: {e}")
    return results

并发数建议不超过 5,否则容易被数据源封禁。

三、回测数据清洗 Checklist

拉下来的原始数据,离直接用于回测还差好几步。这是我总结的清洗流程,每一步都不能省。

3.1 时间轴处理

# 确保交易日连续,无跳空
def align_trading_days(df, trading_calendar=None):
    df['trade_date'] = pd.to_datetime(df['trade_date'])
    df = df.sort_values('trade_date').set_index('trade_date')
    if trading_calendar is None:
        # 生成完整日历(工作日频率)
        full_calendar = pd.date_range(start=df.index.min(), end=df.index.max(), freq='B')
    else:
        full_calendar = trading_calendar
    df = df.reindex(full_calendar)
    return df

用工作日频率(freq='B')生成完整日历,缺失日期会自动填入 NaN,后续再填充或标记。

3.2 除权除息与复权统一

这是最大的坑!

很多新手直接用不复权数据做回测,结果会发现某天价格突然跳空低开 30%(实际上是除权),策略却以为是大跌而错误开平仓。

最佳实践:全程使用 前复权(qfq) 数据,保持历史价格连续可比。但要注意,前复权会导致早期价格出现负数(极端分红),需要做截断处理:

# 剔除前复权后的负价格或极小价格
df = df[(df['close'] > 0.01) & (df['high'] > 0.01)]

3.3 涨跌停板标记

回测时,如果策略根据信号在涨停价买入,实际根本无法成交。需要提前标记:

# 计算涨跌停价(A股主板±10%,科创/创业±20%,港股无涨跌停板限制)
def calc_limit_prices(df, stock_code):
    # 根据股票代码判断市场
    if stock_code.startswith('688') or stock_code.startswith('300'):
        limit_pct = 0.20   # 科创板/创业板
    elif stock_code.startswith('600') or stock_code.startswith('000'):
        limit_pct = 0.10   # A股主板
    else:
        # 港股无涨跌停板限制,直接返回
        df['is_limit_up'] = False
        df['is_limit_down'] = False
        return df

    df['prev_close'] = df['close'].shift(1)
    df['upper_limit'] = df['prev_close'] * (1 + limit_pct)
    df['lower_limit'] = df['prev_close'] * (1 - limit_pct)

    # 标记一字板
    df['is_limit_up'] = (df['open'] >= df['upper_limit'] - 0.001) & (df['close'] >= df['upper_limit'] - 0.001)
    df['is_limit_down'] = (df['open'] <= df['lower_limit'] + 0.001) & (df['close'] <= df['lower_limit'] + 0.001)
    return df

回测执行时,遇到 is_limit_up 且为买入信号,应跳过或转换策略。

3.4 停牌数据处理

停牌期间,没有成交,不应填充为前一日价格(会导致回测出现不合理收益)。正确做法:

# 停牌日成交量应该为0或NaN,不做前向填充
df['volume'] = df['volume'].fillna(0)
# 对于价格字段,停牌日保持NaN,后续回测引擎遇到NaN应直接跳过该日

3.5 数据对齐(多股票回测)

多股票回测时,需将所有股票对齐到同一个交易日历:

def align_multi_stocks(stock_dfs, trading_days):
    """
    stock_dfs: dict {code: DataFrame}
    trading_days: 交易日列表(pd.DatetimeIndex)
    """
    aligned = {}
    for code, df in stock_dfs.items():
        df_aligned = df.set_index('trade_date').reindex(trading_days)
        aligned[code] = df_aligned
    return aligned

四、数据质量校验

清洗完毕后,一定要跑一遍自动化校验:

def validate_data(df, stock_code):
    checks = {
        "是否有重复日期": df.index.duplicated().sum() == 0,
        "是否有空价格": df[['open','high','low','close']].isna().any().any() == False,
        "最低价是否高于最高价": (df['low'] <= df['high']).all(),
        "成交量是否非负": (df['volume'] >= 0).all(),
        "价格序列是否单调异常": (
            (df['close'] - df['close'].shift(1)).abs() / df['close'].shift(1) < 0.2
        ).all(),  # 除去涨跌停
    }
    for name, result in checks.items():
        print(f"{stock_code} - {name}: {'通过' if result else '失败'}")
    return all(checks.values())

五、存储与版本管理建议

  • 格式:强烈推荐 ParquetFeather,比 CSV 快 10 倍以上,且占用空间小。
  • 目录结构
    data/
      raw/          # 原始API拉取数据(按股票保存)
      cleaned/      # 清洗后数据(已复权、对齐、填充)
      meta/         # 股票列表、交易日历、除权因子备份
    
  • 版本控制:历史数据不要放 Git,用 DVC(Data Version Control)或直接云存储(S3、OSS)。

六、个人建议

  1. 永远保留原始拉取数据,清洗脚本可重复执行。否则哪天发现清洗逻辑错了,你还得全部重拉。
  2. 不要完美主义。回测数据做不到 100%精确,但必须保证无偏性(误差在买卖双方随机出现)。
  3. 先验小样本。对某只股票拉 3 年数据,手动核对除权除息日、涨跌停日,确信流程正确后再批量跑。
  4. 备胎数据源。核心股票池至少准备两个数据源交叉验证。

最后,记住一句话:回测是用来排除坏策略的,不是用来证明好策略的。 而这一切的起点,就是靠谱的历史行情数据。希望这篇文章能帮你少走弯路。

参考文档:https://docs.itick.org/websocket/stocks
GitHub:https://github.com/itick-org/

评论