一、场景与研究背景
在港股量化策略开发、实盘模拟与历史回测流程中,行情数据连续性直接决定模型输出可信度。若 Tick 序列存在缺漏,分时均价、滚动成交量、盘口套利因子、高频交易信号全部产生系统性偏差,回测结果失真,无法作为实盘依据。
早期行情接入方案普遍采用「标的变更即重连 WebSocket」逻辑,单标的本地测试无明显异常,但多标的轮动、自选池批量切换场景下会暴露稳定性缺陷:每次重建连接重置序列计数器,叠加网络抖动、消费线程瓶颈,Tick 序列号频繁跳变,形成数据缺口;手动补齐历史 Tick 耗时较长,难以满足高频策略实时运行与批量回测的数据连续性要求。
本文基于长连接增量订阅架构,实现不中断链路增减观测标的,搭配序列连续性校验、自动区间补数、消息缓冲状态机完整流程,提供可直接接入量化系统的 Python 实现,从底层解决 Tick 断层带来的回测失真问题,适用于高频 T+0、指标量化、盘口对冲等研究场景。
二、量化系统核心数据需求
- 单持久 WebSocket 承载多只港股,策略轮动、观测池调整时无需断开链路,实时 Tick 流不中断,保证连续因子采样;
- 每笔 Tick 携带自增序列号,消费层实时校验连续性,检测到缺口自动调用历史接口拉取缺失区间数据,回填至数据流;
- 本地维护订阅标的集合,前置过滤重复订阅、空参数指令,减少冗余 Tick 带来的因子重复计算与内存开销;
- 集成心跳存活检测、链路异常恢复、乱序缓冲机制,长时间策略运行、隔夜回测任务期间维持数据流稳定输入。
三、四类影响回测的数据断层根源
1. 全量重连重置序列号,大规模 Tick 丢失
每次调整标的销毁重建 Socket,本地 last_seq 缓存清空,新连接序列号从服务当前数值重新计数,新旧数据流无法衔接。大范围数据缺失会直接导致长周期回测样本不足,收益曲线、最大回撤完全偏离真实行情。
2. 网络延迟、消费滞后引发小幅序列跳变
高波动时段 Tick 推送密度陡增,若消费处理速度跟不上推送速率,或短时数据包丢失,会出现序列号跳跃(例如 1003 直接跳到 1008)。少量 Tick 缺失会干扰微观价格因子、短期动量信号的计算精度。
3. 并发订阅指令造成本地与服务端状态不一致
策略批量切换标的时并发发送增删指令,本地标的集合和服务订阅列表错位,产生两类脏数据:重复 Tick 抬高 CPU 负载、标的漏推送中断因子采样。
4. 无缓冲隔离实时流与补数流,时序错乱
检测缺口后仅等待历史数据,新实时 Tick 持续涌入,新旧数据无序堆叠,时序混乱,基于滚动窗口、时间序列的量化模型全部失效。
四、核心技术定义:增量动态订阅
动态增量订阅依托单条长期存活 WebSocket 链路,通过专用控制帧携带新增 / 移除标的代码清单,无需断开、重建连接即可更新服务订阅清单。区别于定时 REST 轮询、改标的即全量重订阅两种传统方案,可消除重连风暴,维持 Tick 时序完整,适合长时间量化运行、连续回测数据采集。
五、场景与参数对照验证表
| 应用场景 | 量化数据痛点 | 订阅控制参数(指令 ID / 操作 / 标的代码) | 验证基准(回测数据层) |
|---|---|---|---|
| 程序启动批量加载回测标的 | 新增标的需要重连,中断因子连续采样 | cmd_id=22004,action=add,code=[00700.HK,9988.HK] | 本地订阅集合与输入标的数量匹配,同一条链路 Tick 序列号持续递增无重置 |
| 策略轮动新增观测标的 | 重连重置序列号,产生区间数据缺失 | cmd_id=22004,action=add,code=[新增标的代码] | 原有链路保持存活,新旧 Tick 序列号连续,因子采样无中断 |
| 策略停止跟踪指定标的 | 持续接收无效 Tick,占用回测计算资源 | cmd_id=22004,action=del,code=[待移除代码] | 本地集合移除对应标的,不再接收该标的 Tick,释放算力 |
| 重复发送新增标的指令 | 重复 Tick 导致因子重复运算,样本重叠 | cmd_id=22004,action=add,code=[已订阅代码] | 本地前置去重,拦截重复指令,无冗余数据流入计算层 |
| 传入空标的列表指令 | 订阅状态混乱,随机丢失部分标的 Tick | cmd_id=22004,action=add/del,code=[] | 本地前置参数校验,空列表直接丢弃,不向服务发送 |
六、完整 Python 量化接入代码
import websocket
import json
import threading
import time
# 港股专属行情WebSocket接入地址
WS_STOCK_URL = "wss://quote.alltick.co/quote-stock-b-ws-api?token=YOUR_TOKEN"
# 量化系统本地状态变量
subscriptions = set() # 当前有效观测标的集合,自动去重
last_seq = None # 上一条Tick序列号,用于连续性校验
msg_buffer = [] # 序列号缺口缓冲区,补数完成后有序回放供因子计算
ws_app = None
def send_subscribe_action(action: str, code_list: list):
"""发送增量订阅控制帧,add新增标的,del移除标的,固定cmd_id=22004"""
global ws_app
if not ws_app or not ws_app.sock or not ws_app.connected:
return
if len(code_list) == 0:
return
unique_codes = list(set(code_list))
ctrl_frame = {
"cmd_id": 22004,
"action": action,
"code": unique_codes
}
ws_app.send(json.dumps(ctrl_frame))
# 同步更新本地标的集合,保持与服务状态对齐
if action == "add":
for c in unique_codes:
subscriptions.add(c)
elif action == "del":
for c in unique_codes:
if c in subscriptions:
subscriptions.remove(c)
def request_missing_tick(start_seq: int, end_seq: int):
"""检测序列缺口,调用历史接口拉取缺失Tick,用于回填回测数据集"""
print(f"检测Tick序列缺口,拉取缺失区间 seq:{start_seq+1} ~ {end_seq-1}")
# 此处可扩展HTTP历史Tick请求逻辑,取回数据有序插入缓冲区前端
def check_seq_continuity(current_seq: int) -> bool:
"""实时序列号校验,断层自动触发补数,保障回测样本完整"""
global last_seq
if last_seq is None:
last_seq = current_seq
return True
if current_seq != last_seq + 1:
request_missing_tick(last_seq, current_seq)
return False
last_seq = current_seq
return True
def on_open(ws):
"""链路建立后加载初始回测标的池"""
init_codes = ["00700.HK", "9988.HK", "09992.HK"]
send_subscribe_action("add", init_codes)
print("WebSocket链路建立,完成初始量化标的订阅,开始持续采集Tick数据")
def on_message(ws):
"""Tick回调入口:报文过滤、序列号校验、缓冲存储,因子计算前置统一处理"""
global last_seq, msg_buffer
if not message or len(message.strip()) == 0:
return
try:
tick_data = json.loads(message)
except Exception:
return
if "seq" not in tick_data or "code" not in tick_data:
return
tick_seq = tick_data["seq"]
tick_code = tick_data["code"]
# 过滤无价格空报文,避免污染回测样本
if tick_data.get("price") in (0, None) and tick_data.get("open") in (0, None):
return
check_seq_continuity(tick_seq)
msg_buffer.append(tick_data)
last_seq = tick_seq
def on_error(ws, error):
print(f"WebSocket链路异常,Tick采集中断:{error}")
def on_close(ws, close_code, close_msg):
"""链路断开重置本地状态,重连后重新订阅恢复数据采集"""
global last_seq, msg_buffer, subscriptions
print(f"链路断开,断开码:{close_code},详情:{close_msg}")
last_seq = None
msg_buffer.clear()
subscriptions.clear()
def ws_runner():
global ws_app
ws_app = websocket.WebSocketApp(
WS_STOCK_URL,
on_open=on_open,
on_message=on_message,
on_error=on_error,
on_close=on_close
)
# 10秒心跳保活,规避假活链路导致无感知丢包,保障长时回测稳定
ws_app.run_forever(ping_interval=10, ping_timeout=5)
if __name__ == "__main__":
# 异步启动行情采集线程,不阻塞主线程策略运行
tick_thread = threading.Thread(target=ws_runner, daemon=True)
tick_thread.start()
time.sleep(2)
# 模拟量化策略新增跟踪标的
send_subscribe_action("add", ["01299.HK"])
time.sleep(10)
# 模拟量化策略移除跟踪标的
send_subscribe_action("del", ["01299.HK"])
while True:
time.sleep(1)
七、量化运行高频故障排查(现象|检测手段|解决方案)
1. 高波动时段缓冲区持续堆积,因子计算延迟,回测效率下降
现象:msg_buffer 长度持续上涨,单 tick 因子运算耗时拉长,批量回测周期显著增加;
检测手段:埋点记录缓冲长度、单 tick 处理耗时,监控线程 CPU 占用;
解决方案:独立异步线程批量消费缓冲区,设置容量阈值;超出上限拉取行情快照对齐状态,丢弃过期 tick 释放算力。
2. 链路假活无报错,序列号持续断裂,回测样本长期缺失
现象:无断开回调,但连续多段 tick 序列号不连续,回测存在大量空白区间;
检测手段:统计连续缺口次数,监控 ping-pong 心跳响应周期;
解决方案:增加业务层序列号超时判断,连续 5 次缺口主动断连重建;重连后拉取全量快照统一校准时序。
3. 快速切换标的引发指令竞态,本地与服务订阅状态错位
现象:本地已移除标的仍持续推送 tick,多余数据干扰多标的并行回测;
检测手段:日志留存所有 add/del 指令与 tick 标的代码,交叉比对本地集合;
解决方案:订阅指令增加线程锁,增删操作串行执行;每条指令下发延迟 200ms 校验流入数据,状态异常自动修正本地集合。
4. 标的代码缺失.HK 后缀,订阅静默失效,单标的回测无样本
现象:下发订阅无报错,但长期无对应 tick 流入,该标的回测数据集为空;
检测手段:比对下发 code 与接口规范格式;
解决方案:本地配置港股代码校验规则,缺失市场后缀的无效代码直接拦截。
八、方案适用边界
支持场景:单条持久 WebSocket 链路内,通过固定指令 ID 动态增删港股观测标的,无需重建连接,持续输出完整 tick 时序供量化策略、历史回测使用;
不支持场景:多条 WebSocket 之间同步订阅状态、实时接口一次性全量回溯历史 tick、非规范私有控制指令交互。
九、研究总结
本套行情采集架构依托行情 API 提供的 WebSocket 动态订阅能力搭建,核心通过实时序列号校验 + 自动区间补数解决港股 tick 序列号跳变、时序断层问题,从底层保障量化回测与实盘采样的数据完整性。
文中代码可直接集成至各类量化框架,同时梳理四类长时间跑盘高频故障与标准化处理逻辑,适用于高频量化、多标的并行回测、盘口因子挖掘等场景。开发与回测阶段遵循接口边界约束,能够规避重连风暴、时序错乱带来的模型偏差,减少人工修复数据、重复回测的时间成本,提升量化研究的数据可信度。

