各位量化同好,你们在把外汇策略从回测环境搬到实盘时,有没有发现一个怪象:策略里设定的信号触发逻辑非常清晰,可一到实盘,信号要么姗姗来迟,要么就在本该触发的时候却静悄悄。我开始也以为是滑点或网络问题,后来仔细排查,才发现病根儿出在实时行情数据的“订阅管理”上。今天就跟大伙儿深入聊聊这个实盘避坑指南。
从回测思维到实盘思维的转换
做回测时,我们都是用事先下载好的整段历史数据去喂策略,数据是静态的、完整的、确定的。但实盘是动态的:你的策略在运行过程中,对品种的关注集合是不断变化的。比如一个趋势策略,可能在盘整期只监控流动性最好的两大直盘,一旦某个交叉盘出现突破形态,就希望立刻把这个新品种纳入实时监控范围。
这就要求我们的行情接收模块,必须具备在不重启、不断开连接的情况下,灵活增删货币对的能力。如果不这样做,你就只能在启动时订阅一大堆“可能用到”的品种,白白消耗计算资源,拖慢核心策略。
我的解决方案:构建一套安全的本地订阅指令过滤器
我的做法,是在 WebSocket 数据接口之上,封装一层自己的“订阅管理器”。这个管理器内部用一个字典维护着所有当前已激活的货币对列表,它负责拦截来自策略的所有订阅和取消指令,并进行去重和校验。具体实现非常轻量:
import websocket
import json
# 收到行情后的回调
def on_message(ws, message):
data = json.loads(message)
# 这里接入你的核心策略
print("收到行情:", data)
ws = websocket.WebSocketApp("wss://api.alltick.co/realtime",
on_message=on_message)
# 本地维护的活跃订阅池
subscribed = {}
def subscribe(symbols):
# 过滤出尚未订阅的品种,避免重复请求
new_subs = [s for s in symbols if s not in subscribed]
if new_subs:
ws.send(json.dumps({"action": "subscribe", "symbols": new_subs}))
for s in new_subs:
subscribed[s] = True
def unsubscribe(symbols):
# 过滤出当前确实在订阅池中的品种
existing_subs = [s for s in symbols if s in subscribed]
if existing_subs:
ws.send(json.dumps({"action": "unsubscribe", "symbols": existing_subs}))
for s in existing_subs:
subscribed.pop(s)
ws.run_forever()
有了这个最小化封装,你策略里的任何逻辑都可以放心大胆地去调用 subscribe(["EURGBP"]) ,而不用担心短时间多次调用会被服务端判定为异常流量。因为它内部已经帮你把无效的、重复的操作给“吞”掉了。
实战中的两个关键优化
一是批量发送。假设策略条件触发,要同时加仓三个货币对并平掉另外两个的监控。不要一次发五条消息,而是整理成一条 subscribe 和一条 unsubscribe 消息(如果你的数据接口支持混合 action 就更方便),连续发出。这能大幅降低消息往返时延,也展现出更规整的通信行为。
二是尊重服务端的限流规则。量化交易里,我们总想快,但有些快是致命的。我之前在一个低流动性品种上做高频模式识别,策略在几秒内加了又撤、撤了又加。结果我的 IP 直接被服务端“冷静”了一段时间。现在的处理是,内存里设一个小的防抖缓冲,把短时间内的同品种操作直接合并为终态,效果非常好。
数据到手后的“轻量化”处理
数据进来以后,不要急着落盘。所有复杂运算,如算波动率、画通道,都基于内存中的最新 tick 缓存和滑动窗口完成。我是用单独的异步任务,每 500 毫秒将内存中累积的 tick 打包写入数据库,这样几乎对主策略没有性能影响。
总的来说,量化交易的真实门槛,有一半在策略,另一半就在这些承载策略的工程细节里。把数据管道控制得井井有条,你会发现策略的实盘表现和回测的差距,会被极大地缩小。


