做量化交易的人都知道,回测系统的核心不是策略有多花哨,而是数据有多可靠。
如果历史行情数据本身就有问题,那么再完美的回测结果也只是“垃圾进,垃圾出”。

本文从实战出发,聊聊如何通过 API 批量拉取历史行情数据,并做一套严谨的回测数据清洗流程。这些坑,我都踩过。
一、为什么历史行情数据这么难搞?
很多人以为历史行情就是“股票代码+日期+开高低收+成交量”。真上手才发现,问题一大堆:
- 不同数据源格式不同,有的前复权、有的后复权、有的不复权
- 停牌日、除权除息日、涨跌停板数据容易被忽略
- API 限流、断点续传、数据缺失需要处理
- 国内 A 股、美股、期货的数据格式和规则差异巨大
一个合格的量化回测系统,必须能从源头保证数据的完整性、一致性、无偏性。
二、批量拉取的工程设计
2.1 基础思路
不要一次性拉全部历史数据,更不要写死日期。合理的设计应该是:
配置股票池 → 判断本地已有数据 → 只拉缺失区间 → 合并去重 → 校验一致性
2.2 代码示例:带断点续传的批量拉取
下面使用 iTick API 获取历史日线数据(前复权),并实现本地缓存与断点续传。
import requests
import pandas as pd
import time
from pathlib import Path
API_TOKEN = "your_token_here" # 替换为实际 Token
BASE_URL = "//api.itick.org"
def build_headers():
"""构造请求头,包含 API Token 验证"""
return {
"token": API_TOKEN,
"Content-Type": "application/json"
}
def fetch_stock_history(stock_code, region="HK", k_type=8, start_date="20000101",
end_date="20231231", cache_dir="./data/raw"):
"""
带缓存的批量拉取,自动断点续传
参数说明:
stock_code : 股票代码(港股示例:00700)
region : 市场代码(HK/US/SZ/SH 等)
k_type : K线类型(8:日线,9:周线,10:月线)
start_date : 开始日期(格式 YYYYMMDD)
end_date : 结束日期(格式 YYYYMMDD)
cache_dir : 本地缓存目录
"""
Path(cache_dir).mkdir(parents=True, exist_ok=True)
cache_file = Path(cache_dir) / f"{stock_code}.parquet"
# 已有数据则加载,仅拉取缺失区间
if cache_file.exists():
df_old = pd.read_parquet(cache_file)
df_old['trade_date'] = pd.to_datetime(df_old['trade_date'])
last_date = df_old['trade_date'].max()
start_date = (last_date + pd.Timedelta(days=1)).strftime('%Y%m%d')
if start_date > end_date:
return df_old
print(f"{stock_code}: 本地已有数据至 {last_date.date()},开始增量拉取...")
else:
df_old = pd.DataFrame()
# 将日期范围转换为时间戳(iTick kType 模式下需通过 et 参数控制截止)
start_ts = int(pd.Timestamp(start_date).timestamp())
end_ts = int(pd.Timestamp(end_date).timestamp())
all_data = []
current_end_ts = end_ts
batch_days = 100 # 每批最多拉取约 100 个交易日
while True:
# 计算当前批次的起始截止区间(基于天数回推)
batch_start_ts = max(start_ts, current_end_ts - batch_days * 86400)
params = {
"region": region,
"code": stock_code,
"kType": k_type,
"limit": 500, # 每次最多返回 500 根 K 线
"et": current_end_ts
}
try:
url = f"{BASE_URL}/stock/kline"
resp = requests.get(url, headers=build_headers(), params=params, timeout=15)
if resp.status_code != 200:
print(f"拉取失败: {stock_code}, 状态码 {resp.status_code}")
time.sleep(2)
continue
data = resp.json()
if data.get("code") == 0 and data.get("data"):
batch_data = data["data"]
all_data.extend(batch_data)
print(f"{stock_code}: 拉取到 {len(batch_data)} 条数据")
# 判断是否还有更早的数据
earliest_ts = batch_data[-1].get("t", 0) if batch_data else 0
if earliest_ts <= start_ts or len(batch_data) < 500:
break
current_end_ts = earliest_ts - 86400 # 继续拉取更早数据
else:
print(f"拉取失败: {stock_code}, 错误信息: {data.get('msg')}")
break
time.sleep(0.5) # 限流控制
except Exception as e:
print(f"拉取异常: {stock_code}, 错误: {e}")
time.sleep(5)
continue
if not all_data:
return df_old
# 数据转换与合并
df_new = pd.DataFrame(all_data)
# 将时间戳转换为日期
df_new['trade_date'] = pd.to_datetime(df_new['t'], unit='s')
# 重命名字段为统一格式
df_new = df_new.rename(columns={
'o': 'open', 'h': 'high', 'l': 'low',
'c': 'close', 'v': 'volume'
})
df_new = df_new[['trade_date', 'open', 'high', 'low', 'close', 'volume']]
df_combined = pd.concat([df_old, df_new], ignore_index=True) if not df_old.empty else df_new
df_combined = df_combined.drop_duplicates(subset=['trade_date']).sort_values('trade_date')
df_combined.to_parquet(cache_file, index=False)
print(f"{stock_code}: 数据保存至 {cache_file}, 共计 {len(df_combined)} 条")
return df_combined
这个函数做了几件关键的事:
- 检查本地缓存(Parquet 格式),只拉取缺失区间
- 通过
limit和分批区间控制拉取量,支持大量历史数据的自动分页 - 异常重试与限流睡眠
- 时间戳自动转换为标准化日期字段
2.3 多股票并发拉取
单线程循环拉取效率较低,可使用线程池实现并发,但仍需控制并发数以避免 API 限流:
from concurrent.futures import ThreadPoolExecutor, as_completed
def fetch_batch(stock_list, region="HK", max_workers=3):
"""
批量拉取多只股票的历史数据
max_workers: 并发数建议 ≤ 5,防止被限流
"""
results = {}
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {
executor.submit(fetch_stock_history, code, region): code
for code in stock_list
}
for future in as_completed(futures):
code = futures[future]
try:
results[code] = future.result()
print(f"{code}: 拉取完成")
except Exception as e:
print(f"{code}: 拉取失败, 错误: {e}")
return results
并发数建议不超过 5,否则容易被数据源封禁。
三、回测数据清洗 Checklist
拉下来的原始数据,离直接用于回测还差好几步。这是我总结的清洗流程,每一步都不能省。
3.1 时间轴处理
# 确保交易日连续,无跳空
def align_trading_days(df, trading_calendar=None):
df['trade_date'] = pd.to_datetime(df['trade_date'])
df = df.sort_values('trade_date').set_index('trade_date')
if trading_calendar is None:
# 生成完整日历(工作日频率)
full_calendar = pd.date_range(start=df.index.min(), end=df.index.max(), freq='B')
else:
full_calendar = trading_calendar
df = df.reindex(full_calendar)
return df
用工作日频率(freq='B')生成完整日历,缺失日期会自动填入 NaN,后续再填充或标记。
3.2 除权除息与复权统一
这是最大的坑!
很多新手直接用不复权数据做回测,结果会发现某天价格突然跳空低开 30%(实际上是除权),策略却以为是大跌而错误开平仓。
最佳实践:全程使用 前复权(qfq) 数据,保持历史价格连续可比。但要注意,前复权会导致早期价格出现负数(极端分红),需要做截断处理:
# 剔除前复权后的负价格或极小价格
df = df[(df['close'] > 0.01) & (df['high'] > 0.01)]
3.3 涨跌停板标记
回测时,如果策略根据信号在涨停价买入,实际根本无法成交。需要提前标记:
# 计算涨跌停价(A股主板±10%,科创/创业±20%,港股无涨跌停板限制)
def calc_limit_prices(df, stock_code):
# 根据股票代码判断市场
if stock_code.startswith('688') or stock_code.startswith('300'):
limit_pct = 0.20 # 科创板/创业板
elif stock_code.startswith('600') or stock_code.startswith('000'):
limit_pct = 0.10 # A股主板
else:
# 港股无涨跌停板限制,直接返回
df['is_limit_up'] = False
df['is_limit_down'] = False
return df
df['prev_close'] = df['close'].shift(1)
df['upper_limit'] = df['prev_close'] * (1 + limit_pct)
df['lower_limit'] = df['prev_close'] * (1 - limit_pct)
# 标记一字板
df['is_limit_up'] = (df['open'] >= df['upper_limit'] - 0.001) & (df['close'] >= df['upper_limit'] - 0.001)
df['is_limit_down'] = (df['open'] <= df['lower_limit'] + 0.001) & (df['close'] <= df['lower_limit'] + 0.001)
return df
回测执行时,遇到 is_limit_up 且为买入信号,应跳过或转换策略。
3.4 停牌数据处理
停牌期间,没有成交,不应填充为前一日价格(会导致回测出现不合理收益)。正确做法:
# 停牌日成交量应该为0或NaN,不做前向填充
df['volume'] = df['volume'].fillna(0)
# 对于价格字段,停牌日保持NaN,后续回测引擎遇到NaN应直接跳过该日
3.5 数据对齐(多股票回测)
多股票回测时,需将所有股票对齐到同一个交易日历:
def align_multi_stocks(stock_dfs, trading_days):
"""
stock_dfs: dict {code: DataFrame}
trading_days: 交易日列表(pd.DatetimeIndex)
"""
aligned = {}
for code, df in stock_dfs.items():
df_aligned = df.set_index('trade_date').reindex(trading_days)
aligned[code] = df_aligned
return aligned
四、数据质量校验
清洗完毕后,一定要跑一遍自动化校验:
def validate_data(df, stock_code):
checks = {
"是否有重复日期": df.index.duplicated().sum() == 0,
"是否有空价格": df[['open','high','low','close']].isna().any().any() == False,
"最低价是否高于最高价": (df['low'] <= df['high']).all(),
"成交量是否非负": (df['volume'] >= 0).all(),
"价格序列是否单调异常": (
(df['close'] - df['close'].shift(1)).abs() / df['close'].shift(1) < 0.2
).all(), # 除去涨跌停
}
for name, result in checks.items():
print(f"{stock_code} - {name}: {'通过' if result else '失败'}")
return all(checks.values())
五、存储与版本管理建议
- 格式:强烈推荐 Parquet 或 Feather,比 CSV 快 10 倍以上,且占用空间小。
- 目录结构:
data/ raw/ # 原始API拉取数据(按股票保存) cleaned/ # 清洗后数据(已复权、对齐、填充) meta/ # 股票列表、交易日历、除权因子备份 - 版本控制:历史数据不要放 Git,用 DVC(Data Version Control)或直接云存储(S3、OSS)。
六、个人建议
- 永远保留原始拉取数据,清洗脚本可重复执行。否则哪天发现清洗逻辑错了,你还得全部重拉。
- 不要完美主义。回测数据做不到 100%精确,但必须保证无偏性(误差在买卖双方随机出现)。
- 先验小样本。对某只股票拉 3 年数据,手动核对除权除息日、涨跌停日,确信流程正确后再批量跑。
- 备胎数据源。核心股票池至少准备两个数据源交叉验证。
最后,记住一句话:回测是用来排除坏策略的,不是用来证明好策略的。 而这一切的起点,就是靠谱的历史行情数据。希望这篇文章能帮你少走弯路。
参考文档:https://docs.itick.org/websocket/stocks
GitHub:https://github.com/itick-org/

