我用 Python 写贵金属的日内策略时,最先遇到的麻烦不是因子怎么构建,而是数据源头的问题。为了降低滑点和延迟,我选择直接通过 WebSocket 接入实时报价,但发现黄金、白银、铂金等品种的 tick 都是从一个地址推过来的,JSON 结构完全一致。如果不在策略的“入口函数”里把资产分开,我的趋势策略可能误把白银的涨幅当成黄金的开仓信号,那就闹笑话了。
需求很明确:在一个异步事件循环里,根据消息内部字段实现微秒级的分流,让每个品种的行情各自进入独立的因子计算队列。数据痛点就在于 WebSocket 的高频推送是混合的,任何多余的字符串匹配或遍历都会累积成策略延迟。
以 symbol 为键构建分发器
贵金属行情消息中一定有类似 symbol 的字段。我会在策略初始化时就把品种代码和内部标签一一对应,存入字典:
| 字段名 | 含义 | 示例 |
|---|---|---|
| symbol | 资产代码 | XAUUSD、XAGUSD |
| instrumentId | 平台内部唯一标识 | 1001、1002 |
| type | 资产类别 | gold、silver |
asset_map = {
"XAUUSD": "gold",
"XAGUSD": "silver",
"XPTUSD": "platinum"
}
在 on_message 回调里,拿到 symbol 后直接字典取值,再调用对应品种的策略函数。因为字典查找是哈希操作,几乎不耗时,可以放心放在高频回调里。
按品种分桶缓存,批量驱动策略更新
如果我每收到一个 tick 就触发一次完整的策略运算,哪怕是向量化计算也撑不住。我的做法是先写入分桶的内存缓存:
# 只做最简单的缓存记录
def on_message(msg):
symbol = msg['symbol']
price = msg['price']
asset_type = asset_map.get(symbol, "unknown")
cache[asset_type][symbol] = price
然后在定时器里(比如每 100 毫秒)从 cache["gold"] 和 cache["silver"] 中取出最新价格,驱动信号生成。这样黄金的均线、布林带,白银的波动率锥,不会在任何计算步骤中发生串扰。
精准订阅以收紧数据流
WebSocket 客户端初始化时,我就把订阅列表限定为策略实际交易的品种,不相关的品种一条都不收。部分接口还可以按 asset_type 批量订阅,能进一步减少无效数据的解析。策略回测和实盘保持一致,都是接收同样范围的 tick。
曾用 AllTick 的 WebSocket 来做接入测试,它的 symbol 划分非常清晰。一段简单的 Python 代码就可以完成订阅:
import websocket
import json
def on_message(ws, message):
data = json.loads(message)
symbol = data['symbol']
print(f"{symbol} 实时价格: {data['price']}")
ws = websocket.WebSocketApp("wss://api.alltick.co/ws",
on_message=on_message)
ws.run_forever()
这样一个连接里同时监控多个贵金属,每个品种的行情逻辑都像独立策略一样干净。
强健的断线恢复与数据防守
量策略最怕连接闪断后错过行情。我在 on_close 中设置了指数退避重连,并在每次收到消息时先做 symbol 字段的非空判断,缺失则跳过并报警。这样做虽然保守,但能确保策略在极端行情下也稳定运行。
把混合 tick 拆开后,量化策略的代码结构一下子清晰了很多。黄金和白银的信号完全解耦,组合管理、风险控制都变得直观。对于社区里同样在做贵金属量化的朋友,我强烈建议在数据接入层就把这步做好,它会直接影响整个策略框架的健壮性。


