前言
在量化策略研发、实盘数据采集与模型回测环节中,稳定、连续的股票实时行情数据是策略有效运行的基础。基于 WebSocket 对接行情 API 是当前量化领域主流的实时数据接入方案。
初期采用连接初始化阶段固定订阅标的的方式,可满足简单场景需求。但在多标的轮动策略、动态选股模型、多组合并行监测等场景下,需要频繁调整行情订阅列表。若每次变更订阅都断开并重建 WebSocket 连接,不仅会产生数据断档,影响回测精度与策略信号连续性,还会增加网络交互开销。
本文结合 AllTick 行情 API,分享基于单条长连接实现动态增删股票订阅的完整技术方案。从实现逻辑、代码落地、异常处理到量化场景应用价值展开说明,供各位量化研究者、策略开发者参考交流。
一、核心概念界定
股票 WebSocket 动态订阅,指在已建立的长连接链路中,通过下发协议指令调整订阅标的,全程不重建网络连接。该模式区别于连接初始化时的一次性静态订阅,能够适配订阅列表高频变动的场景,保障数据流的完整性,契合量化策略对数据连续性的要求。
二、量化场景需求与传统方案缺陷
1. 核心业务需求
针对量化数据采集、策略实盘运行、批量标的监测等场景,数据接入环节需满足两项核心要求:
- 支持单只、批量股票标的灵活新增与取消订阅,适配动态选股、多池轮动类策略;
- 调整订阅列表的过程中,其余已订阅标的的行情数据流不中断,保证策略计算、指标采样不受影响。
2. 静态订阅 + 重复建连的问题
传统方案在订阅列表变更时执行「断连 - 重连 - 重新订阅」流程,在量化应用中存在明显短板:
- 连接频繁创建与销毁,提升客户端与服务端网络负载,大规模标的采集时问题更为突出;
- 重连间隙出现数据断层,造成行情采样缺失,直接影响回测结果的真实性与实盘策略的信号输出;
- 重复执行连接鉴权、订阅初始化逻辑,代码冗余度高,不利于量化工具的长期迭代与维护。
本次优化的核心思路:复用单一 WebSocket 长连接,依靠链路内指令完成订阅管理,从底层保障数据链路稳定。
三、整体实现逻辑
整套方案分为本地订阅状态管理与WebSocket 指令交互两大模块,逻辑简洁且便于嵌入量化数据框架。
- 本地使用集合结构存储当前全部已订阅股票代码,从源头规避重复订阅请求;
- 统一执行规则:优先更新本地订阅状态,再向服务端发送控制指令,保证本地记录与服务端推送范围保持一致;
- AllTick WebSocket 协议支持在长连接生命周期内持续下发控制指令,为动态订阅功能提供协议支撑。
核心操作执行流程
标的新增订阅
校验目标标的未存在于本地订阅集合后,将标的代码写入集合;随后按照协议规范构造订阅指令,通过长连接发送至服务端,服务端随即开启对应标的的行情推送。
标的取消订阅
校验目标标的已存在于本地订阅集合后,将标的代码从集合中移除;构造取消订阅指令并下发,终止服务端对该标的的数据推送。
批量变更订阅
整体刷新本地订阅标的列表,将所有目标标的整合为单条指令批量下发,减少网络请求次数,提升大批量标的管理效率。
四、Python 完整实现代码
代码严格遵循 AllTick 接口规范,采用官方指定cmd_id=22004帧格式,使用code字段传递标的代码。内置心跳保活、空值校验、异常捕获,可直接集成至量化数据采集程序、行情监听工具中。
import websocket
import json
# 股票WebSocket端点:AllTick 官方 API Docs(WebSocket 地址说明)
WS_STOCK_URL = "wss://quote.alltick.co/quote-stock-b-ws-api?token=YOUR_TOKEN"
# 本地集合:维护已订阅标的,防止重复请求,适配量化多标的管理
stock_subscribe_set = set()
def on_open(ws):
"""连接建立回调,完成初始标的订阅"""
init_codes = ["stock_a", "stock_b"]
global stock_subscribe_set
stock_subscribe_set.update(init_codes)
# 遵循官方帧格式定义订阅消息
init_msg = {
"cmd_id": 22004,
"action": "subscribe",
"code": init_codes
}
ws.send(json.dumps(init_msg))
def on_message(ws, message):
"""行情数据接收回调,增加异常防护,保障量化数据稳定接收"""
if not message:
return
try:
data = json.loads(message)
# 此处可接入数据解析、指标计算、策略信号触发、本地存储等量化逻辑
print(data)
except json.JSONDecodeError:
return
def on_error(ws, error):
"""连接异常回调,用于程序日志记录与故障排查"""
print(f"WebSocket 连接异常:{error}")
def on_close(ws, close_status_code, close_msg):
"""连接关闭回调,可补充重连告警、状态记录逻辑"""
print("WebSocket 连接已断开")
def add_subscribe(ws, code_list):
"""动态新增标的订阅,适配动态选股、临时池添加场景"""
global stock_subscribe_set
valid_codes = [code for code in code_list if code not in stock_subscribe_set]
if not valid_codes:
return
stock_subscribe_set.update(valid_codes)
req_msg = {
"cmd_id": 22004,
"action": "subscribe",
"code": valid_codes
}
ws.send(json.dumps(req_msg))
def remove_subscribe(ws, code_list):
"""动态取消标的订阅,适配标的剔除、策略下线场景"""
global stock_subscribe_set
valid_codes = [code for code in code_list if code in stock_subscribe_set]
if not valid_codes:
return
for code in valid_codes:
stock_subscribe_set.discard(code)
req_msg = {
"cmd_id": 22004,
"action": "unsubscribe",
"code": valid_codes
}
ws.send(json.dumps(req_msg))
if __name__ == "__main__":
# 初始化客户端,10秒心跳保活,维持长连接稳定性
ws_app = websocket.WebSocketApp(
WS_STOCK_URL,
on_open=on_open,
on_message=on_message,
on_error=on_error,
on_close=on_close
)
ws_app.run_forever(ping_interval=10)
五、方案在量化领域的应用价值
相较于传统反复建连的模式,本方案在量化研发、实盘运行环节具备明确优势:
- 数据连续性提升:全程复用单条长连接,调整订阅标的时无数据断档,保证行情采样完整,提升回测数据可信度与实盘策略运行稳定性;
- 资源开销优化:减少连接创建、鉴权等重复操作,降低程序算力与网络消耗,适合多标的、多策略并行运行的量化架构;
- 工程扩展性强:订阅、取消订阅逻辑模块化封装,可快速对接标的池管理、自动选股、组合调仓等量化模块,便于工具与策略迭代;
- 适配高频场景:支持单标的与批量标的操作,能够匹配轮动策略、短线模型等需要频繁调整观测标的的应用场景。
六、常见问题与排查方案
结合量化程序长期运行的实测经验,整理三类高频问题及对应处理方式,便于线上运维与调试:
-
问题:下发订阅指令后,无法接收对应标的行情数据
排查方向:核对指令内
code字段、本地集合标的代码是否一致,确认cmd_id取值为 22004处理方案:同步本地标的列表后,重新下发标准订阅指令。
-
问题:执行取消订阅后,仍持续收到该标的行情数据
排查方向:检查本地集合是否正常移除对应标的代码,确认取消订阅指令正常发送
处理方案:以本地订阅集合为基准增加数据过滤逻辑,拦截无效数据流,避免干扰策略计算。
-
问题:批量下发订阅指令后,部分标的订阅状态异常
排查方向:拆分批量标的列表,逐一对单标的进行订阅、取消订阅功能验证
处理方案:临时改用单标的逐条下发指令,规避批量协议兼容问题。
七、功能边界说明
本方案基于 AllTick 标准股票 WebSocket 协议开发,仅支持单条连接内部实现标的动态增删。无法实现跨设备、跨连接的订阅状态同步,同时不支持协议定义范围之外的自定义指令,集成至量化系统时需遵循该边界设计整体架构。

