历史数据批量拉取:如何高效获取10年分钟级美股数据

用户头像sh_*599ojc
2026-04-12 发布

一、开篇

批量拉取历史数据是量化回测的第一道工序。这道工序的完成质量,直接决定了后续所有策略验证的可信度——一个因限频、超时或数据缺失而产生偏差的历史数据集,会让回测结果与实盘表现产生系统性背离。

本文拆解历史数据批量拉取的完整工程方案:从单次请求的限频自适应处理,到分片并发拉取与断点续传,再到本地存储与完整性校验。你可以直接将本文代码用于美股、港股、A股及数字货币的分钟级历史数据获取。

二、痛点拆解:为什么简单循环会死得很难看

在动手写代码之前,先用一张表把核心问题梳理清楚。

痛点 具体表现 简单循环的后果 本文解决方案
限频 免费层请求频率受限,超限返回3001 脚本被临时封禁,后续请求全部失败 识别错误码3001,读取Retry-After头,自适应等待
单次拉取上限 一次请求最多返回1000条K线,10年1分钟数据约100万条 必须分页,且需处理多页拼接 分页循环拉取,基于时间戳增量翻页
网络超时 跨国请求RTT较高,大响应体易超时 请求卡死,整个脚本挂起 设置合理的connect/read timeout,失败自动重试
断点续传 脚本中途崩溃,已拉数据未保存 从头再来,浪费配额和时间 每拉完一个分片立即落盘,重启时跳过已完成分片
内存管理 10年分钟数据约100万行,全部加载到内存再存盘 内存溢出,脚本被系统终止 流式写入,边拉边存,不在内存中累积全量数据
数据完整性 退市股票、停牌期间数据表现各异 直接用Pandas对齐会出错,回测产生偏差 使用数据源的退市标识和历史成分股接口校验

一句话总结:批量拉取历史数据不是简单的API循环调用,而是一个需要处理限频、重试、分页、断点续传和流式存储的数据迁移工程。

三、系统架构总览

在写代码之前,先把整体设计画清楚。

┌─────────────────────────────────────────────────────────────────┐
│                        控制层(主程序)                           │
│  - 读取待拉取symbol列表                                           │
│  - 根据本地进度文件跳过已完成symbol                                 │
│  - 为每个symbol生成时间切片任务                                    │
└─────────────────────────────┬───────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│                    任务队列 & 并发控制层                           │
│  - asyncio 任务队列,控制并发数(默认3-5)                          │
│  - 每个任务:拉取一个(symbol, 时间切片)的数据块,内含分页循环         │
│  - 带指数退避的重试机制(限频/超时/5xx)                            │
└─────────────────────────────┬───────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│                        API 调用层                                 │
│  - TickDB /v1/market/kline 接口                                  │
│  - 自动处理3001限频,读取Retry-After                              │
│  - 超时设置:(3.05, 30) 连接超时3秒,读取超时30秒                   │
│  - 分页:每次最多1000条,用最后一条时间戳+1继续拉取                   │
└─────────────────────────────┬───────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│                        数据存储层                                 │
│  - 每个symbol独立CSV文件                                          │
│  - 流式追加写入,不在内存累积                                       │
│  - 记录进度文件(JSON),支持断点续传                               │
└─────────────────────────────────────────────────────────────────┘

运行时的典型日志输出:

[INFO] 加载进度文件,已记录 2 个symbol
[DEBUG] AAPL.US 切片 1609459200000 已完成,跳过
[WARNING] 触发限频,等待 5 秒 (HTTP 3001)
[INFO] TSLA.US [1612137600000-1614729600000] 拉取成功,流式落盘 11700 条
[INFO] TSLA.US 切片 1612137600000 已标记完成

四、生产级代码实现

4.1 核心函数:带分页与限频处理的历史K线拉取

