2026量化策略回测的历史数据 API:从数据获取到策略验证

用户头像Fxdund
2026-04-15 发布

一、历史数据 API 在量化回测中的核心价值

在量化交易的世界里,“数据是基石,速度是命脉”,尤其是在 2026 年,随着量化策略向高频化、多资产化升级,数据源的选型直接决定了回测结果的准确性和策略落地的可行性。
2026量化策略回测的历史数据 API.png

一个容易被忽视的事实是:策略实盘与回测的盈利偏差中,有相当一部分源于数据源问题——毫秒级延迟、时间戳错位、数据断层,这些技术细节往往成为制约策略盈利的关键瓶颈。对于分钟级别的统计套利策略,秒级延迟可能尚可接受;但对于高频做市商或跨交易所套利策略,微秒级的差距就意味着胜负已分。

而且,数据源的切换成本极高。一旦策略围绕某个数据源的字段定义、时间戳格式和错误处理逻辑深度耦合,迁移到新数据源意味着数周甚至数月的重构工作。因此,在项目启动阶段选择一款高质量的数据 API,其价值远高于后期纠错。

对量化开发者而言,理想的回测数据 API 应当满足以下核心需求:

  1. 历史数据完整性:覆盖足够长的时间跨度,支持日线、分钟线乃至 Tick 级多粒度数据。
  2. 数据一致性:历史数据与实时行情的数据结构保持一致,减少策略切换时的适配成本。
  3. 接口易用性:提供标准 REST API 或 WebSocket,与 Python/Pandas 等量化工具栈无缝集成。
  4. 成本可控:提供合理的免费额度,按量计费模式透明。

在实际技术选型中,iTick API 提供了一个相当有参考价值的方案。它覆盖港股、美股、A 股、外汇、期货、加密货币等全球主流市场,通过 RESTful 和 WebSocket 两种接口形式满足不同场景需求——REST API 适合批量数据查询和历史数据获取,WebSocket 则为实时性要求高的交易场景提供低延迟数据流。其历史数据回溯功能支持长达 15 年的日线级数据下载,为策略回测提供了可靠支撑。

二、API 技术能力速览

在深入代码实战之前,先快速了解这个 API 的几个核心技术指标,这有助于你判断它是否适合自己的策略场景。

市场覆盖:覆盖全球多个主要市场,包括外汇(GB 市场)、股票(港股 HK、深市 SZ、沪市 SH、美股 US 等)、期货(US、HK、CN)和基金(US 等),一套 API 即可满足多资产策略的数据需求。

数据粒度:支持从 Tick 级逐笔成交、分钟线、小时线到日线、周线、月线的全粒度 K 线数据,可以满足从高频回测到长周期趋势策略的不同需求。

实时性:WebSocket 实时推送模式下,外汇数据延迟低至 30ms,主要市场行情延迟控制在 100ms 以内。配合全球节点加速网络,即便在跨市场场景下也能保持稳定的数据传输。

接口协议:同时提供 RESTful HTTP GET 请求和 WebSocket 实时推送。REST API 适合批量查询历史数据,WebSocket 则适用于低延迟的实时数据流订阅,两者使用相同的认证体系和数据格式,切换成本极低。

三、回测数据管道的代码实战

下面我们以 iTick API 为例,构建一个完整的量化回测数据管道——从历史数据获取、本地缓存,到与回测框架对接,覆盖实际开发中的核心环节。所有代码均使用 Python,你可以直接复制并根据自己的需求修改。

3.1 基础 REST API:获取历史 K 线数据

REST API 是最常用的历史数据获取方式,适用于批量下载和离线回测场景。

import requests
import pandas as pd
import time
from typing import Optional, List

