一、开篇
批量拉取历史数据是量化回测的第一道工序。这道工序的完成质量,直接决定了后续所有策略验证的可信度——一个因限频、超时或数据缺失而产生偏差的历史数据集,会让回测结果与实盘表现产生系统性背离。
本文拆解历史数据批量拉取的完整工程方案:从单次请求的限频自适应处理,到分片并发拉取与断点续传,再到本地存储与完整性校验。你可以直接将本文代码用于美股、港股、A股及数字货币的分钟级历史数据获取。
二、痛点拆解:为什么简单循环会死得很难看
在动手写代码之前,先用一张表把核心问题梳理清楚。
| 痛点 | 具体表现 | 简单循环的后果 | 本文解决方案 |
|---|---|---|---|
| 限频 | 免费层请求频率受限,超限返回3001 | 脚本被临时封禁,后续请求全部失败 | 识别错误码3001,读取Retry-After头,自适应等待 |
| 单次拉取上限 | 一次请求最多返回1000条K线,10年1分钟数据约100万条 | 必须分页,且需处理多页拼接 | 分页循环拉取,基于时间戳增量翻页 |
| 网络超时 | 跨国请求RTT较高,大响应体易超时 | 请求卡死,整个脚本挂起 | 设置合理的connect/read timeout,失败自动重试 |
| 断点续传 | 脚本中途崩溃,已拉数据未保存 | 从头再来,浪费配额和时间 | 每拉完一个分片立即落盘,重启时跳过已完成分片 |
| 内存管理 | 10年分钟数据约100万行,全部加载到内存再存盘 | 内存溢出,脚本被系统终止 | 流式写入,边拉边存,不在内存中累积全量数据 |
| 数据完整性 | 退市股票、停牌期间数据表现各异 | 直接用Pandas对齐会出错,回测产生偏差 | 使用数据源的退市标识和历史成分股接口校验 |
一句话总结:批量拉取历史数据不是简单的API循环调用,而是一个需要处理限频、重试、分页、断点续传和流式存储的数据迁移工程。
三、系统架构总览
在写代码之前,先把整体设计画清楚。
┌─────────────────────────────────────────────────────────────────┐
│ 控制层(主程序) │
│ - 读取待拉取symbol列表 │
│ - 根据本地进度文件跳过已完成symbol │
│ - 为每个symbol生成时间切片任务 │
└─────────────────────────────┬───────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ 任务队列 & 并发控制层 │
│ - asyncio 任务队列,控制并发数(默认3-5) │
│ - 每个任务:拉取一个(symbol, 时间切片)的数据块,内含分页循环 │
│ - 带指数退避的重试机制(限频/超时/5xx) │
└─────────────────────────────┬───────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ API 调用层 │
│ - TickDB /v1/market/kline 接口 │
│ - 自动处理3001限频,读取Retry-After │
│ - 超时设置:(3.05, 30) 连接超时3秒,读取超时30秒 │
│ - 分页:每次最多1000条,用最后一条时间戳+1继续拉取 │
└─────────────────────────────┬───────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ 数据存储层 │
│ - 每个symbol独立CSV文件 │
│ - 流式追加写入,不在内存累积 │
│ - 记录进度文件(JSON),支持断点续传 │
└─────────────────────────────────────────────────────────────────┘
运行时的典型日志输出:
[INFO] 加载进度文件,已记录 2 个symbol
[DEBUG] AAPL.US 切片 1609459200000 已完成,跳过
[WARNING] 触发限频,等待 5 秒 (HTTP 3001)
[INFO] TSLA.US [1612137600000-1614729600000] 拉取成功,流式落盘 11700 条
[INFO] TSLA.US 切片 1612137600000 已标记完成
四、生产级代码实现
4.1 核心函数:带分页与限频处理的历史K线拉取
import os
import json
import time
import asyncio
import aiohttp
import aiofiles
from datetime import datetime, timedelta
from typing import List, Optional, Dict, Any
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ⚠️ 工程预警:API Key 必须从环境变量读取,严禁硬编码
API_KEY = os.environ.get("TICKDB_API_KEY")
if not API_KEY:
raise ValueError("请设置环境变量 TICKDB_API_KEY")
BASE_URL = "//api.tickdb.ai"
HEADERS = {"X-API-Key": API_KEY}
MAX_RETRIES = 5
BASE_DELAY = 1
MAX_DELAY = 60
CONCURRENT_LIMIT = 3 # 并发数,免费层建议保守设置
async def fetch_kline_slice(
session: aiohttp.ClientSession,
symbol: str,
start_time: int,
end_time: int,
interval: str = "1m",
limit: int = 1000 # TickDB 单次最大 1000 条
) -> Optional[List[Dict[str, Any]]]:
"""
拉取一个时间切片的历史K线,带分页循环、指数退避重试和限频自适应。
⚠️ 工程预警:
- 免费层限频严格,建议并发数不超过3
- 该函数已包含指数退避重连、限频自适应和分页逻辑,可直接用于生产
"""
all_klines = []
current_start = start_time
url = f"{BASE_URL}/v1/market/kline"
while current_start < end_time:
params = {
"symbol": symbol,
"interval": interval,
"start_time": current_start,
"end_time": end_time,
"limit": limit
}
retry_count = 0
page_success = False
while retry_count <= MAX_RETRIES:
try:
async with session.get(
url,
headers=HEADERS,
params=params,
timeout=aiohttp.ClientTimeout(connect=3.05, sock_read=30)
) as resp:
data = await resp.json()
code = data.get("code", 0)
if code == 0:
klines = data.get("data", {}).get("klines", [])
if not klines:
return all_klines if all_klines else None
all_klines.extend(klines)
logger.debug(f"{symbol} 分页拉取 {len(klines)} 条,累计 {len(all_klines)} 条")
if len(klines) < limit:
# 已拉完该时间切片
logger.info(f"{symbol} [{start_time}-{end_time}] 拉取完成,共 {len(all_klines)} 条")
return all_klines
else:
# 可能还有更多数据,用最后一条的时间戳+1作为下一页起点
current_start = klines[-1]['time'] + 1
page_success = True
retry_count = 0 # 重置重试计数
break
elif code == 3001: # 限频
retry_after = int(resp.headers.get("Retry-After", 5))
logger.warning(f"触发限频,等待 {retry_after} 秒")
await asyncio.sleep(retry_after)
retry_count += 1
continue
elif code in (1001, 1002):
raise ValueError("API Key 无效,请检查环境变量")
elif code == 2002:
logger.error(f"{symbol} 不存在,跳过")
return None
else:
logger.error(f"未知错误 {code}: {data.get('message')}")
retry_count += 1
except asyncio.TimeoutError:
logger.warning(f"{symbol} 请求超时,重试 {retry_count+1}/{MAX_RETRIES}")
retry_count += 1
except Exception as e:
logger.error(f"{symbol} 请求异常: {e}")
retry_count += 1
# 指数退避 + 抖动
if retry_count <= MAX_RETRIES and not page_success:
delay = min(BASE_DELAY * (2 ** (retry_count - 1)), MAX_DELAY)
jitter = delay * 0.1 * (hash(symbol) % 100) / 100
await asyncio.sleep(delay + jitter)
if not page_success:
logger.error(f"{symbol} 分页拉取失败,已获取 {len(all_klines)} 条")
return all_klines if all_klines else None
return all_klines
4.2 时间切片生成器
def generate_time_slices(
start_date: datetime,
end_date: datetime,
slice_days: int = 30
) -> List[tuple]:
"""
将长时间范围切分为多个小片,便于断点续传。
切分粒度可调整。每个切片内部会通过分页循环完整拉取。
"""
slices = []
current = start_date
while current < end_date:
slice_end = min(current + timedelta(days=slice_days), end_date)
slices.append((
int(current.timestamp() * 1000),
int(slice_end.timestamp() * 1000)
))
current = slice_end
return slices
4.3 进度管理与断点续传
import json
from pathlib import Path
class ProgressManager:
"""管理拉取进度,支持断点续传"""
def __init__(self, progress_file: str):
self.progress_file = Path(progress_file)
self.progress: Dict[str, List[int]] = {}
self._load()
def _load(self):
if self.progress_file.exists():
with open(self.progress_file, 'r') as f:
self.progress = json.load(f)
logger.info(f"加载进度文件,已记录 {len(self.progress)} 个symbol")
def save(self):
with open(self.progress_file, 'w') as f:
json.dump(self.progress, f)
def is_slice_done(self, symbol: str, start_ts: int) -> bool:
return symbol in self.progress and start_ts in self.progress[symbol]
def mark_slice_done(self, symbol: str, start_ts: int):
if symbol not in self.progress:
self.progress[symbol] = []
if start_ts not in self.progress[symbol]:
self.progress[symbol].append(start_ts)
self.save()
4.4 流式数据写入
import aiofiles
import csv
async def append_klines_to_csv(symbol: str, klines: List[Dict], output_dir: str):
"""流式追加写入CSV,不在内存累积"""
output_path = Path(output_dir) / f"{symbol}.csv"
file_exists = output_path.exists()
async with aiofiles.open(output_path, 'a', newline='') as f:
writer = csv.writer(f)
if not file_exists:
await writer.writerow(['timestamp', 'open', 'high', 'low', 'close', 'volume'])
for k in klines:
await writer.writerow([
k['time'],
k['open'],
k['high'],
k['low'],
k['close'],
k['volume']
])
4.5 主控流程
async def download_symbol(
session: aiohttp.ClientSession,
symbol: str,
start_date: datetime,
end_date: datetime,
progress: ProgressManager,
output_dir: str,
semaphore: asyncio.Semaphore
):
"""下载单个symbol的全量历史数据"""
async with semaphore:
slices = generate_time_slices(start_date, end_date, slice_days=30)
for start_ts, end_ts in slices:
if progress.is_slice_done(symbol, start_ts):
logger.debug(f"{symbol} 切片 {start_ts} 已完成,跳过")
continue
klines = await fetch_kline_slice(session, symbol, start_ts, end_ts)
if klines:
await append_klines_to_csv(symbol, klines, output_dir)
progress.mark_slice_done(symbol, start_ts)
else:
logger.warning(f"{symbol} 切片 {start_ts} 拉取失败,将在下次运行时重试")
# 礼貌性等待,避免连续请求触发限频
await asyncio.sleep(0.2)
async def main(symbols: List[str], start_date: datetime, end_date: datetime, output_dir: str):
"""主入口"""
Path(output_dir).mkdir(parents=True, exist_ok=True)
progress = ProgressManager(f"{output_dir}/progress.json")
semaphore = asyncio.Semaphore(CONCURRENT_LIMIT)
async with aiohttp.ClientSession() as session:
tasks = [
download_symbol(session, sym, start_date, end_date, progress, output_dir, semaphore)
for sym in symbols
]
await asyncio.gather(*tasks, return_exceptions=True)
if __name__ == "__main__":
# 示例:拉取苹果和特斯拉过去5年的1分钟线
symbols = ["AAPL.US", "TSLA.US"]
end = datetime.now()
start = end - timedelta(days=365 * 5)
asyncio.run(main(symbols, start, end, "./data"))
五、进阶话题:确保历史数据的完整性
代码能跑通只是第一步。要让回测结果可靠,还需要校验数据完整性。
5.1 时区与夏令时陷阱:UTC 到美东时间的正确转换
TickDB 返回的时间戳是 UTC 毫秒,这是正确的工程实践。但在美股回测中,直接使用 UTC 时间会产生两个问题:
- 美股交易时段是美东时间 09:30-16:00,每年 3 月和 11 月夏令时切换,与 UTC 的偏移量不同
- 如果用 UTC 时间直接过滤“开盘前 30 分钟”,在夏令时切换前后会错位
正确做法:
import pandas as pd
import pytz
# 读取数据后转换时区
df['timestamp_utc'] = pd.to_datetime(df['timestamp'], unit='ms', utc=True)
df['timestamp_et'] = df['timestamp_utc'].dt.tz_convert('America/New_York')
# 过滤正常交易时段(自动处理夏令时)
mask = (df['timestamp_et'].dt.time >= pd.Timestamp('09:30').time()) & \
(df['timestamp_et'].dt.time <= pd.Timestamp('16:00').time())
df_trading = df[mask]
这个转换是跨市场回测的基础——TickDB 统一返回 UTC 的优势在于,你只需要维护一套时区转换逻辑,而不是每个市场分别适配。
5.2 交易日历对齐
TickDB 返回的K线在停牌期间不返回空K线。这意味着不同股票的时间索引维度不一致。回测前必须做两件事:
- 生成标准交易日历的完整分钟时间轴
- 用
pandas.merge_asof或reindex对齐,停牌期间用前向填充
5.3 退市股票处理
如果只拉取了当前活跃股票的历史数据,回测收益会被幸存者偏差高估。TickDB 支持获取历史成分股列表和已退市股票数据。在构建回测标的池时,务必包含退市股票,并在退市日之后将其剔除。
5.4 数据完整性校验脚本
import pandas as pd
def validate_completeness(df: pd.DataFrame, symbol: str, start_date: datetime, end_date: datetime):
"""简单校验:检查起止时间和记录数是否在合理范围"""
expected_days = (end_date - start_date).days
actual_days = df['timestamp'].dt.date.nunique()
coverage = actual_days / expected_days
print(f"{symbol}: 预期 {expected_days} 天,实际 {actual_days} 天,覆盖率 {coverage:.1%}")
if coverage < 0.95:
print(f"⚠️ {symbol} 数据覆盖率偏低,请检查停牌或拉取失败的时间段")
六、常见问题与解决
| 问题 | 原因 | 解决 |
|---|---|---|
| 拉取速度太慢 | 免费层限频严格,单线程串行 | 适当增加并发(不超过5),或升级套餐 |
| 部分切片反复失败 | 网络抖动或服务端临时过载 | 指数退避重试已内置,观察日志定位 |
| 内存占用过高 | 全量数据在内存中合并 | 本文采用流式写入,内存稳定在百MB级 |
| CSV文件巨大 | 10年1分钟线约100万行 | 考虑按年分区存储,或用Parquet格式压缩 |
| 分页时出现重复数据 | 时间边界重叠 | 代码已使用last_time + 1 避免边界重复 |
七、最终交付
批量拉取历史数据是量化工程的“第一公里”。这公里走不稳,后面回测跑出来的收益都是空中楼阁。
本文给出的代码可以直接用于生产——它处理了限频、超时、分页、断点续传、并发控制、时区转换,并在本地落盘时采用了流式写入。你只需要替换 symbols 列表和日期范围,就能拉取美股、港股、A股、加密货币的历史分钟数据。
延伸方案
如果你是个人开发者:可以到官网注册申请 API KEY。免费层足够拉取中等规模的标的。
如果你是量化团队:需要更高并发或企业级SLA,可到官网联系官方获取团队方案。
如果你习惯用AI辅助开发:到 Clawhub 搜索“tickdb-market-data SKILL”,用自然语言查询历史行情。
本文不构成任何投资建议。市场有风险,投资需谨慎。