import os
import json
import time
import asyncio
import aiohttp
import aiofiles
from datetime import datetime, timedelta
from typing import List, Optional, Dict, Any
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# ⚠️ 工程预警:API Key 必须从环境变量读取,严禁硬编码
API_KEY = os.environ.get("TICKDB_API_KEY")
if not API_KEY:
    raise ValueError("请设置环境变量 TICKDB_API_KEY")

BASE_URL = "//api.tickdb.ai"
HEADERS = {"X-API-Key": API_KEY}
MAX_RETRIES = 5
BASE_DELAY = 1
MAX_DELAY = 60
CONCURRENT_LIMIT = 3  # 并发数,免费层建议保守设置


async def fetch_kline_slice(
    session: aiohttp.ClientSession,
    symbol: str,
    start_time: int,
    end_time: int,
    interval: str = "1m",
    limit: int = 1000  # TickDB 单次最大 1000 条
) -> Optional[List[Dict[str, Any]]]:
    """
    拉取一个时间切片的历史K线,带分页循环、指数退避重试和限频自适应。
  
    ⚠️ 工程预警:
        - 免费层限频严格,建议并发数不超过3
        - 该函数已包含指数退避重连、限频自适应和分页逻辑,可直接用于生产
    """
    all_klines = []
    current_start = start_time
    url = f"{BASE_URL}/v1/market/kline"
  
    while current_start < end_time:
        params = {
            "symbol": symbol,
            "interval": interval,
            "start_time": current_start,
            "end_time": end_time,
            "limit": limit
        }
      
        retry_count = 0
        page_success = False
      
        while retry_count <= MAX_RETRIES:
            try:
                async with session.get(
                    url,
                    headers=HEADERS,
                    params=params,
                    timeout=aiohttp.ClientTimeout(connect=3.05, sock_read=30)
                ) as resp:
                    data = await resp.json()
                    code = data.get("code", 0)
                  
                    if code == 0:
                        klines = data.get("data", {}).get("klines", [])
                        if not klines:
                            return all_klines if all_klines else None
                      
                        all_klines.extend(klines)
                        logger.debug(f"{symbol} 分页拉取 {len(klines)} 条,累计 {len(all_klines)} 条")
                      
                        if len(klines) < limit:
                            # 已拉完该时间切片
                            logger.info(f"{symbol} [{start_time}-{end_time}] 拉取完成,共 {len(all_klines)} 条")
                            return all_klines
                        else:
                            # 可能还有更多数据,用最后一条的时间戳+1作为下一页起点
                            current_start = klines[-1]['time'] + 1
                            page_success = True
                            retry_count = 0  # 重置重试计数
                            break
                  
                    elif code == 3001:  # 限频
                        retry_after = int(resp.headers.get("Retry-After", 5))
                        logger.warning(f"触发限频,等待 {retry_after} 秒")
                        await asyncio.sleep(retry_after)
                        retry_count += 1
                        continue
                  
                    elif code in (1001, 1002):
                        raise ValueError("API Key 无效,请检查环境变量")
                  
                    elif code == 2002:
                        logger.error(f"{symbol} 不存在,跳过")
                        return None
                  
                    else:
                        logger.error(f"未知错误 {code}: {data.get('message')}")
                        retry_count += 1
                      
            except asyncio.TimeoutError:
                logger.warning(f"{symbol} 请求超时,重试 {retry_count+1}/{MAX_RETRIES}")
                retry_count += 1
            except Exception as e:
                logger.error(f"{symbol} 请求异常: {e}")
                retry_count += 1
          
            # 指数退避 + 抖动
            if retry_count <= MAX_RETRIES and not page_success:
                delay = min(BASE_DELAY * (2 ** (retry_count - 1)), MAX_DELAY)
                jitter = delay * 0.1 * (hash(symbol) % 100) / 100
                await asyncio.sleep(delay + jitter)
      
        if not page_success:
            logger.error(f"{symbol} 分页拉取失败,已获取 {len(all_klines)} 条")
            return all_klines if all_klines else None
  
    return all_klines

4.2 时间切片生成器