class HistoricalDataClient:
    """
    历史数据客户端,基于 iTick REST API
    文档地址:https://itick.org
    """

    def __init__(self, api_token: str, base_url: str = "//api.itick.org"):
        self.api_token = api_token
        self.base_url = base_url
        self.session = requests.Session()
        self.session.headers.update({
            "accept": "application/json",
            "token": api_token
        })

    def get_kline(
        self,
        symbol: str,
        region: str,
        ktype: str = "8",
        limit: int = 100,
        end_time: Optional[int] = None,
        max_retries: int = 3
    ) -> pd.DataFrame:
        """
        获取历史 K 线数据

        Args:
            symbol: 股票/外汇代码(如 "AAPL", "EURUSD")
            region: 市场区域(如 "US", "HK", "SH", "SZ", "GB")
            ktype: K 线类型,"1"-"10" 分别代表 1/5/10/30 分钟、1/2/4 小时、日/周/月线
            limit: 获取的 K 线数量
            end_time: 截止时间戳(毫秒),默认为当前时间
            max_retries: 最大重试次数

        Returns:
            pandas DataFrame,包含 OHLCV 数据
        """
        endpoint = f"{self.base_url}/stock/kline"

        if end_time is None:
            end_time = int(time.time() * 1000)

        params = {
            "region": region,
            "code": symbol,
            "kType": ktype,
            "limit": limit,
            "et": end_time
        }

        for attempt in range(max_retries):
            try:
                resp = self.session.get(endpoint, params=params, timeout=30)
                resp.raise_for_status()
                data = resp.json()

                if data.get("code") != 0:
                    raise RuntimeError(f"API 返回错误: {data.get('msg')}")

                candles = data.get("data", [])
                if not candles:
                    return pd.DataFrame()

                df = pd.DataFrame(candles)
                # iTick 返回的时间戳字段名为 't'(毫秒)
                df["datetime"] = pd.to_datetime(df["t"], unit="ms")
                df.set_index("datetime", inplace=True)
                # 重命名列以匹配常规 OHLCV 命名
                df.rename(columns={
                    "o": "open",
                    "h": "high",
                    "l": "low",
                    "c": "close",
                    "v": "volume"
                }, inplace=True)
                return df[["open", "high", "low", "close", "volume"]]

            except requests.exceptions.RequestException as e:
                if attempt == max_retries - 1:
                    raise RuntimeError(f"数据获取失败: {e}")
                time.sleep(2 ** attempt)  # 指数退避重试

# 使用示例
client = HistoricalDataClient(api_token="{{YOUR_API_TOKEN}}")
df = client.get_kline(
    symbol="AAPL",
    region="US",
    ktype="8",      # 日线
    limit=200       # 获取最近 200 根日线
)
print(f"获取到 {len(df)} 条日线数据")
print(df.head())

3.2 批量获取与本地缓存

对于大规模回测场景,频繁调用 API 不仅受限于速率限额,还会拖慢回测速度。建议建立本地数据缓存机制:

import sqlite3
import json
from pathlib import Path
from datetime import datetime

