全部
文章&策略
学习干货
问答
官方
用户头像Fxdund
2026-05-28 发布
金融行情(股票、期货、外汇、指数、基金)对实时性有着极致要求:端到端延迟需控制在毫秒级,数据吞吐量常达每秒数万条,且必须保证有序、不丢、不重。通用 WebSocket 保活策略在这样的场景下往往力不从心——心跳间隔太长会错过快速断线,重连策略太笨重会错过行情脉冲,流量控制太简单则会撑爆客户端。本文将针对金融行情特征,提供一套经过生产验证的优化方案。 一、心跳保活:不止是 Ping/Pong WebSocket 协议自身提供Ping/Pong控制帧,但很多网络中间件(Nginx、AWS ALB)会过滤或延迟处理这类帧,导致连接“假死”。因此,应用层心跳是更可靠的选择。 1.1 应用层心跳设计 客户端每隔一定时间发送业务 ping(例如{"type":"ping","ts":123456}),服务端回复pong。 间隔选择:2530 秒(兼顾 NAT 超时一般为 60120 秒,又不过度消耗资源)。 超时判定:连续 2 次心跳未收到pong,判定连接失效,立即触发重连。 RTT 监控:记录心跳往返时间,当 RTT 持续升高时,可提前预警或切换接入点。 1.2 代码示例 下面 iTick API WebSocket SDK 为例,在 SDK 基础上增加应用层心跳守护,实现双重检测。 import time import threading from itick_sdk import Client # 示例SDK,实际替换为你的API class HeartbeatGuard: def __init__(self, client: Client, on_dead_callback, interval=25, timeout=10): self.client = client self.on_dead = on_dead_callback self.interval = interval self.timeout = timeout self.last_pong = time.time() self._running = False self._thread = None def start(self): self._running = True self._thread = threading.Thread(target=self._run, daemon=True) self._thread.start() def _run(self): while self._running: now = time.time() if now - self.last_pong > self.timeout: if not self.client.is_websocket_connected(): # 假设SDK提供此方法 self.on_dead() # 发送应用层ping(需要在SDK支持自定义消息时使用) try: self.client.send_websocket_message('{"type":"ping"}') except: pass time.sleep(self.interval) def record_pong(self): self.last_pong = time.time() 关键点:即使 SDK 内部已有 WebSocket 协议层的 Ping/Pong,额外增加应用层心跳仍能有效防止“连接假死”问题。 二、断线重连:指数退避 + 会话恢复 2.1 重连策略的核心要素 指数退避:避免重连风暴,初始间隔 1s,每次失败后翻倍,上限 30~60 秒。 随机抖动:给延迟乘以 0.8~1.2 的随机系数,防止大批客户端同时重连。 网络状态感知:监听online/offline事件,仅在网络可用时重连。 状态恢复:重连成功后,重新订阅之前的主题,并利用消息序列号(seq)拉取缺失数据。 2.2 带抖动和退避的重连实现 import random import time from itick_sdk import Client class ReconnectingClient: def __init__(self, token): self.client = Client(token) self.reconnect_attempt = 0 self.base_delay = 1.0 # 1秒 self.max_delay = 30.0 # 最大30秒 self.subscribed_symbols = [] # 保存订阅列表 self._manual_close = False def connect(self): # 假设SDK的连接方法 self.client.connect_websocket() self.client.set_on_close(self._on_close) def _on_close(self, code, reason): if self._manual_close: return self._schedule_reconnect() def _schedule_reconnect(self): # 指数退避 + 抖动 delay = min(self.max_delay, self.base_delay * (2 ** self.reconnect_attempt)) delay = delay * (0.8 + 0.4 * random.random()) print(f"Reconnecting in {delay:.2f}s (attempt {self.reconnect_attempt+1})") time.sleep(delay) self.reconnect_attempt += 1 self.connect() # 重连成功后重新订阅 if self.subscribed_symbols: self.client.subscribe(self.subscribed_symbols) def subscribe(self, symbols): self.subscribed_symbols = symbols self.client.subscribe(symbols) # SDK订阅方法 2.3 利用序列号实现断线恢复 金融行情要求数据不丢不重,建议每条推送消息携带递增的seq。客户端本地保存last_seq,重连时携带该值请求服务端回放缺失消息。 class SeqRecoveryClient(ReconnectingClient): def __init__(self, token): super().__init__(token) self.last_seq = 0 self.pending_messages = [] # 暂存乱序消息 def on_message(self, msg): seq = msg.get('seq') if seq == self.last_seq + 1: self._process(msg) self.last_seq = seq self._process_pending() elif seq > self.last_seq + 1: # 丢包,请求重传 self._request_retransmit(self.last_seq + 1, seq - 1) self.pending_messages.append(msg) else: # 重复消息,丢弃 pass def _process_pending(self): # 按序处理暂存队列 self.pending_messages.sort(key=lambda x: x['seq']) while self.pending_messages and self.pending_messages[0]['seq'] == self.last_seq + 1: msg = self.pending_messages.pop(0) self._process(msg) self.last_seq = msg['seq'] def _request_retransmit(self, from_seq, to_seq): # 发送重传请求 (需协议支持) self.client.send_websocket_message({ 'action': 'nack', 'from': from_seq, 'to': to_seq }) 三、流量控制:防止客户端被淹没 WebSocket 是全双工通道,服务端推送速度可能远快于客户端的处理能力。不加控制会导致内存暴涨、界面卡死甚至进程崩溃。 3.1 消息队列 + 速率限制 核心思路:将接收到的消息放入有界队列,由一个独立的消费者以固定速率(如每秒 100 条)取出处理。 from collections import deque import threading import time class FlowController: def __init__(self, max_size=500, rate_limit=100): self.queue = deque(maxlen=max_size) self.rate_limit = rate_limit # 每秒最大处理数 self.processed = 0 self.last_second = time.time() self.lock = threading.Lock() def enqueue(self, msg): with self.lock: if len(self.queue) == self.queue.maxlen: # 队列满,可丢弃或触发告警 return False self.queue.append(msg) return True def consume(self, callback): """在独立线程中循环调用""" now = time.time() if now - self.last_second >= 1.0: self.processed = 0 self.last_second = now with self.lock: available = self.rate_limit - self.processed count = min(available, len(self.queue)) for _ in range(count): msg = self.queue.popleft() callback(msg) self.processed += 1 3.2 优先级调度 行情数据中,tick(逐笔成交)的优先级远高于深度行情非首档数据。可以使用多个队列,按优先级处理。 class PriorityDispatcher: def __init__(self): self.high = deque() # tick self.medium = deque() # quote self.low = deque() # depth等 def dispatch(self, msg): if msg.get('type') == 'tick': self.high.append(msg) elif msg.get('type') == 'quote': self.medium.append(msg) else: self.low.append(msg) def process_one(self, callback): # 优先处理高优队列 if self.high: callback(self.high.popleft()) return True if self.medium: callback(self.medium.popleft()) return True if self.low: callback(self.low.popleft()) return True return False 3.3 背压(Backpressure)与服务端协商 当客户端积压超过阈值(如队列深度 > 200),可主动向服务端发送控制帧,请求降低推送频率或切换为批量推送。这需要协议层面的支持,例如: { "action": "slow", "reason": "queue_full" } 四、完整客户端骨架(基于示例 SDK) 将上述模块组合成一个健壮的客户端类: from itick_sdk import Client import threading class RobustWebSocketClient: def __init__(self, token): self.client = Client(token) self.flow_ctrl = FlowController(max_size=1000, rate_limit=200) self.dispatcher = PriorityDispatcher() self.heartbeat = None # HeartbeatGuard实例 self.reconnector = None # ReconnectingClient实例 # 设置回调 self.client.set_message_handler(self._on_raw_message) def _on_raw_message(self, raw_msg): # 首先入队流量控制 self.flow_ctrl.enqueue(raw_msg) # 如果SDK有应用层pong,需在此调用heartbeat.record_pong() def _consumer_loop(self): while True: # 由优先级调度器处理一条消息 self.dispatcher.process_one(self._handle_msg) time.sleep(0.001) # 1ms调度间隔 def _handle_msg(self, msg): # 业务逻辑,例如更新UI、存储等 pass def start(self): # 启动连接 self.client.connect() # 启动消费线程 threading.Thread(target=self._consumer_loop, daemon=True).start() # 启动心跳守护 self.heartbeat = HeartbeatGuard(self.client, self._on_connection_dead) self.heartbeat.start() def _on_connection_dead(self): # 触发重连 self.reconnector._schedule_reconnect() 五、可观测性与监控指标 生产环境必须暴露以下指标,用于排障和容量规划: 指标 含义 告警建议 heartbeat_timeout_total 应用层心跳超时次数 > 0 立即检查网络 reconnect_total 重连总次数 > 5 次/分钟 queue_overflow_total 队列溢出丢弃消息数 > 0 end_to_end_latency_p99 从发送到回调的延迟 > 200ms pending_queue_size 当前积压消息数 > 500 六、总结 低延迟推送优化是一项系统工程,单纯依赖 WebSocket 协议或 SDK 的默认行为远远不够。本文提供的三层优化策略: 心跳层:应用层心跳 + RTT 监控,快速发现假死连接。 重连层:指数退避 + 随机抖动 + 会话恢复,保证断线后快速、平滑地恢复数据流。 流量控制层:有界队列 + 速率限制 + 优先级调度,防止客户端被数据洪峰冲垮。 这些策略已在上千个生产节点中验证,能够显著提升弱网环境下的稳定性。最后,请根据业务场景调整参数:高频交易可缩短心跳至 10 秒,提高队列上限;普通资讯类则可适当放宽速率限制。 参考文档:https://docs.itick.org/sdk/python-sdk GitHub:https://github.com/itick-org/
浏览10
评论0
收藏0
用户头像sh_*197p2v
2026-05-27 发布
浏览32
评论5
收藏0
用户头像sh_*219t3e
2025-09-29 发布
之前我分享过一个小工具网站,支持国内主流量化平台,可以让 AI 直接帮你写各个平台的策略代码,直接生成可运行的策略代码,代码质量远高于直接使用 DeepSeek、Trae 等平台。上线之后获得了非常多朋友的好评。 大家可以直接用描述策略,然后一键生成可运行的完整策略代码,也可以把它当做一个API 查询工具。 AI工具平台:https://iris.findtruman.io/ai/tool/ai-quantitative-trading/ 我看平台正在开发SuperMind支持,很快就能支持同花顺了
浏览2875
评论67
收藏10
用户头像sh_****559rtx
2026-05-28 发布
量化路上的数据需求演变 作为一名跨境量化策略开发者,我的工作流通常分为回测和实盘两块。回测阶段数据需求很明确:需要什么区间、什么频率的 K 线,通过 REST 一次性拉取下来,干净利落。但策略一旦上线实盘,对行情的要求就从“完整”变成“及时”。任何毫秒级的延迟都会让信号失真,直接影响成交滑点和策略表现。 轮询 REST 遇到的实盘痛点 实盘初期我用定时器每隔 800 毫秒请求一次最新价,跑了两周发现两个致命问题:一是请求洪峰导致 CPU 占用过高,尤其在订阅多只股票时,并发请求让网络带宽捉襟见肘;二是“定时采样”的本质意味着我永远只能拿到离散的时间点价格,盘口发生的瞬时跳价可能被完全跳过。这种数据层面的失真,让许多原本优秀的日内动量策略实盘表现大打折扣。 从机制上理解两种接口的数据流 REST 是“请求-应答”同步模式,每次通信都需要完整的三次握手、请求、响应、断开。WebSocket 则是在一次 HTTP 升级后建立起全双工通道,此后服务端主动推送数据,客户端无需再发送请求。反映在行情场景中,一个是“人工间歇采样”,一个是“事件驱动连续流”。 功能拆解:回测用 REST,实盘靠 WebSocket 回测所需的历史行情——REST 的战场 策略回测对数据的要求是大批量、一次性加载。我用 REST 拉取历史分钟线作为特征计算的输入: import requests url = "https://api.example.com/stock/history" params = { "symbol": "AAPL", "interval": "1m", "limit": 200 } res = requests.get(url, params=params) data = res.json() print(data) 实盘实时 tick 流——WebSocket 的主场 进入实盘后,我会立即切换到 WebSocket 模式,订阅所有持仓标的的实时成交数据。AllTick API 提供的 WebSocket 通道可以同时订阅多只股票,推送结构统一,非常容易接入现有策略框架: import websocket import json def on_message(ws, message): data = json.loads(message) ticks = data.get("ticks", []) for tick in ticks: print(tick["symbol"], tick["price"], tick["volume"]) def on_open(ws): msg = { "action": "subscribe", "symbols": ["AAPL", "MSFT", "TSLA"] } ws.send(json.dumps(msg)) ws = websocket.WebSocketApp( "wss://apis.alltick.co/websocket-api/stock/transaction-quote", on_message=on_message ) ws.on_open = on_open ws.run_forever() 策略代码只需注册回调,接收到 tick 后立刻进行信号计算,真正做到了事件驱动。 差异速览表 维度 REST WebSocket 数据方式 请求响应 持续推送 连接状态 短连接 长连接 延迟表现 相对较高 更低 适用内容 历史数据、查询 实时行情、tick 系统角色 数据补充层 实时数据层 跨境量化系统的行业应用落地 在我的跨境量化架构中,REST 和 WebSocket 被明确分工:离线回测模块仅依赖 REST 历史数据;实盘交易引擎则完全挂载在 WebSocket 推送之上,同时额外使用 REST 做定时的账户、持仓查询。两者数据统一经过标准化的行情对象封装,使策略可以无感切换回测与实盘。 对于我们这类管理多市场、多标的的跨境量化团队,选择一套稳定、低延迟的实时行情源至关重要。我目前在用的 AllTick API WebSocket 服务,在开盘高峰期的推送稳定性与低延迟表现都经受住了考验,帮助我将策略滑点控制在理想范围内。
浏览18
评论0
收藏0
用户头像sh_**772oqg
2026-05-28 发布
在加密货币量化策略研发与实盘运行中,实时行情的连续性与稳定性直接影响信号生成、订单执行与回测可信度。WebSocket 作为主流的实时推送方案,在实际部署中常出现无告警、无异常日志的静默断开,导致行情中断、策略状态异常,是高频与日内策略必须解决的底层问题。 本文从量化实盘接入场景出发,对 WebSocket 断连根源进行系统性分析,并提供可直接部署的高可用连接方案,适用于行情推送、策略实盘、数据采集与回测补录等场景。 一、实盘场景与核心需求 在加密货币量化体系中,实时行情接入需满足以下稳定要求: 推送低延迟、连接高可用,支撑策略连续运行 网络波动、节点迁移后可自动恢复,不中断策略逻辑 重连后自动恢复订阅,保持行情与策略状态一致 断连、重连、异常事件可观测,便于回测与复盘 在公网、云服务器、NAT 网络等环境下,连接稳定性会被显著削弱,断连成为影响系统鲁棒性的关键因素。 二、WebSocket 静默断连的核心成因 1. 空闲超时资源回收 连接建立后若长时间无数据交互,网关、负载均衡或服务端会按策略释放闲置连接。该现象在弱网、云服务器环境中触发频率更高,是最常见的断连原因。 2. 心跳机制不匹配 服务端通过 Ping/Pong 保活机制判断连接有效性。客户端未按周期发送心跳、未正确响应服务端心跳,或心跳间隔与服务端要求不一致,均会被判定为失活连接并关闭。 3. 网络层被动中断 公网 NAT 超时、代理链路切换、网络切换、出口 IP 变动等底层网络波动,会导致 TCP 连接静默失效,应用层通常无法感知。 4. 服务端限流与过载保护 单连接订阅品种过多、消息吞吐量超限、会话时长达到上限、服务端连接数管控等策略,均可能触发无提示关闭,以保障集群稳定性。 三、高可用连接工程方案 为满足量化实盘的稳定性要求,需在客户端建立三层保活机制: 标准化心跳保活 按服务端规范固定周期发送心跳帧,监听响应状态;连续多次未收到确认则主动重连,避免被动超时。 指数退避重连 采用指数退避策略(1s→2s→4s→8s,上限 30s),避免密集重连触发限流,同时保证快速恢复。 订阅状态持久化与自动恢复 本地缓存订阅列表,重连认证完成后自动恢复订阅,实现行情无感知接续。 四、高可用连接实现示例 import json import websocket import time retry_count = 0 def on_open(ws): # 身份认证 ws.send(json.dumps({"action": "auth", "token": "YOUR_TOKEN"})) # 订阅目标品种行情 ws.send(json.dumps({"action": "subscribe", "symbol": "BTCUSDT"})) print("Connection established | Authenticated | Subscribed") def on_message(ws, message): tick_data = json.loads(message) # 行情数据注入策略逻辑/回测数据集 # print(tick_data) def on_close(ws, close_code, close_msg): global retry_count print("Connection disconnected, entering reconnection process") # 指数退避重试 delay = min(30, 2 ** retry_count) time.sleep(delay) retry_count += 1 start_connection() def start_connection(): ws = websocket.WebSocketApp( "wss://ws.alltick.co/quote-b-ws-api", on_open=on_open, on_message=on_message, on_close=on_close ) # 心跳配置:20秒心跳,10秒无响应判定超时 ws.run_forever(ping_interval=20, ping_timeout=10) if __name__ == "__main__": start_connection() 五、量化实盘部署最佳实践 所有实时行情连接必须启用心跳,不依赖默认保活机制 重连逻辑必须使用指数退避,降低服务端限流风险 单连接订阅品种数量控制在合理范围,避免过载断连 对断连、重连、心跳异常进行日志埋点,支持复盘与回测对齐 生产环境建议分离行情订阅与策略执行,提升容错能力 六、总结 WebSocket 长连接并非永久可用,其稳定性依赖心跳保活、重连策略与状态管理。在加密货币实时行情场景中,网络波动与服务端约束会显著提升断连概率,静默断开属于常态,稳定运行是工程化设计的结果。 通过标准化心跳、指数退避重连与自动恢复订阅,可显著提升行情接入可用性,保障量化策略在实盘与回测环境下的数据一致性与运行稳定性。
浏览22
评论0
收藏0
用户头像9点半量化
2026-05-28 发布
如果你正深陷一月份以来“商业航天”或近期“新闻出版”板块的长久套牢中,账户回撤让你在割肉与死扛间反复煎熬,请务必保持专注。作为一名长期从事量化交易的策略专家,我必须告诉你:解套不是靠​**“等”出来的,而是靠​“剥离成本”**动态实现的。 这套“2.5%做T法则”是我当年在券商任职期间,针对上百个交易模型进行压力测试后,筛选出的风险极小、盈亏比(Risk-Reward Ratio)最高的策略。其核心胜率高达90%,是专为长期被套(3个月以上)投资者设计的系统化“自救方案”。 核心逻辑:机械执行击败预测幻觉 职业交易员与散户的分水岭不在于预测的准头,而在于执行的纪律。2.5%法则的本质是通过固定参数对冲情绪化交易。 **1.**抛弃预测走势: 散户亏损源于盲目猜测高低点。该法则要求“机械执行”,将2.5%设定为触发指令的阈值,无需盯盘猜测,跌到位即进,涨到位即出。 **2.**高期望值模型: 这个2.5%的基准线是基于海量数据回测得出的最优波动参数,旨在波动的震荡中稳健赚取差价,逐步摊薄成本,实现从“死守”到“动态盈利”的质变。 数学拆解:如何通过2.5%实现“一箭双雕”? 定量化分析是解套的唯一出路。我们以**正T策略(先买后卖)**为例,通过具体数据看成本下移的效果: **1.**初始状态: 假设你持有1000股底仓,成本价100元。你预留了一部分“T仓位”资金。 **2.**触发买入: 当股价跌至成本价以下0.3%,且单日跌幅接近2.5%(即跌至97.5元)时,投入T仓位的一半资金(约1万元)加仓100股。 **3.**触发卖出: 当股价反弹至加仓价的2.5%上方(回升至100元)时,果断卖出加仓的100股。 **4.**战果审计: 这一买一卖间,你不仅在日内锁定了250元的现金利润,更关键的是,你的底仓成本已从100元降至约99.75元。股数未变,成本已降。 实战指南:正向与反向的操作边界 在应用此策略时,必须明确市场环境与底仓状态: 1.正T(先买后卖): 适用于趋势向上的行情,容错率极高。铁律: 必须持有底仓,且持有时间越长、股性越熟越好。千万不要在刚建仓时就急于做T,那是在增加敞口风险。 2.反T(先卖后买): 适用于弱势震荡行情。当股价触及5日或10日均线压力位,涨幅接近2.5%时,先卖掉不超过30%的底仓;待回调2.5%左右时接回,确保股数不变,成本直线下降。 职业风控:纪律是唯一的护城河 量化策略的生命线在于风控。想要掌握“龙战交易体系”的精髓,必须死守以下红线: **1.**永不满仓: 满仓即意味着失去操作的容错空间。 **2.****1%**止损: 若做T操作失误导致亏损超过1%,必须果断截断损失,严禁死扛。 **3.**不留隔夜仓: 所有的做T操作必须在日内闭环,规避次日不可控的跳空风险。 结语: 掌握硬逻辑比获取代码更有价值。建议你将此策略保存并反复推演。如果你想更深层次地提升认知维度,避开市场收割,欢迎进入“龙战交易体系”进阶。关于大家常问的技术与逻辑细节,我已全部系统化整理。请看我个人主页置顶视频的第一条,在5分钟的位置,你就能找到“回家的路”。
浏览29
评论0
收藏0
用户头像sh_*056uc6
2026-01-28 发布
做超短或者量化交易,对股票接口的稳定性和实时性要求很高,之前做量化交易,一直苦于股票数据接口不稳定,获取股票数据的实时性也不够,导致自动化交易失败,错过了很多宝贵的机会。 整理了常用到的十个股票实时行情接口,包括实时K线数据,分钟级别的K线以及日线,分笔数据、资金流数据等,都非常实用。 1、实时K线数据 获取沪深A股和ETF实时K线数据。目前支持沪深京A股和ETF基金,对应请求参数synbol为stock、etf; 目前K线级别支持5分钟、15分钟、30分钟、60分钟、日线、周线、月线、年线,对应的请求参数period分别为5m、15m、30m、1h、1d、1w、1mon、1y;除权方式有不复权、前复权、后复权,对应的参数cq分别为1、2、3;包年版支持all参数获取盘后全市场数据,仅限近一周内的日线数据。 数据更新:实时数据交易时间段实时更新,历史数据收盘后3:30更新,all参数历史数据盘后6:00更新。 示例请求: http://api.fxyz.site/wolf/time/kline?symbol=stock&code=000001&period=1d&cq=1&startDate=2026-01-19&endDate=2050-01-01&token= 2、资金流数据 获取沪深A股资金流向数据。资金流数据区分主买、主卖、特大单、大单、中单、小单等。 数据更新:历史数据盘后6:00更新 示例请求: http://api.fxyz.site/wolf/money?code=000001&tradeDate=2026-01-19&token= 3、实时指标数据 获取沪深A股实时行情数据。目前支持沪深京A股和ETF基金,对应请求参数synbol为stock、etf。提供涨速、涨跌幅、换手率、振幅、量比、内盘、外盘、ROE等行情指标数据,适用于投资研究、量化交易。包年版支持all参数获取盘中全市场实时数据。 数据更新:实时数据交易时间段每1分钟更新。 示例请求: http://**api.fxyz.site/wolf/time?**symbol=stock&code=000001&token= 4、涨跌停板 获取盘中涨停板实时数据。 数据更新:实时数据交易时间段每1分钟更新,历史数据收盘后3:30更新。 示例请求: http://**api.fxyz.site/wolf/zt?**tradeDate=2026-01-19&token= 5、日线快照 获取沪深A股和ETF实时日线行情数据。目前支持沪深京A股和ETF基金,对应请求参数synbol为stock、etf。包年版支持all参数获取盘中全市场实时数据。 数据更新:实时数据交易时间段实时更新。 示例请求: http://api.fxyz.site/wolf/time/day?symbol=stock&code=000001&token= 6、买卖五档 获取沪深A股和ETF买卖五档实时行情数据。目前支持沪深京A股和ETF基金,对应请求参数synbol为stock、etf。 数据更新:实时数据交易时间段实时更新 示例请求: http://api.fxyz.site/wolf/time/five?symbol=stock&code=000001&token= 7、逐笔交易 获取沪深A股逐笔交易数据。 数据更新:历史数据盘后6:00更新 示例请求: http://**api.fxyz.site/wolf/deal?**code=000001&tradeDate=2026-01-19&token= 8、分价数据 获取沪深A股分价数据。 数据更新:历史数据盘后6:00更新 示例请求: http://api.fxyz.site/wolf/price?code=000001&tradeDate=2026-01-19&token= 9、股票列表 获取股票的代码列表。flag取值范围:0-所有股票,1-深交所股票,2-上交所股票,3-北交所股票,4-指数,5-创业板股票,6-科创板股票,7-ETF,8-ST股票,9-退市股票 数据更新:历史数据收盘后六点更新。 示例请求: http://**api.fxyz.site/wolf/list?**flag=0&token= 10、炸板 获取盘中炸板实时数据。 数据更新:实时数据交易时间段每1分钟更新,历史数据收盘后3:30更新。 示例请求: http://api.fxyz.site/wolf/zb?tradeDate=2026-01-19&token= 参考文档:http://www.fxyz.site/#api-docs
浏览2388
评论5
收藏2
用户头像sh_***174w0d
2026-05-27 发布
最近在研究五福及社区多位大佬的ETF轮动策略后,我做了一次系统性整合,对几条策略的核心交易逻辑进行了深度杂交优化,最终形成了这套「ETF双池平滑动量轮动」。 今天,我将为大家深度拆解这条策略的核心逻辑。该策略在传统动量轮动的基础上,引入了静态+动态双池融合、加权平滑动量打分、双均线趋势过滤、行业分散机制以及严格的止损和防御机制,极大地增强了策略的实战鲁棒性。 策略参数速查表 在深入讲解逻辑之前,先把核心参数列出来,方便大家快速掌握调参空间: 参数 默认值 说明 持仓数量 1只 单满仓轮动,回测数据均基于此设置 动量计算周期 25天 越短越灵敏,越长越稳健 短期均线 MA20 趋势过滤用 长期均线 MA60 趋势过滤用 止损比例 8%(成本价92%触发) 触发后立即清仓 放量阈值 5日均量的2.5倍 超过则剔除或清仓 动态池大小 全市场成交额前100 日均成交额≥5000万 防御ETF 511880 银华日利 无目标时自动切换 下面,让我们逐一剖析这套策略的全部交易逻辑。 一、 标的池构建:静态精选 + 动态流动性捕捉 传统的ETF策略通常只在一个固定的池子里轮动,这就容易错失市场突然爆发的新热点。本策略创新性地采用了**“双池融合”**架构: 静态核心池(防守与底仓): 由130+只核心ETF组成。涵盖了宽基(沪深300、中证1000等)、核心赛道行业(半导体、医药、新能源等)、以及跨境资产(纳指、日经、标普等)。这保证了策略在任何时候都有主流资产作为基本盘。 动态流动性池(进攻与捕捉热点): 市场热点在哪里,资金就在哪里。策略每日盘前会扫描全市场所有的ETF,提取过去5日平均成交额大于5000万元的前100只ETF加入动态池。这种动态优选机制,确保了策略能够自动跟踪近期资金最活跃、流动性极佳的标的。 融合去重: 最终的候选池是“静态池”与“动态池”的并集,去重并剔除掉用于现金替代的防御型ETF(如银华日利),形成最终的“融合池”。 二、 三重核心筛选过滤逻辑 有了候选池后,策略并没有直接计算涨跌幅,而是进行了极其严苛的三重筛选: 1. 趋势护航:双均线过滤(MA20 & MA60) 动量策略最怕在熊市中接飞刀。策略引入了经典的双均线趋势判定: 条件:当日收盘价必须大于短期均线(20日),且短期均线必须大于长期均线(60日)。 作用:只有在多头排列(至少是中期多头)的标的才允许参与评分,从源头上过滤掉了处于下降通道或弱势震荡的ETF。 2. 核心打分:加权对数平滑动量(R²评分) 传统动量仅比较首尾涨跌幅,容易选到“上蹿下跳”的妖基,一买就回调。策略使用了更科学的平滑动量: 对数收益加权回归:取过去25天的收盘价,取自然对数后进行线性回归。并且赋予近期价格更高的权重(权重从1递增到2),让近期趋势主导斜率。 计算年化收益率(Annual Return):由回归直线的斜率计算得出。 拟合优度惩罚(R²):这是策略的灵魂。R²衡量的是价格曲线有多“直”。如果一只ETF稳步上涨,R²接近1;如果是暴涨暴跌,R²会很低。 最终得分 = 年化收益率 × R²。 结果:过滤掉得分过低或极度夸张的异常值(有效得分范围限制在0~5之间),选出上涨最平稳、动量最强劲的品种。 3. 量价异动排雷:成交量极值过滤 放巨量往往是主力资金分歧或出货的标志,容易形成阶段性顶部。 盘中动态预估:策略能够根据盘中交易的时间进度,推算全天的预估成交量。 放量过滤:如果当天的预估成交量超过了过去5天平均成交量的2.5倍,即使动量得分再高,也会被无情剔除,防止高位接盘。 三、 资金管理与行业分散机制 在确定了得分最高的标的后,买入阶段同样充满了细节: 行业分散限制: 策略内置了一个ETF主题分类词典(如:半导体、医药、消费、跨境、宽基等)。在按照得分从高到低选取目标时,逻辑上会优先选取不同行业的ETF,以避免同质化标的扎堆。 实战避坑提示:这里必须坦诚说明,经过大量回测验证,强行加上行业分散的实际收益效果并不明显,有时甚至会拉低收益。如果您的参数设置是多只持仓,可以尝试开启此功能防范黑天鹅;但如果您像我们默认设置的一样是单满仓轮动(只买1只),那么这个开关开不开启都无所谓,系统会直接锁定全市场动量最强劲的那一只龙头猛攻! 目标再平衡: 当持仓标的仍在目标池中时,策略不会频繁全量买卖。而是检查当前持仓市值是否偏离目标市值的15%以上。如果有较大偏离(跌了),则进行再平衡补仓,否则持股不动,节省交易成本。 四、 卖出、止损与防御切换逻辑 策略的卖出逻辑不仅看动量,还设置了坚固的风控网: 绝对止损(防黑天鹅)****: 买入时记录每一只ETF的加权平均成本价。一旦盘中价格跌破成本价的92%(即亏损8%)**,立即无条件触发止损清仓,绝不扛单。 放量清仓(防高位反转): 如果在持仓期间,盘中预估成交量突然放大到5日均量的2.5倍以上,说明资金出现巨大分歧,策略会提前获利了结或离场观望。 动量衰退调仓: 如果在每日的轮动计算中,持有的ETF掉出了前N名(N为持仓数量),说明其动量已被其他品种超越,策略会将其清仓,为新龙头腾出资金。 防御型ETF避险(空仓替代): 如果市场极度恶劣,所有的ETF都未能通过均线过滤或动量积弱(即没有计算出任何有效目标),策略绝不强行买入。此时,资金会自动切换买入防御ETF(如511880银华日利),获取类现金的低风险日结收益,耐心等待市场转暖。 五、 智能执行细节 避免涨跌停及停牌陷阱:策略在执行所有买卖订单前,都会判断标的是否停牌、是否已达涨停(不买)、是否已跌停(不卖),并自动检查T+1持仓限制,防止产生无效订单。 执行时间优化:将卖出动作设在 13:07,买入设在 13:10,避开早盘的剧烈波动,在下午趋势相对明朗时进行调仓。盘前 09:20 与收盘 14:59 都有严格的持仓同步对账机制,确保系统记录与真实账户完全一致。 六、 回测表现:短跑爆发与长跑稳健 ⚠️ 特别说明:以下所有回测数据均基于单只满仓轮动(持仓数量=1)的配置。模拟盘******采用的是3只分仓,收益有所降低但回撤也更小,后文有详细说明。 ? 亮点呈现:2026年以来表现(2026.01 - 2026.04.22) 今年以来的行情中,双池平滑动量展现了极强的"追击热点"能力,短短不到4个月,策略收益突破62%,年化收益更是飙升至453%! 指标 数值 备注 策略收益 62.61% 不到4个月 策略年化收益 453.97% ? 超额收益 56.86% 同期基准仅 3.67% 最大回撤 15.18% 最大回撤区间 3/24-4/8 夏普比率 8.288 极强的风险收益比 索提诺比率 12.625 — 阿尔法 4.532 — 胜率 / 盈亏比 63.3% /1.915 赢31次,亏18次 信息比率 7.555 — 穿越牛熊:六年期长期回测(2020.01 - 2026.04.22) 如果说短期的爆发可能是运气,那么长期的稳健才是策略真正的试金石。在长达6年的回测中(经历多轮牛熊切换),策略实现了超11倍的收益: 指标 数值 备注 策略收益 1166.05% 超11倍 策略年化收益 51.57% — 超额收益 980.60% 同期基准仅 17.16% 最大回撤 35.13% 2024年6月-9月极端行情 夏普比率 1.266 — 索提诺比率 1.937 — 阿尔法 0.486 — 胜率 / 盈亏比 49.3% /1.417 赢364次,亏375次 信息比率 1.404 — 总结: 从长期表现看,这套策略的胜率稳定在50%左右,它并不是"把把都赢"的圣杯,而是典型的"截断亏损,让利润奔跑"。通过**"均线+R²平滑"保证趋势可靠性,通过"双池融合"保证标的流动性,再辅以"异动量过滤+8%绝对止损+行业分散+无标的防御"**,构筑起了能在2026年打出惊人爆发力的立体风控体系。 七、 跟踪与交割单开源 目前这条策略我已经正式挂载了模拟盘,并接入了 [9db智能体竞技场](一个第三方策略信号跟踪平台,可查看策略每日真实交割单与持仓动态,完全透明可验证)。欢迎感兴趣的朋友前往围观! 参数说明: 上文回测中展示的是"单只满仓轮动"的极限爆发数据;而在我自己的模拟盘实战中,考虑到**心态管理,我将参数调整为了"同时持有 3 只 ETF"的分仓模式。分仓之后收益率确实比单仓降低了不少,但最大回撤也随之收窄,心态会更稳。先求活,再求快! 希望这篇深度拆解能对大家的量化策略开发有所启发!如果觉得有收获,欢迎点赞 + 关注,后续我会持续分享更多策略开发经验。欢迎在评论区留言,一起探讨ETF轮动策略的进阶优化方向!?
浏览91
评论0
收藏1
用户头像me_361829775857
2026-05-27 发布
昨晚跑因子又把内存给干爆了,一看原来是Level 2的逐笔数据没处理好。今天就跟大家盘盘这类高频数据到底有啥,以及怎么用代码把它“请”下来。 数据主要分两大类,一个是五档行情,另一个是Level 2逐笔。 五档行情大家应该熟,就是买卖盘口的前五个价位和挂单量。但高频的五档数据是“快照”,每秒可能有多条,能看出盘口的瞬间变化。 Level 2逐笔数据就“碎”多了,它记录每一笔成交和委托的明细。比如你看到一笔100手的成交,在逐笔数据里能看到它是分几笔、在什么价位上完成的,甚至能看到是主动买还是主动卖。这对理解资金真实流向特别关键,以前只看K线总觉得隔靴搔痒。 简单对比一下: 数据类别 主要包含什么 特点(个人感受) 五档快照 时间、买卖五档价/量、最新价、成交量 像定时的现场照片,能看到盘口压力,但看不到过程。 Level 2逐笔 逐笔成交(时间、价格、成交量、买卖方向)、逐笔委托 像现场监控录像,每一笔资金动作都清楚,但数据量巨大,硬盘杀手。 想自己动手分析,得先有数据源。我之前用过一个叫CMES金融数据库的,数据比较规整,省去了自己清洗的麻烦。它提供了Python接口,用起来还算方便。 # CMES金融数据库的行情数据接口示例 # 注意:需要先pip安装,入参是合约代码和日期,调用频率别太高 import cmes_data as cd # 获取某股票某天的Level2逐笔成交数据 # 数据字段包括时间、价格、成交量、成交额、买卖方向等 data = cd.get_l2_transaction(symbol='000001.SZ', trade_date='20240515') print(data.head()) 字段很多,挑几个核心的说说。逐笔成交里,买卖方向这个字段很重要,能直接看出是主动买入推上去的,还是主动砸盘卖出的。五档数据里的委托总量变化,结合价格看,能感觉出是真心想买还是在“画图”。 对了,新手不建议一上来就怼着Tick数据搞,真的容易懵。先从分钟线或者日线找感觉,等策略框架稳了,再用高频数据去抠细节、优化信号。数据是金矿,但也得先有把像样的铁锹。 好了,大概就这些。数据具体怎么用,还得看你的策略逻辑。我得去清理我的硬盘了…
浏览36
评论0
收藏0
用户头像sh_****559rtx
2026-05-27 发布
量化策略跑得再漂亮,如果数据进的管道不顺畅,所有的模型都只是纸上谈兵。我在做实盘交易和行情监控的初期,最让我头疼的就是怎样实时、完整地把全市场A股的逐笔成交数据拉进来。几千只股票,用轮询方式一个个调接口,根本不能满足策略对时效性的苛刻要求。曾经用多线程并发请求,也很快因为频控和服务器压力而碰壁。 回头来看,解决这个问题的核心不在于“请求得有多快”,而在于选择“谁主动推送”这一架构。采用了WebSocket批量订阅后,整个数据链路才变得可靠且低延迟。 轮询方案在量化实战中为何注定失败 轮询模式本质上是同步、被动的请求-响应循环。你必须在代码中设置一个定时器,不断向服务器索取每只股票的最新价。问题是,A股市场逐笔成交非常密集,尤其是在开盘、尾盘和异动拉升的瞬间,tick爆发式增长。等你轮询走完一圈,大量tick已经埋没在时间的长河里。 WebSocket推送则是全双工、异步的。你只需跟服务器说一声:“我要这些股票的实时成交”,之后每一笔买卖都会自动推送到你的客户端。你既不用操心频率控制,也不会因为等待回复而错过任何关键的信号。最关键的是,不管订阅多少标的,你和服务器之间始终只用维持一条TCP连接,资源开销极低。 行情API的批量订阅协议 不同厂商的行情接口在订阅格式上有些细微差别,但通常就以下两种主流形态: 订阅方式 消息格式 适用场景 数组模式 ["000001","000002","600036"] 策略动态生成股票池,代码处理直观 字符串拼接 "000001,000002,600036" 参数传递便捷,某些网关接口采用 更高阶的接口还支持全市场通配符订阅,你只需声明订阅“全部A股”,就能接收所有正常交易品种的逐笔数据。这类权限一般需要向数据服务商单独申请。 一次订阅多只A股的Python实现 我曾在自己的回测和实盘体系里接入过类似AllTick的行情接口,下面的代码是简化后的核心订阅逻辑。通过WebSocket连接,一次性发送10只股票的订阅指令,之后所有tick都将自动到达。 import websocket import json # 推送逐笔成交的WebSocket端点 url = "wss://apis.alltick.co/websocket-api/stock-websocket-interface-api/transaction-quote-subscription" def on_message(ws, message): # 解析tick数据包 data = json.loads(message) # 提取逐笔成交列表 for tick in data.get("ticks", []): print(f"代码:{tick['code']} 价格:{tick['price']} 时间:{tick['time']}") def on_open(ws): # 构建订阅指令,一次订阅10只代表性A股 sub_msg = { "action": "subscribe", "symbols": ["000001", "000002", "600036", "600519", "000858", "002415", "300750", "601318", "000333", "002594"] } ws.send(json.dumps(sub_msg)) ws = websocket.WebSocketApp(url, on_message=on_message) ws.on_open = on_open ws.run_forever() 脚本启动后,这10只股票的每一笔成交都会毫秒级地抵达你的策略逻辑层,完全无须担心数据延迟。 策略端如何消化实时tick流 当你的订阅范围从几十只扩大到全市场,tick的流速将呈指数级上升。为了让策略引擎稳定运行,我一般会做以下优化: 分片并行消费:以股票代码为Key进行哈希分区,将tick分流到多个处理线程,每个线程只负责固定的一部分股票,避免锁竞争。 最新价内存快照:维护一个全局字典current_price["000001"]=最新成交价,让信号计算模块可以极快读取,不需要遍历tick流。 批量落库:采用批量INSERT或者使用时序数据库的缓冲写入特性,每积累100条tick或每隔200毫秒刷一次盘,大幅降低磁盘压力。 事件驱动信号:只在价格变化超过滑点阈值时触发策略重新计算,其余微小幅跳直接忽略,提升信号稳定度。 全市场订阅的实战注意事项 如果真的需要对全市场进行实时扫描,有两项硬性条件必须满足。 其一,本地的处理能力要够。我在实战中见过单秒三四百条tick的情况,如果CPU或网络带宽跟不上,就必须在前面加一层消息队列(比如Kafka)作为缓冲,让策略按部就班地消费。 其二,一定要利用行情接口的过滤功能。很多API支持“仅推送有成交记录”的模式,能过滤掉大量无交易时段的无用推送,流量直接砍半。 量化路上的数据选型感悟 想要在A股市场做实盘量化,稳定高效的实时行情管道是地基中的地基。WebSocket全双工订阅模式,是我目前认为最平衡的方案。不要再浪费时间重构多线程轮询,那种方式在全面市场的颗粒度下根本走不通。 选型方面,重点考察数据接口的批量订阅能力、推送延迟、全市场权限开放程度和运行稳定性。我的习惯是先拿一小部分股票进行压力测试,验证推送延迟和系统吞吐达到策略要求,再逐步放开到全量。有了像AllTick这类接口的弹性订阅支持,就可以很方便地进行这种渐进式验证,让整个量化系统上线之后安然无恙。
浏览47
评论0
收藏0