def generate_time_slices(
    start_date: datetime,
    end_date: datetime,
    slice_days: int = 30
) -> List[tuple]:
    """
    将长时间范围切分为多个小片,便于断点续传。
  
    切分粒度可调整。每个切片内部会通过分页循环完整拉取。
    """
    slices = []
    current = start_date
    while current < end_date:
        slice_end = min(current + timedelta(days=slice_days), end_date)
        slices.append((
            int(current.timestamp() * 1000),
            int(slice_end.timestamp() * 1000)
        ))
        current = slice_end
    return slices

4.3 进度管理与断点续传

import json
from pathlib import Path

class ProgressManager:
    """管理拉取进度,支持断点续传"""
  
    def __init__(self, progress_file: str):
        self.progress_file = Path(progress_file)
        self.progress: Dict[str, List[int]] = {}
        self._load()
  
    def _load(self):
        if self.progress_file.exists():
            with open(self.progress_file, 'r') as f:
                self.progress = json.load(f)
            logger.info(f"加载进度文件,已记录 {len(self.progress)} 个symbol")
  
    def save(self):
        with open(self.progress_file, 'w') as f:
            json.dump(self.progress, f)
  
    def is_slice_done(self, symbol: str, start_ts: int) -> bool:
        return symbol in self.progress and start_ts in self.progress[symbol]
  
    def mark_slice_done(self, symbol: str, start_ts: int):
        if symbol not in self.progress:
            self.progress[symbol] = []
        if start_ts not in self.progress[symbol]:
            self.progress[symbol].append(start_ts)
            self.save()

4.4 流式数据写入

import aiofiles
import csv

async def append_klines_to_csv(symbol: str, klines: List[Dict], output_dir: str):
    """流式追加写入CSV,不在内存累积"""
    output_path = Path(output_dir) / f"{symbol}.csv"
    file_exists = output_path.exists()
  
    async with aiofiles.open(output_path, 'a', newline='') as f:
        writer = csv.writer(f)
        if not file_exists:
            await writer.writerow(['timestamp', 'open', 'high', 'low', 'close', 'volume'])
      
        for k in klines:
            await writer.writerow([
                k['time'],
                k['open'],
                k['high'],
                k['low'],
                k['close'],
                k['volume']
            ])

4.5 主控流程

async def download_symbol(
    session: aiohttp.ClientSession,
    symbol: str,
    start_date: datetime,
    end_date: datetime,
    progress: ProgressManager,
    output_dir: str,
    semaphore: asyncio.Semaphore
):
    """下载单个symbol的全量历史数据"""
    async with semaphore:
        slices = generate_time_slices(start_date, end_date, slice_days=30)
        for start_ts, end_ts in slices:
            if progress.is_slice_done(symbol, start_ts):
                logger.debug(f"{symbol} 切片 {start_ts} 已完成,跳过")
                continue
          
            klines = await fetch_kline_slice(session, symbol, start_ts, end_ts)
            if klines:
                await append_klines_to_csv(symbol, klines, output_dir)
                progress.mark_slice_done(symbol, start_ts)
            else:
                logger.warning(f"{symbol} 切片 {start_ts} 拉取失败,将在下次运行时重试")
          
            # 礼貌性等待,避免连续请求触发限频
            await asyncio.sleep(0.2)


async def main(symbols: List[str], start_date: datetime, end_date: datetime, output_dir: str):
    """主入口"""
    Path(output_dir).mkdir(parents=True, exist_ok=True)
    progress = ProgressManager(f"{output_dir}/progress.json")
    semaphore = asyncio.Semaphore(CONCURRENT_LIMIT)
  
    async with aiohttp.ClientSession() as session:
        tasks = [
            download_symbol(session, sym, start_date, end_date, progress, output_dir, semaphore)
            for sym in symbols
        ]
        await asyncio.gather(*tasks, return_exceptions=True)


if __name__ == "__main__":
    # 示例:拉取苹果和特斯拉过去5年的1分钟线
    symbols = ["AAPL.US", "TSLA.US"]
    end = datetime.now()
    start = end - timedelta(days=365 * 5)
    asyncio.run(main(symbols, start, end, "./data"))

