一、历史数据 API 在量化回测中的核心价值
在量化交易的世界里,“数据是基石,速度是命脉”,尤其是在 2026 年,随着量化策略向高频化、多资产化升级,数据源的选型直接决定了回测结果的准确性和策略落地的可行性。

一个容易被忽视的事实是:策略实盘与回测的盈利偏差中,有相当一部分源于数据源问题——毫秒级延迟、时间戳错位、数据断层,这些技术细节往往成为制约策略盈利的关键瓶颈。对于分钟级别的统计套利策略,秒级延迟可能尚可接受;但对于高频做市商或跨交易所套利策略,微秒级的差距就意味着胜负已分。
而且,数据源的切换成本极高。一旦策略围绕某个数据源的字段定义、时间戳格式和错误处理逻辑深度耦合,迁移到新数据源意味着数周甚至数月的重构工作。因此,在项目启动阶段选择一款高质量的数据 API,其价值远高于后期纠错。
对量化开发者而言,理想的回测数据 API 应当满足以下核心需求:
- 历史数据完整性:覆盖足够长的时间跨度,支持日线、分钟线乃至 Tick 级多粒度数据。
- 数据一致性:历史数据与实时行情的数据结构保持一致,减少策略切换时的适配成本。
- 接口易用性:提供标准 REST API 或 WebSocket,与 Python/Pandas 等量化工具栈无缝集成。
- 成本可控:提供合理的免费额度,按量计费模式透明。
在实际技术选型中,iTick API 提供了一个相当有参考价值的方案。它覆盖港股、美股、A 股、外汇、期货、加密货币等全球主流市场,通过 RESTful 和 WebSocket 两种接口形式满足不同场景需求——REST API 适合批量数据查询和历史数据获取,WebSocket 则为实时性要求高的交易场景提供低延迟数据流。其历史数据回溯功能支持长达 15 年的日线级数据下载,为策略回测提供了可靠支撑。
二、API 技术能力速览
在深入代码实战之前,先快速了解这个 API 的几个核心技术指标,这有助于你判断它是否适合自己的策略场景。
市场覆盖:覆盖全球多个主要市场,包括外汇(GB 市场)、股票(港股 HK、深市 SZ、沪市 SH、美股 US 等)、期货(US、HK、CN)和基金(US 等),一套 API 即可满足多资产策略的数据需求。
数据粒度:支持从 Tick 级逐笔成交、分钟线、小时线到日线、周线、月线的全粒度 K 线数据,可以满足从高频回测到长周期趋势策略的不同需求。
实时性:WebSocket 实时推送模式下,外汇数据延迟低至 30ms,主要市场行情延迟控制在 100ms 以内。配合全球节点加速网络,即便在跨市场场景下也能保持稳定的数据传输。
接口协议:同时提供 RESTful HTTP GET 请求和 WebSocket 实时推送。REST API 适合批量查询历史数据,WebSocket 则适用于低延迟的实时数据流订阅,两者使用相同的认证体系和数据格式,切换成本极低。
三、回测数据管道的代码实战
下面我们以 iTick API 为例,构建一个完整的量化回测数据管道——从历史数据获取、本地缓存,到与回测框架对接,覆盖实际开发中的核心环节。所有代码均使用 Python,你可以直接复制并根据自己的需求修改。
3.1 基础 REST API:获取历史 K 线数据
REST API 是最常用的历史数据获取方式,适用于批量下载和离线回测场景。
import requests
import pandas as pd
import time
from typing import Optional, List
class HistoricalDataClient:
"""
历史数据客户端,基于 iTick REST API
文档地址:https://itick.org
"""
def __init__(self, api_token: str, base_url: str = "//api.itick.org"):
self.api_token = api_token
self.base_url = base_url
self.session = requests.Session()
self.session.headers.update({
"accept": "application/json",
"token": api_token
})
def get_kline(
self,
symbol: str,
region: str,
ktype: str = "8",
limit: int = 100,
end_time: Optional[int] = None,
max_retries: int = 3
) -> pd.DataFrame:
"""
获取历史 K 线数据
Args:
symbol: 股票/外汇代码(如 "AAPL", "EURUSD")
region: 市场区域(如 "US", "HK", "SH", "SZ", "GB")
ktype: K 线类型,"1"-"10" 分别代表 1/5/10/30 分钟、1/2/4 小时、日/周/月线
limit: 获取的 K 线数量
end_time: 截止时间戳(毫秒),默认为当前时间
max_retries: 最大重试次数
Returns:
pandas DataFrame,包含 OHLCV 数据
"""
endpoint = f"{self.base_url}/stock/kline"
if end_time is None:
end_time = int(time.time() * 1000)
params = {
"region": region,
"code": symbol,
"kType": ktype,
"limit": limit,
"et": end_time
}
for attempt in range(max_retries):
try:
resp = self.session.get(endpoint, params=params, timeout=30)
resp.raise_for_status()
data = resp.json()
if data.get("code") != 0:
raise RuntimeError(f"API 返回错误: {data.get('msg')}")
candles = data.get("data", [])
if not candles:
return pd.DataFrame()
df = pd.DataFrame(candles)
# iTick 返回的时间戳字段名为 't'(毫秒)
df["datetime"] = pd.to_datetime(df["t"], unit="ms")
df.set_index("datetime", inplace=True)
# 重命名列以匹配常规 OHLCV 命名
df.rename(columns={
"o": "open",
"h": "high",
"l": "low",
"c": "close",
"v": "volume"
}, inplace=True)
return df[["open", "high", "low", "close", "volume"]]
except requests.exceptions.RequestException as e:
if attempt == max_retries - 1:
raise RuntimeError(f"数据获取失败: {e}")
time.sleep(2 ** attempt) # 指数退避重试
# 使用示例
client = HistoricalDataClient(api_token="{{YOUR_API_TOKEN}}")
df = client.get_kline(
symbol="AAPL",
region="US",
ktype="8", # 日线
limit=200 # 获取最近 200 根日线
)
print(f"获取到 {len(df)} 条日线数据")
print(df.head())
3.2 批量获取与本地缓存
对于大规模回测场景,频繁调用 API 不仅受限于速率限额,还会拖慢回测速度。建议建立本地数据缓存机制:
import sqlite3
import json
from pathlib import Path
from datetime import datetime
class CachedDataClient(HistoricalDataClient):
"""带本地 SQLite 缓存的增强版客户端"""
def __init__(self, api_token: str, cache_dir: str = "./data_cache"):
super().__init__(api_token)
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(exist_ok=True)
self._init_db()
def _init_db(self):
"""初始化 SQLite 数据库"""
self.db_path = self.cache_dir / "market_data.db"
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS kline_cache (
symbol TEXT NOT NULL,
region TEXT NOT NULL,
ktype TEXT NOT NULL,
end_time INTEGER NOT NULL,
limit_num INTEGER NOT NULL,
data_json TEXT NOT NULL,
cached_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (symbol, region, ktype, end_time, limit_num)
)
""")
def get_kline_cached(
self,
symbol: str,
region: str,
ktype: str = "8",
limit: int = 100,
end_time: Optional[int] = None,
force_refresh: bool = False
) -> pd.DataFrame:
"""优先从缓存获取,未命中时调用 API 并存入缓存"""
if end_time is None:
end_time = int(time.time() * 1000)
# 生成缓存键
cache_key = (symbol, region, ktype, end_time, limit)
# 1. 检查缓存(除非强制刷新)
if not force_refresh:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute(
"""SELECT data_json FROM kline_cache
WHERE symbol=? AND region=? AND ktype=?
AND end_time=? AND limit_num=?""",
cache_key
)
row = cursor.fetchone()
if row:
print(f"✅ 命中缓存: {symbol} ({region})")
return pd.read_json(row[0], orient="split")
# 2. 缓存未命中,调用 API
print(f"⏳ 调用 API: {symbol} ({region})")
df = super().get_kline(symbol, region, ktype, limit, end_time)
if df.empty:
return df
# 3. 存入缓存
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"""INSERT OR REPLACE INTO kline_cache
(symbol, region, ktype, end_time, limit_num, data_json)
VALUES (?, ?, ?, ?, ?, ?)""",
(*cache_key, df.to_json(orient="split"))
)
return df
# 使用示例
cached_client = CachedDataClient(api_token="{{YOUR_API_TOKEN}}")
df = cached_client.get_kline_cached("600519", "SH", ktype="8", limit=100)
print(f"获取到 {len(df)} 条日线数据")
3.3 接入回测框架
获取数据后,需要将其接入量化回测框架。以 Backtrader 为例:
import backtrader as bt
class PandasDataFeed(bt.feeds.PandasData):
"""将 pandas DataFrame 转换为 Backtrader 数据源"""
params = (
('datetime', None),
('open', 'open'),
('high', 'high'),
('low', 'low'),
('close', 'close'),
('volume', 'volume'),
('openinterest', -1),
)
def run_backtest_with_iTick_data():
"""使用 iTick 历史数据运行回测"""
cerebro = bt.Cerebro()
# 初始化客户端
client = CachedDataClient(api_token="{{YOUR_API_TOKEN}}")
# 获取多只股票的历史数据
symbols = [
{"symbol": "AAPL", "region": "US", "name": "Apple"},
{"symbol": "MSFT", "region": "US", "name": "Microsoft"},
{"symbol": "600519", "region": "SH", "name": "Kweichow Moutai"}
]
for item in symbols:
df = client.get_kline_cached(
symbol=item["symbol"],
region=item["region"],
ktype="8", # 日线
limit=500 # 约 2 年数据
)
if not df.empty:
data = PandasDataFeed(dataname=df)
cerebro.adddata(data, name=item["name"])
print(f"✅ 已加载 {item['name']} 数据,{len(df)} 条")
# 设置初始资金
cerebro.broker.setcash(100000.0)
print(f"初始资金: {cerebro.broker.getvalue():.2f}")
# 添加策略(这里使用内置的简单均线策略作为示例)
cerebro.addstrategy(bt.strategies.SMA_CrossOver)
# 运行回测
results = cerebro.run()
print(f"回测后资金: {cerebro.broker.getvalue():.2f}")
if __name__ == "__main__":
run_backtest_with_iTick_data()
3.4 WebSocket 实时数据订阅
对于需要实时验证策略信号的场景,iTick 的 WebSocket 接口提供了低延迟的数据推送能力。实测中外汇数据延迟低至 30ms,股票行情延迟控制在 100ms 以内。
import websocket
import json
import threading
import time
class iTickWebSocketClient:
"""iTick WebSocket 实时行情客户端"""
def __init__(self, api_token: str):
self.api_token = api_token
self.ws_url = "wss://api.itick.org/sws"
self.ws = None
self.is_connected = False
self.subscribed_symbols = set()
self.on_quote_callback = None
def connect(self):
"""建立 WebSocket 连接"""
self.ws = websocket.WebSocketApp(
self.ws_url,
on_open=self._on_open,
on_message=self._on_message,
on_error=self._on_error,
on_close=self._on_close,
header={"token": self.api_token}
)
threading.Thread(target=self.ws.run_forever, daemon=True).start()
def _on_open(self, ws):
print("✅ WebSocket 连接已建立")
self.is_connected = True
# 发送认证消息
auth_msg = {"ac": "auth", "params": self.api_token}
ws.send(json.dumps(auth_msg))
def _on_message(self, ws, message):
"""处理收到的行情数据"""
try:
data = json.loads(message)
if self.on_quote_callback:
self.on_quote_callback(data)
except json.JSONDecodeError:
print(f"⚠️ 无法解析消息: {message}")
def _on_error(self, ws, error):
print(f"❌ WebSocket 错误: {error}")
def _on_close(self, ws, close_status_code, close_msg):
print("🔌 WebSocket 连接已关闭")
self.is_connected = False
def subscribe(self, symbols: list, data_types: list = None):
"""
订阅实时行情
Args:
symbols: 股票代码列表,格式如 ["600519$SH", "AAPL$US", "EURUSD$GB"]
data_types: 数据类型,可选 "quote", "depth", "tick",默认 ["quote"]
"""
if not self.is_connected:
raise RuntimeError("WebSocket 未连接,请先调用 connect()")
if data_types is None:
data_types = ["quote"]
params = ",".join(symbols)
types = ",".join(data_types)
subscribe_msg = {
"ac": "subscribe",
"params": params,
"types": types
}
self.ws.send(json.dumps(subscribe_msg))
print(f"📡 已订阅: {params}")
# 使用示例
def on_quote_received(data):
"""行情数据回调函数"""
if data.get("ac") == "quote":
print(f"收到报价: {data}")
# 创建客户端并订阅
ws_client = iTickWebSocketClient(api_token="{{YOUR_API_TOKEN}}")
ws_client.on_quote_callback = on_quote_received
ws_client.connect()
# 等待连接建立
time.sleep(2)
# 订阅多只股票的实时报价
ws_client.subscribe([
"600519$SH", # 贵州茅台(A股)
"AAPL$US", # 苹果(美股)
"EURUSD$GB" # 欧元/美元(外汇)
])
四、回测数据质量的三大技术考量
4.1 前瞻偏差防护
写交易模型时最容易犯的错误就是前瞻偏差——代码里不小心用了未来数据。例如,在计算当日的技术指标时使用了当日的收盘价作为输入,而实际交易中收盘价在收盘前是未知的。在使用历史数据 API 时,务必确认数据的时间戳是交易发生时刻而非数据发布时刻。iTick 返回的 K 线数据中每根 K 线都带有毫秒级时间戳,可以帮助开发者在回测框架中精确控制时间逻辑。
4.2 复权数据处理
股票的分红、拆股和配股会直接影响价格序列的连续性。如果使用未复权的历史价格进行回测,可能产生虚假的交易信号——比如拆股后价格突然“腰斩”会触发错误的止损信号。专业级 API 通常会提供复权选项,在使用时需要确认数据是否已做复权处理,以及复权的计算口径。
4.3 跨市场时间戳对齐
在跨交易所套利策略中,不同交易所行情数据的时间戳偏差可能导致“虚假套利信号”。例如,同一标的在 A 市场和 B 市场的报价时间差如果超过 50ms,回测中看到的价差可能在实际交易中并不存在。iTick 通过全球节点加速网络实现多市场数据同步推送,将跨市场数据偏差控制在毫秒级,这是免费数据源难以保证的。
五、总结与建议
对于量化开发者,建议遵循以下路径来高效利用历史数据 API:
验证阶段:先使用免费数据额度快速验证策略逻辑,无需过早投入成本。iTick 的免费方案已包含基础实时行情和历史 K 线查询,足够完成初步的策略验证。
优化阶段:当策略在基础数据上表现稳定后,利用 API 提供的高质量历史数据进行精细化回测。重点关注数据粒度的切换——从日线回测升级到分钟线甚至 Tick 级回测,往往会暴露出策略在日线级别上掩盖的问题。
实盘准备:确保回测所用的历史数据与实盘行情的数据结构、时间戳格式和字段定义完全一致。iTick 的 REST 和 WebSocket 接口使用相同的认证体系和数据格式,这使得从回测到实盘的切换几乎无缝。
成本控制:充分利用本地缓存机制,避免重复下载相同数据。一次批量获取后存入 SQLite,后续回测直接从本地读取,既省费用又提速。对于长期维护的量化项目,建议建立一套自动化的数据更新脚本,定期拉取增量数据。
声明:本文内容仅供参考,不构成任何投资建议。
参考文档:https://blog.itick.org/itick-ema12-strategy-backtesting-tutorial
GitHub:https://github.com/itick-org/