class CachedDataClient(HistoricalDataClient):
    """带本地 SQLite 缓存的增强版客户端"""

    def __init__(self, api_token: str, cache_dir: str = "./data_cache"):
        super().__init__(api_token)
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(exist_ok=True)
        self._init_db()

    def _init_db(self):
        """初始化 SQLite 数据库"""
        self.db_path = self.cache_dir / "market_data.db"
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS kline_cache (
                    symbol TEXT NOT NULL,
                    region TEXT NOT NULL,
                    ktype TEXT NOT NULL,
                    end_time INTEGER NOT NULL,
                    limit_num INTEGER NOT NULL,
                    data_json TEXT NOT NULL,
                    cached_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    PRIMARY KEY (symbol, region, ktype, end_time, limit_num)
                )
            """)

    def get_kline_cached(
        self,
        symbol: str,
        region: str,
        ktype: str = "8",
        limit: int = 100,
        end_time: Optional[int] = None,
        force_refresh: bool = False
    ) -> pd.DataFrame:
        """优先从缓存获取,未命中时调用 API 并存入缓存"""

        if end_time is None:
            end_time = int(time.time() * 1000)

        # 生成缓存键
        cache_key = (symbol, region, ktype, end_time, limit)

        # 1. 检查缓存(除非强制刷新)
        if not force_refresh:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.execute(
                    """SELECT data_json FROM kline_cache
                       WHERE symbol=? AND region=? AND ktype=?
                       AND end_time=? AND limit_num=?""",
                    cache_key
                )
                row = cursor.fetchone()
                if row:
                    print(f"✅ 命中缓存: {symbol} ({region})")
                    return pd.read_json(row[0], orient="split")

        # 2. 缓存未命中,调用 API
        print(f"⏳ 调用 API: {symbol} ({region})")
        df = super().get_kline(symbol, region, ktype, limit, end_time)

        if df.empty:
            return df

        # 3. 存入缓存
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                """INSERT OR REPLACE INTO kline_cache
                   (symbol, region, ktype, end_time, limit_num, data_json)
                   VALUES (?, ?, ?, ?, ?, ?)""",
                (*cache_key, df.to_json(orient="split"))
            )

        return df

# 使用示例
cached_client = CachedDataClient(api_token="{{YOUR_API_TOKEN}}")
df = cached_client.get_kline_cached("600519", "SH", ktype="8", limit=100)
print(f"获取到 {len(df)} 条日线数据")

3.3 接入回测框架

获取数据后,需要将其接入量化回测框架。以 Backtrader 为例:

import backtrader as bt

class PandasDataFeed(bt.feeds.PandasData):
    """将 pandas DataFrame 转换为 Backtrader 数据源"""
    params = (
        ('datetime', None),
        ('open', 'open'),
        ('high', 'high'),
        ('low', 'low'),
        ('close', 'close'),
        ('volume', 'volume'),
        ('openinterest', -1),
    )

def run_backtest_with_iTick_data():
    """使用 iTick 历史数据运行回测"""
    cerebro = bt.Cerebro()

    # 初始化客户端
    client = CachedDataClient(api_token="{{YOUR_API_TOKEN}}")

    # 获取多只股票的历史数据
    symbols = [
        {"symbol": "AAPL", "region": "US", "name": "Apple"},
        {"symbol": "MSFT", "region": "US", "name": "Microsoft"},
        {"symbol": "600519", "region": "SH", "name": "Kweichow Moutai"}
    ]

    for item in symbols:
        df = client.get_kline_cached(
            symbol=item["symbol"],
            region=item["region"],
            ktype="8",    # 日线
            limit=500     # 约 2 年数据
        )
        if not df.empty:
            data = PandasDataFeed(dataname=df)
            cerebro.adddata(data, name=item["name"])
            print(f"✅ 已加载 {item['name']} 数据,{len(df)} 条")

    # 设置初始资金
    cerebro.broker.setcash(100000.0)
    print(f"初始资金: {cerebro.broker.getvalue():.2f}")

    # 添加策略(这里使用内置的简单均线策略作为示例)
    cerebro.addstrategy(bt.strategies.SMA_CrossOver)

    # 运行回测
    results = cerebro.run()
    print(f"回测后资金: {cerebro.broker.getvalue():.2f}")

if __name__ == "__main__":
    run_backtest_with_iTick_data()

3.4 WebSocket 实时数据订阅

对于需要实时验证策略信号的场景,iTick 的 WebSocket 接口提供了低延迟的数据推送能力。实测中外汇数据延迟低至 30ms,股票行情延迟控制在 100ms 以内。

import websocket
import json
import threading
import time

class iTickWebSocketClient:
    """iTick WebSocket 实时行情客户端"""

    def __init__(self, api_token: str):
        self.api_token = api_token
        self.ws_url = "wss://api.itick.org/sws"
        self.ws = None
        self.is_connected = False
        self.subscribed_symbols = set()
        self.on_quote_callback = None

    def connect(self):
        """建立 WebSocket 连接"""
        self.ws = websocket.WebSocketApp(
            self.ws_url,
            on_open=self._on_open,
            on_message=self._on_message,
            on_error=self._on_error,
            on_close=self._on_close,
            header={"token": self.api_token}
        )
        threading.Thread(target=self.ws.run_forever, daemon=True).start()

    def _on_open(self, ws):
        print("✅ WebSocket 连接已建立")
        self.is_connected = True

        # 发送认证消息
        auth_msg = {"ac": "auth", "params": self.api_token}
        ws.send(json.dumps(auth_msg))

    def _on_message(self, ws, message):
        """处理收到的行情数据"""
        try:
            data = json.loads(message)
            if self.on_quote_callback:
                self.on_quote_callback(data)
        except json.JSONDecodeError:
            print(f"⚠️ 无法解析消息: {message}")

    def _on_error(self, ws, error):
        print(f"❌ WebSocket 错误: {error}")

    def _on_close(self, ws, close_status_code, close_msg):
        print("🔌 WebSocket 连接已关闭")
        self.is_connected = False

    def subscribe(self, symbols: list, data_types: list = None):
        """
        订阅实时行情

        Args:
            symbols: 股票代码列表,格式如 ["600519$SH", "AAPL$US", "EURUSD$GB"]
            data_types: 数据类型,可选 "quote", "depth", "tick",默认 ["quote"]
        """
        if not self.is_connected:
            raise RuntimeError("WebSocket 未连接,请先调用 connect()")

        if data_types is None:
            data_types = ["quote"]

        params = ",".join(symbols)
        types = ",".join(data_types)

        subscribe_msg = {
            "ac": "subscribe",
            "params": params,
            "types": types
        }
        self.ws.send(json.dumps(subscribe_msg))
        print(f"📡 已订阅: {params}")

# 使用示例
def on_quote_received(data):
    """行情数据回调函数"""
    if data.get("ac") == "quote":
        print(f"收到报价: {data}")

# 创建客户端并订阅
ws_client = iTickWebSocketClient(api_token="{{YOUR_API_TOKEN}}")
ws_client.on_quote_callback = on_quote_received
ws_client.connect()

# 等待连接建立
time.sleep(2)

# 订阅多只股票的实时报价
ws_client.subscribe([
    "600519$SH",   # 贵州茅台(A股)
    "AAPL$US",     # 苹果(美股)
    "EURUSD$GB"    # 欧元/美元(外汇)
])

四、回测数据质量的三大技术考量

4.1 前瞻偏差防护

写交易模型时最容易犯的错误就是前瞻偏差——代码里不小心用了未来数据。例如,在计算当日的技术指标时使用了当日的收盘价作为输入,而实际交易中收盘价在收盘前是未知的。在使用历史数据 API 时,务必确认数据的时间戳是交易发生时刻而非数据发布时刻。iTick 返回的 K 线数据中每根 K 线都带有毫秒级时间戳,可以帮助开发者在回测框架中精确控制时间逻辑。

4.2 复权数据处理

股票的分红、拆股和配股会直接影响价格序列的连续性。如果使用未复权的历史价格进行回测,可能产生虚假的交易信号——比如拆股后价格突然“腰斩”会触发错误的止损信号。专业级 API 通常会提供复权选项,在使用时需要确认数据是否已做复权处理,以及复权的计算口径。

4.3 跨市场时间戳对齐

在跨交易所套利策略中,不同交易所行情数据的时间戳偏差可能导致“虚假套利信号”。例如,同一标的在 A 市场和 B 市场的报价时间差如果超过 50ms,回测中看到的价差可能在实际交易中并不存在。iTick 通过全球节点加速网络实现多市场数据同步推送,将跨市场数据偏差控制在毫秒级,这是免费数据源难以保证的。

五、总结与建议

对于量化开发者,建议遵循以下路径来高效利用历史数据 API:

验证阶段:先使用免费数据额度快速验证策略逻辑,无需过早投入成本。iTick 的免费方案已包含基础实时行情和历史 K 线查询,足够完成初步的策略验证。

优化阶段:当策略在基础数据上表现稳定后,利用 API 提供的高质量历史数据进行精细化回测。重点关注数据粒度的切换——从日线回测升级到分钟线甚至 Tick 级回测,往往会暴露出策略在日线级别上掩盖的问题。

实盘准备:确保回测所用的历史数据与实盘行情的数据结构、时间戳格式和字段定义完全一致。iTick 的 REST 和 WebSocket 接口使用相同的认证体系和数据格式,这使得从回测到实盘的切换几乎无缝。

成本控制:充分利用本地缓存机制,避免重复下载相同数据。一次批量获取后存入 SQLite,后续回测直接从本地读取,既省费用又提速。对于长期维护的量化项目,建议建立一套自动化的数据更新脚本,定期拉取增量数据。

声明:本文内容仅供参考,不构成任何投资建议。

参考文档:https://blog.itick.org/itick-ema12-strategy-backtesting-tutorial
GitHub:https://github.com/itick-org/

评论