五、进阶话题:确保历史数据的完整性

代码能跑通只是第一步。要让回测结果可靠,还需要校验数据完整性。

5.1 时区与夏令时陷阱:UTC 到美东时间的正确转换

TickDB 返回的时间戳是 UTC 毫秒,这是正确的工程实践。但在美股回测中,直接使用 UTC 时间会产生两个问题:

  1. 美股交易时段是美东时间 09:30-16:00,每年 3 月和 11 月夏令时切换,与 UTC 的偏移量不同
  2. 如果用 UTC 时间直接过滤“开盘前 30 分钟”,在夏令时切换前后会错位

正确做法

import pandas as pd
import pytz

# 读取数据后转换时区
df['timestamp_utc'] = pd.to_datetime(df['timestamp'], unit='ms', utc=True)
df['timestamp_et'] = df['timestamp_utc'].dt.tz_convert('America/New_York')

# 过滤正常交易时段(自动处理夏令时)
mask = (df['timestamp_et'].dt.time >= pd.Timestamp('09:30').time()) & \
       (df['timestamp_et'].dt.time <= pd.Timestamp('16:00').time())
df_trading = df[mask]

这个转换是跨市场回测的基础——TickDB 统一返回 UTC 的优势在于,你只需要维护一套时区转换逻辑,而不是每个市场分别适配。

5.2 交易日历对齐

TickDB 返回的K线在停牌期间不返回空K线。这意味着不同股票的时间索引维度不一致。回测前必须做两件事:

  1. 生成标准交易日历的完整分钟时间轴
  2. pandas.merge_asofreindex 对齐,停牌期间用前向填充

5.3 退市股票处理

如果只拉取了当前活跃股票的历史数据,回测收益会被幸存者偏差高估。TickDB 支持获取历史成分股列表和已退市股票数据。在构建回测标的池时,务必包含退市股票,并在退市日之后将其剔除。

5.4 数据完整性校验脚本

import pandas as pd

def validate_completeness(df: pd.DataFrame, symbol: str, start_date: datetime, end_date: datetime):
    """简单校验:检查起止时间和记录数是否在合理范围"""
    expected_days = (end_date - start_date).days
    actual_days = df['timestamp'].dt.date.nunique()
    coverage = actual_days / expected_days
  
    print(f"{symbol}: 预期 {expected_days} 天,实际 {actual_days} 天,覆盖率 {coverage:.1%}")
    if coverage < 0.95:
        print(f"⚠️ {symbol} 数据覆盖率偏低,请检查停牌或拉取失败的时间段")

六、常见问题与解决

问题 原因 解决
拉取速度太慢 免费层限频严格,单线程串行 适当增加并发(不超过5),或升级套餐
部分切片反复失败 网络抖动或服务端临时过载 指数退避重试已内置,观察日志定位
内存占用过高 全量数据在内存中合并 本文采用流式写入,内存稳定在百MB级
CSV文件巨大 10年1分钟线约100万行 考虑按年分区存储,或用Parquet格式压缩
分页时出现重复数据 时间边界重叠 代码已使用last_time + 1 避免边界重复

七、最终交付

批量拉取历史数据是量化工程的“第一公里”。这公里走不稳,后面回测跑出来的收益都是空中楼阁。

本文给出的代码可以直接用于生产——它处理了限频、超时、分页、断点续传、并发控制、时区转换,并在本地落盘时采用了流式写入。你只需要替换 symbols 列表和日期范围,就能拉取美股、港股、A股、加密货币的历史分钟数据。


延伸方案

如果你是个人开发者:可以到官网注册申请 API KEY。免费层足够拉取中等规模的标的。

如果你是量化团队:需要更高并发或企业级SLA,可到官网联系官方获取团队方案。

如果你习惯用AI辅助开发:到 Clawhub 搜索“tickdb-market-data SKILL”,用自然语言查询历史行情。


本文不构成任何投资建议。市场有风险,投资需谨慎。

评论