量化策略实战:WebSocket 行情推送

用户头像sh_****559rtx
2026-05-29 发布

我用 Python 写贵金属的日内策略时,最先遇到的麻烦不是因子怎么构建,而是数据源头的问题。为了降低滑点和延迟,我选择直接通过 WebSocket 接入实时报价,但发现黄金、白银、铂金等品种的 tick 都是从一个地址推过来的,JSON 结构完全一致。如果不在策略的“入口函数”里把资产分开,我的趋势策略可能误把白银的涨幅当成黄金的开仓信号,那就闹笑话了。

需求很明确:在一个异步事件循环里,根据消息内部字段实现微秒级的分流,让每个品种的行情各自进入独立的因子计算队列。数据痛点就在于 WebSocket 的高频推送是混合的,任何多余的字符串匹配或遍历都会累积成策略延迟。

以 symbol 为键构建分发器

贵金属行情消息中一定有类似 symbol 的字段。我会在策略初始化时就把品种代码和内部标签一一对应,存入字典:

字段名 含义 示例
symbol 资产代码 XAUUSD、XAGUSD
instrumentId 平台内部唯一标识 1001、1002
type 资产类别 gold、silver
asset_map = {
    "XAUUSD": "gold",
    "XAGUSD": "silver",
    "XPTUSD": "platinum"
}

on_message 回调里,拿到 symbol 后直接字典取值,再调用对应品种的策略函数。因为字典查找是哈希操作,几乎不耗时,可以放心放在高频回调里。

按品种分桶缓存,批量驱动策略更新

如果我每收到一个 tick 就触发一次完整的策略运算,哪怕是向量化计算也撑不住。我的做法是先写入分桶的内存缓存:

# 只做最简单的缓存记录
def on_message(msg):
    symbol = msg['symbol']
    price = msg['price']
    asset_type = asset_map.get(symbol, "unknown")
    cache[asset_type][symbol] = price

然后在定时器里(比如每 100 毫秒)从 cache["gold"]cache["silver"] 中取出最新价格,驱动信号生成。这样黄金的均线、布林带,白银的波动率锥,不会在任何计算步骤中发生串扰。

精准订阅以收紧数据流

WebSocket 客户端初始化时,我就把订阅列表限定为策略实际交易的品种,不相关的品种一条都不收。部分接口还可以按 asset_type 批量订阅,能进一步减少无效数据的解析。策略回测和实盘保持一致,都是接收同样范围的 tick。

曾用 AllTick 的 WebSocket 来做接入测试,它的 symbol 划分非常清晰。一段简单的 Python 代码就可以完成订阅:

import websocket
import json

def on_message(ws, message):
    data = json.loads(message)
    symbol = data['symbol']
    print(f"{symbol} 实时价格: {data['price']}")

ws = websocket.WebSocketApp("wss://api.alltick.co/ws",
                            on_message=on_message)
ws.run_forever()

这样一个连接里同时监控多个贵金属,每个品种的行情逻辑都像独立策略一样干净。

强健的断线恢复与数据防守

量策略最怕连接闪断后错过行情。我在 on_close 中设置了指数退避重连,并在每次收到消息时先做 symbol 字段的非空判断,缺失则跳过并报警。这样做虽然保守,但能确保策略在极端行情下也稳定运行。

把混合 tick 拆开后,量化策略的代码结构一下子清晰了很多。黄金和白银的信号完全解耦,组合管理、风险控制都变得直观。对于社区里同样在做贵金属量化的朋友,我强烈建议在数据接入层就把这步做好,它会直接影响整个策略框架的健壮性。

84088f16281891338ec1fd09bc021f2c.jpg

评论