在开展外汇量化策略研发、行情数据回测、实时模型推演的过程中,长连接数据的连续性,是保障量化模型有效运行、回测结果精准的核心前提。我在长期的项目落地与参数调优中发现,多数量化策略出现的数据采样断层、实时信号缺失、回测数据失真等问题,并非策略逻辑漏洞,而是WebSocket长连接的心跳参数配置适配性不足导致。 跨境外汇数据传输存在固有的链路延迟与网络扰动特性,不合理的心跳参数,会直接造成长连接异常断开,破坏行情数据的时序完整性,进而影响实盘监控与历史回测的准确性。经过多轮不同网络环境、不同订阅负载的对照测试,我梳理出一套兼顾连接稳定性与资源负载的标准化配置方案,能够在不冗余消耗带宽、服务资源的前提下,保障7×24小时行情数据持续稳定采集。本次所有参数实测与策略验证,我均依托AllTick API外汇实时WebSocket服务完成,适配多数量化开发场景。 在高负载量化场景中,也就是同一时段批量订阅多组外汇货币对行情数据时,心跳参数的适配性会被进一步放大。我初期沿用组件默认参数配置时,频繁出现无规律断连问题,导致量化数据采样中断。经过系统性的参数迭代与策略优化,终于确定了适配外汇量化场景的心跳参数合理区间,大幅提升了整套量化数据服务的可用性。 一、外汇量化场景下,心跳参数配置的核心依据 WebSocket心跳保活机制的底层逻辑,是通过客户端周期性发送校验数据包,告知服务端当前连接处于有效工作状态,规避空闲连接被服务端自动回收切断的问题。对于外汇量化数据采集、实时策略运行场景,心跳间隔的调试需要结合三大核心维度,贴合量化业务实际需求: 1. 跨境网络传输特性:外汇行情数据属于跨境传输数据,链路极易出现短时抖动、延迟波动。若心跳周期设置过长,单次轻微网络扰动就会造成连接失效,打断时序数据采集。 2. 接口服务规范约束:市面上主流的外汇实时数据接口,均有官方明确的心跳适配标准与频率要求,参数配置必须贴合接口规范,才能规避合规性断连、限流等问题。 3. 量化业务负载规模:高频次心跳推送会持续占用网络带宽与服务算力。在多货币对同步订阅、全天候实时采样的高负载量化场景中,过于频繁的心跳请求,会大幅提升服务运行负载,影响整体服务稳定性。 结合我大量线上实测与回测配套测试结果,10秒至30秒是适配绝大多数外汇量化场景的心跳安全区间。10秒以下的短间隔配置无法带来实质性的稳定性增益,只会造成带宽资源无效损耗;30秒以上的长间隔配置容错率极低,无法抵御跨境网络的短时波动,极易被服务端主动切断连接。 二、不同心跳间隔区间的量化场景适配测评 为了给量化开发、策略研究提供可落地的参数参考,我针对三类主流心跳间隔区间,模拟了常规网络、弱网波动、高负载订阅等多数量化运行场景,整理出各区间的适配优势与短板,方便大家根据自身业务场景选型: 心跳间隔区间 场景优势 存在短板 5~10秒 连接保活容错率极高,几乎无断连情况,可适配超高精度、零数据缺失的高频量化采样场景 心跳请求频次过高,带宽与算力消耗大,多币种高负载订阅场景下,易造成服务资源冗余过载 10~30秒 稳定性与资源消耗平衡度最优,适配95%以上的外汇量化回测、实时策略运行、行情监控场景 极端弱网、高延迟的恶劣传输环境下,存在极低概率的短暂连接中断情况 30秒以上 最大程度节约带宽与服务资源,适配低频行情观测、非实时数据统计等轻量场景 网络波动容错能力极差,轻微链路扰动就会触发断连,导致时序行情数据断层,影响回测精准度 三、适配量化业务的动态心跳调控策略 固定心跳间隔仅能适配单一稳定网络环境,无法匹配量化研究中复杂多变的运行场景。为兼顾数据连续性与资源利用率,我在量化项目中统一采用**动态自适应心跳调控策略**,替代传统固定参数配置,实战效果提升显著: 常规稳定传输、行情数据正常推送时,默认采用15秒标准心跳间隔,平衡稳定性与资源消耗; 系统监测到网络延迟升高、数据推送停滞、链路抖动等异常状态时,自动将心跳间隔缩短至10秒,强化连接保活能力,规避断连风险; 待网络链路恢复稳定、行情时序数据正常连续推送后,逐步回调至15秒默认参数,减少无效资源消耗。 相较于静态固定参数,这套动态调节机制能够自适应跨境网络波动,既保障了量化数据采集的完整性,又不会产生冗余带宽消耗,更适配长期运行的量化策略服务。 四、Python 心跳保活机制完整实操代码 以下是我在量化项目中稳定复用的WebSocket心跳保活代码,通过独立异步协程实现心跳定时推送,与行情订阅、数据解析逻辑解耦,适配外汇实时数据常态化采集需求: import asyncio import websockets import json async def send_heartbeat(ws, interval=15): while True: heartbeat_msg = json.dumps({"type": "ping"}) await ws.send(heartbeat_msg) await asyncio.sleep(interval) async def subscribe_forex(): url = "wss://api.alltick.co/realtime/forex" # 以 AllTick API 为例 async with websockets.connect(url) as ws: # 启动心跳 asyncio.create_task(send_heartbeat(ws, interval=15)) # 订阅货币对行情 subscribe_msg = json.dumps({ "action": "subscribe", "symbols": ["EURUSD", "USDJPY"] }) await ws.send(subscribe_msg) while True: message = await ws.recv() data = json.loads(message) print(data) asyncio.run(subscribe_forex()) 代码核心设计遵循轻量化、高解耦的量化开发原则,单独开启协程承载心跳任务,不占用数据接收与处理主线程资源。默认15秒的心跳参数,经过长期线上量化场景实测,可适配绝大多数跨境网络环境,有效保障实时行情采集的连续性。 五、量化服务长效稳定的配套优化要点 精准的心跳参数配置是连接稳定的核心,但想要实现量化系统7×24小时无人值守稳定运行,保障回测、实盘数据零缺失,还需搭配多项配套优化策略: 1. 接入自动重连逻辑:即便心跳参数最优,极端网络故障、服务临时波动仍会引发断连。配置自动重连机制,可实现服务自主恢复,避免长时间数据采样中断。 2. 设置心跳失败阈值:建议设定3至5次连续心跳失败判定规则,多次ping包无响应时,主动终止无效连接并触发重连,避免无效占用链路、触发服务端防护机制。 3. 保持心跳数据包轻量化:多数外汇数据服务仅需基础ping数据包即可完成连接校验,无需携带冗余自定义数据,最大程度降低传输延迟与解析开销,适配高频量化采集场景。 我长期采用这套方案运行多币种实时行情订阅服务,量化数据采集连续性大幅提升,几乎无需人工监控日志排查异常,有效提升了策略研发与数据复盘效率。 六、实战研究总结 综合网络传输环境、接口规范要求、量化业务负载及长期实测数据来看,15秒是外汇实时WebSocket心跳间隔的最优基准参数。研究者可根据自身网络质量、订阅币种数量、策略运行需求,在10~20秒区间内做精细化微调。其核心优化逻辑,是在维持长连接持续活跃、保障时序数据完整的基础上,不额外增加网络带宽与服务算力负担。 在量化研究领域,相比于复杂的策略算法、数据清洗逻辑,心跳参数这类底层基础配置,往往是决定实盘稳定性、回测精准度的关键。稳定的长连接数据链路,能够彻底规避行情断连、数据缺失带来的模型偏差与策略失效问题,是外汇量化开发、行情分析、策略迭代过程中,性价比极高的基础优化项。 最近我专门针对 Supermind 平台的AI 量化代码生成平台进行了优化改进,现在效果比市面上的 DS、豆包等工具好很多。 👉 SuperMind AI量化代码生成平台 这个工具最大的特点是直接和 AI 对话就能生成完整可运行的Supermind量化策略代码。你不需要懂 Python、C# 或策略 API,只要用自然语言描述你的交易逻辑,比如:“当5日均线向上突破20日均线时买入,反向时卖出。” AI 就会自动帮你生成完整策略代码,并能直接在平台上运行。 相比于通用大模型的输出,这个平台针对量化交易进行了专门优化生成的代码结构更清晰,逻辑更准确,对策略逻辑的理解更接近量化开发者的思路,并且可用作 API 查询或策略自动生成工具 之前上线后,很多朋友反馈代码质量和可运行性都非常高,几乎不需要再手动修改。现在我们的AI量化代码生成平台已经全面支持 Supermind,你可以直接体验。如果你之前在用 DS、豆包等平台,不妨试试看这个版本,可能会刷新你对AI 写量化策略的想象。 在外汇量化策略研究、高频因子计算与回测验证过程中,历史分钟 K 线因周末、节假日休市产生的数据缺口,是影响时序连续性、指标有效性与回测可信度的关键问题。使用 AllTick API 获取外汇历史行情时,因市场天然休市导致的时序断档,会直接造成均线、波动率、动量类因子计算偏移,甚至引发策略信号失真。 本文从量化研究实战角度,提供一套稳健、可复现、无虚假数据的缺口处理方案,适用于外汇多周期回测、因子建模与实盘数据预处理。 一、量化研究中的核心数据痛点 外汇市场为 24/5 交易机制,周末与主要国际节假日无成交,接口仅返回有效 K 线,不生成空值时段,时序天然断裂。 分钟 K 线时间不连续,会导致技术指标异常、回测曲线跳变、因子分布偏离真实市场状态。 过度插值易引入人为趋势,提升模型过拟合风险,不符合量化研究的数据严谨性要求。 二、量化场景下的稳健处理思路 数据处理遵循不构造虚假行情、不扭曲真实收益结构、可标记可追溯原则: 对原始 K 线做固定频率时间对齐,强制生成连续时间索引,明确缺口位置。 采用前值填充(ffill),保持价格序列平稳,不引入额外波动。 增加填充标记字段,用于策略风控与回测日志追溯。 三、简洁稳健代码实现 import pandas as pd # 外汇分钟K线缺口标准化处理 def fill_kline_gap(file_path: str): # 读取数据并构建连续时间序列 df = pd.read_csv(file_path, parse_dates=["time"]) df = df.set_index("time").asfreq("1min") # 前值填充(回测最稳健方式) df.ffill(inplace=True) # 标记填充数据,用于策略判断 df["is_filled"] = df["volume"].isna() return df # 使用示例 df = fill_kline_gap("EURUSD_1min.csv") 四、量化研究应用价值 提升回测可信度:连续时序可保证因子计算、信号生成、收益回测的一致性,减少数据偏差。 降低过拟合风险:前值填充不引入虚假波动,使模型更贴近实盘环境。 适配多场景研究:可直接用于日内策略、高频因子、趋势跟踪模型的数据预处理。 工程化可扩展:支持批量处理多货币对,可接入数据流水线自动化执行。 五、总结 外汇分钟 K 线节假日缺口是量化研究中典型的数据预处理问题,处理核心在于保持时序连续且不破坏原始价格行为。采用固定频率对齐与前值填充的组合方案,能够在不引入虚假信息的前提下,满足回测、因子计算与可视化的严谨要求,是外汇量化研究中通用且稳健的标准处理方式。 我主做的港股日内策略需要同时覆盖全部港股通标的,光是成分股就超过500只。量化初期我写过一个REST轮询脚本,跑完全量行情普遍需要90秒以上,日内回测时发现信号产生时间与实际行情触发时间存在明显偏离,根源就在于数据到达存在系统性滞后。 后来我把行情接收模块重构为基于WebSocket的推送架构,全市场tick从产生到被我的策略主程序消费,延迟控制在50毫秒以内。对于做tick级因子和盘口异动策略的个人量化交易者而言,这意味着信号质量得到了根本性提升。 轮询的天花板与推送的优势 轮询模型下,数据延迟=轮询周期/2 + 网络往返时间,标的越多延迟越大。同时,接口限流会进一步拉长有效更新时间。WebSocket推送则把模型倒转:服务端在成交发生的瞬间即向订阅客户端广播,单个连接可承载数百个标的的订阅,无需轮询周期。实践中我选用了AllTick的行情推送接口,港股通全量标的都能通过WebSocket实时订阅,文档完善,适合量化工作流集成。 批量行情抓取的系统设计 我的数据系统由四个功能层构成: 标的同步层:定期更新港股通成分股列表,代码标准化为 .HK 格式。 订阅管理层:按接口限制分批发送订阅帧,连接建立后维持心跳保活。 数据解析层:对推送的JSON进行实时解析,提取symbol、price、volume、turnover、time等量化所需字段。 缓存与存储层:解析后的数据放入无锁队列,消费者线程批量写入数据库,供策略引擎和回测系统调用。 层级 功能 要点 标的同步 维护最新港股通成分股列表 后缀处理与停牌过滤 订阅管理 WebSocket分批订阅与保活 单次订阅数量按接口规范 数据解析 JSON转结构化tick 字段动态兼容,防止异常中断 缓存存储 队列缓冲,批量入库 解耦接收与写入,降低延迟 核心代码片段 import websocket import json def on_message(ws, message): # 实时解析港股通tick数据 data = json.loads(message) for tick in data.get("ticks", []): # 实际量化环境中将数据压入消息队列 print(f"{tick['symbol']} 最新价: {tick['price']} 成交量: {tick['volume']}") def on_open(ws): # 批量订阅目标标的 tickers = ["00700.HK", "09988.HK", "02628.HK"] ws.send(json.dumps({"action": "subscribe", "symbols": tickers})) ws = websocket.WebSocketApp("wss://api.alltick.co/stock/ws", on_message=on_message, on_open=on_open) ws.run_forever() 从数据痛点到策略增益 重构后最大的改变是tick时间戳与策略触发时间的对齐程度大幅提高,我基于实时tick构建的买卖盘口强度因子和异常放量信号不再滞后。同时,稳定的数据流让我得以安心将更多时间花在因子研究与参数优化上,而不是像过去那样反复核对数据是否掉线。对于个人量化交易者来说,一条可靠的全量行情管线就是策略迭代的底气。 尊敬的SuperMind技术支持团队, 我在使用SuperMind平台进行模拟策略开发时遇到了一个问题。我尝试使用 get_concept_relate 函数获取同花顺软件中按94(查看板块热点)看到的513个概念板块数据,但返回的概念数量为1199个,与预期不符。 我使用的代码如下: concept_info = get_concept_relate(date='now', levels=['GN001001','GN001002','GN001003'], fields=None) print(f"获取板块数量:{len(concept_info)}") print(f"{concept_info}") 然后我尝试删除available_date 为None的数据,结果显示概念板块数量是455个,还是不对。 请问我如何能取到同花顺软件中按94(查看板块热点)看到的板块清单? 做量化交易的人都知道,回测系统的核心不是策略有多花哨,而是数据有多可靠。 如果历史行情数据本身就有问题,那么再完美的回测结果也只是“垃圾进,垃圾出”。 本文从实战出发,聊聊如何通过 API 批量拉取历史行情数据,并做一套严谨的回测数据清洗流程。这些坑,我都踩过。 一、为什么历史行情数据这么难搞? 很多人以为历史行情就是“股票代码+日期+开高低收+成交量”。真上手才发现,问题一大堆: 不同数据源格式不同,有的前复权、有的后复权、有的不复权 停牌日、除权除息日、涨跌停板数据容易被忽略 API 限流、断点续传、数据缺失需要处理 国内 A 股、美股、期货的数据格式和规则差异巨大 一个合格的量化回测系统,必须能从源头保证数据的完整性、一致性、无偏性。 二、批量拉取的工程设计 2.1 基础思路 不要一次性拉全部历史数据,更不要写死日期。合理的设计应该是: 配置股票池 → 判断本地已有数据 → 只拉缺失区间 → 合并去重 → 校验一致性 2.2 代码示例:带断点续传的批量拉取 下面使用 iTick API 获取历史日线数据(前复权),并实现本地缓存与断点续传。 import requests import pandas as pd import time from pathlib import Path API_TOKEN = "your_token_here" # 替换为实际 Token BASE_URL = "https://api.itick.org" def build_headers(): """构造请求头,包含 API Token 验证""" return { "token": API_TOKEN, "Content-Type": "application/json" } def fetch_stock_history(stock_code, region="HK", k_type=8, start_date="20000101", end_date="20231231", cache_dir="./data/raw"): """ 带缓存的批量拉取,自动断点续传 参数说明: stock_code : 股票代码(港股示例:00700) region : 市场代码(HK/US/SZ/SH 等) k_type : K线类型(8:日线,9:周线,10:月线) start_date : 开始日期(格式 YYYYMMDD) end_date : 结束日期(格式 YYYYMMDD) cache_dir : 本地缓存目录 """ Path(cache_dir).mkdir(parents=True, exist_ok=True) cache_file = Path(cache_dir) / f"{stock_code}.parquet" # 已有数据则加载,仅拉取缺失区间 if cache_file.exists(): df_old = pd.read_parquet(cache_file) df_old['trade_date'] = pd.to_datetime(df_old['trade_date']) last_date = df_old['trade_date'].max() start_date = (last_date + pd.Timedelta(days=1)).strftime('%Y%m%d') if start_date > end_date: return df_old print(f"{stock_code}: 本地已有数据至 {last_date.date()},开始增量拉取...") else: df_old = pd.DataFrame() # 将日期范围转换为时间戳(iTick kType 模式下需通过 et 参数控制截止) start_ts = int(pd.Timestamp(start_date).timestamp()) end_ts = int(pd.Timestamp(end_date).timestamp()) all_data = [] current_end_ts = end_ts batch_days = 100 # 每批最多拉取约 100 个交易日 while True: # 计算当前批次的起始截止区间(基于天数回推) batch_start_ts = max(start_ts, current_end_ts - batch_days * 86400) params = { "region": region, "code": stock_code, "kType": k_type, "limit": 500, # 每次最多返回 500 根 K 线 "et": current_end_ts } try: url = f"{BASE_URL}/stock/kline" resp = requests.get(url, headers=build_headers(), params=params, timeout=15) if resp.status_code != 200: print(f"拉取失败: {stock_code}, 状态码 {resp.status_code}") time.sleep(2) continue data = resp.json() if data.get("code") == 0 and data.get("data"): batch_data = data["data"] all_data.extend(batch_data) print(f"{stock_code}: 拉取到 {len(batch_data)} 条数据") # 判断是否还有更早的数据 earliest_ts = batch_data[-1].get("t", 0) if batch_data else 0 if earliest_ts <= start_ts or len(batch_data) < 500: break current_end_ts = earliest_ts - 86400 # 继续拉取更早数据 else: print(f"拉取失败: {stock_code}, 错误信息: {data.get('msg')}") break time.sleep(0.5) # 限流控制 except Exception as e: print(f"拉取异常: {stock_code}, 错误: {e}") time.sleep(5) continue if not all_data: return df_old # 数据转换与合并 df_new = pd.DataFrame(all_data) # 将时间戳转换为日期 df_new['trade_date'] = pd.to_datetime(df_new['t'], unit='s') # 重命名字段为统一格式 df_new = df_new.rename(columns={ 'o': 'open', 'h': 'high', 'l': 'low', 'c': 'close', 'v': 'volume' }) df_new = df_new[['trade_date', 'open', 'high', 'low', 'close', 'volume']] df_combined = pd.concat([df_old, df_new], ignore_index=True) if not df_old.empty else df_new df_combined = df_combined.drop_duplicates(subset=['trade_date']).sort_values('trade_date') df_combined.to_parquet(cache_file, index=False) print(f"{stock_code}: 数据保存至 {cache_file}, 共计 {len(df_combined)} 条") return df_combined 这个函数做了几件关键的事: 检查本地缓存(Parquet 格式),只拉取缺失区间 通过 limit 和分批区间控制拉取量,支持大量历史数据的自动分页 异常重试与限流睡眠 时间戳自动转换为标准化日期字段 2.3 多股票并发拉取 单线程循环拉取效率较低,可使用线程池实现并发,但仍需控制并发数以避免 API 限流: from concurrent.futures import ThreadPoolExecutor, as_completed def fetch_batch(stock_list, region="HK", max_workers=3): """ 批量拉取多只股票的历史数据 max_workers: 并发数建议 ≤ 5,防止被限流 """ results = {} with ThreadPoolExecutor(max_workers=max_workers) as executor: futures = { executor.submit(fetch_stock_history, code, region): code for code in stock_list } for future in as_completed(futures): code = futures[future] try: results[code] = future.result() print(f"{code}: 拉取完成") except Exception as e: print(f"{code}: 拉取失败, 错误: {e}") return results 并发数建议不超过 5,否则容易被数据源封禁。 三、回测数据清洗 Checklist 拉下来的原始数据,离直接用于回测还差好几步。这是我总结的清洗流程,每一步都不能省。 3.1 时间轴处理 # 确保交易日连续,无跳空 def align_trading_days(df, trading_calendar=None): df['trade_date'] = pd.to_datetime(df['trade_date']) df = df.sort_values('trade_date').set_index('trade_date') if trading_calendar is None: # 生成完整日历(工作日频率) full_calendar = pd.date_range(start=df.index.min(), end=df.index.max(), freq='B') else: full_calendar = trading_calendar df = df.reindex(full_calendar) return df 用工作日频率(freq='B')生成完整日历,缺失日期会自动填入 NaN,后续再填充或标记。 3.2 除权除息与复权统一 这是最大的坑! 很多新手直接用不复权数据做回测,结果会发现某天价格突然跳空低开 30%(实际上是除权),策略却以为是大跌而错误开平仓。 最佳实践:全程使用 前复权(qfq) 数据,保持历史价格连续可比。但要注意,前复权会导致早期价格出现负数(极端分红),需要做截断处理: # 剔除前复权后的负价格或极小价格 df = df[(df['close'] > 0.01) & (df['high'] > 0.01)] 3.3 涨跌停板标记 回测时,如果策略根据信号在涨停价买入,实际根本无法成交。需要提前标记: # 计算涨跌停价(A股主板±10%,科创/创业±20%,港股无涨跌停板限制) def calc_limit_prices(df, stock_code): # 根据股票代码判断市场 if stock_code.startswith('688') or stock_code.startswith('300'): limit_pct = 0.20 # 科创板/创业板 elif stock_code.startswith('600') or stock_code.startswith('000'): limit_pct = 0.10 # A股主板 else: # 港股无涨跌停板限制,直接返回 df['is_limit_up'] = False df['is_limit_down'] = False return df df['prev_close'] = df['close'].shift(1) df['upper_limit'] = df['prev_close'] * (1 + limit_pct) df['lower_limit'] = df['prev_close'] * (1 - limit_pct) # 标记一字板 df['is_limit_up'] = (df['open'] >= df['upper_limit'] - 0.001) & (df['close'] >= df['upper_limit'] - 0.001) df['is_limit_down'] = (df['open'] <= df['lower_limit'] + 0.001) & (df['close'] <= df['lower_limit'] + 0.001) return df 回测执行时,遇到 is_limit_up 且为买入信号,应跳过或转换策略。 3.4 停牌数据处理 停牌期间,没有成交,不应填充为前一日价格(会导致回测出现不合理收益)。正确做法: # 停牌日成交量应该为0或NaN,不做前向填充 df['volume'] = df['volume'].fillna(0) # 对于价格字段,停牌日保持NaN,后续回测引擎遇到NaN应直接跳过该日 3.5 数据对齐(多股票回测) 多股票回测时,需将所有股票对齐到同一个交易日历: def align_multi_stocks(stock_dfs, trading_days): """ stock_dfs: dict {code: DataFrame} trading_days: 交易日列表(pd.DatetimeIndex) """ aligned = {} for code, df in stock_dfs.items(): df_aligned = df.set_index('trade_date').reindex(trading_days) aligned[code] = df_aligned return aligned 四、数据质量校验 清洗完毕后,一定要跑一遍自动化校验: def validate_data(df, stock_code): checks = { "是否有重复日期": df.index.duplicated().sum() == 0, "是否有空价格": df[['open','high','low','close']].isna().any().any() == False, "最低价是否高于最高价": (df['low'] <= df['high']).all(), "成交量是否非负": (df['volume'] >= 0).all(), "价格序列是否单调异常": ( (df['close'] - df['close'].shift(1)).abs() / df['close'].shift(1) < 0.2 ).all(), # 除去涨跌停 } for name, result in checks.items(): print(f"{stock_code} - {name}: {'通过' if result else '失败'}") return all(checks.values()) 五、存储与版本管理建议 格式:强烈推荐 Parquet 或 Feather,比 CSV 快 10 倍以上,且占用空间小。 目录结构: data/ raw/ # 原始API拉取数据(按股票保存) cleaned/ # 清洗后数据(已复权、对齐、填充) meta/ # 股票列表、交易日历、除权因子备份 版本控制:历史数据不要放 Git,用 DVC(Data Version Control)或直接云存储(S3、OSS)。 六、个人建议 永远保留原始拉取数据,清洗脚本可重复执行。否则哪天发现清洗逻辑错了,你还得全部重拉。 不要完美主义。回测数据做不到 100%精确,但必须保证无偏性(误差在买卖双方随机出现)。 先验小样本。对某只股票拉 3 年数据,手动核对除权除息日、涨跌停日,确信流程正确后再批量跑。 备胎数据源。核心股票池至少准备两个数据源交叉验证。 最后,记住一句话:回测是用来排除坏策略的,不是用来证明好策略的。 而这一切的起点,就是靠谱的历史行情数据。希望这篇文章能帮你少走弯路。 参考文档:https://docs.itick.org/websocket/stocks GitHub:https://github.com/itick-org/ 感谢supermind解决了我多年的强迫症。 从mindgo到现在,兜兜转转,我坚持了两年多,感谢老张的陪伴,总算有了退休的希望。把biqquant平台的策略搬家到了这里,彻底放飞自我了。。训练集 15-22年 策略--超短龙头战法无未来函数绩效。我一直觉得 传统量化要迭代到AI-量化,AI-量化是未来的趋势。机器学习和深度学习算法在金融市场里面还是很多应用的空间的。做了很多策略才发现,用机器学习做出来的策略,就是比传统策略要亮眼一点而且超短T+1策略这一块 还是机器学习做出来的策略效果会比较好。继续实盘。感觉有希望了,今年退休!点赞都是有缘人,祝你23年暴富!需要框架的 邮箱 随缘给 研究了两年,终于研究出来一个无敌策略,不惧牛熊,各种行情都是稳定盈利!! 有感兴趣的朋友欢迎留言,短周期策略。持仓数量十只 求写一个简单的问财语句作为选股条件的买入和卖出的策略,本人炒股资金有点少,这个思路适合小资金,求大佬能帮忙写一个 最近在折腾高频策略,发现数据这块真是个大坑。尤其是Tick和Level2,动不动就是几百G,硬盘都顶不住。今天简单聊聊我常用的几个数据源,主要是从CMES金融数据库下载的,给刚入门的朋友。 先说说最基础的分钟线数据。这个对回测比较友好,数据量小,格式也简单。一般包含时间、开盘价、最高价、最低价、收盘价和成交量。比如看个5分钟K线,用这个就够了。 # 获取分钟线数据示例,CMES金融数据库的行情接口 # 注意入参正确,调用频率正常 import cmesdata as cmes # 获取AAPL的5分钟K线 data = cmes.get_kline(symbol='AAPL', interval='5min', start_date='20240101') 但分钟线是“总结”过的,真想看市场微观结构,还得是Tick数据。这个就细了,每一笔成交都记录,包含精确时间、价格、成交量、买卖方向。数据量巨大,不是做高频或者订单流分析的话,建议先别碰。 更细的是十档行情(Level2)。这能看到买卖盘口的深度,不只是五档。字段包括时间、十个买价买量、十个卖价卖量,还有总委托量什么的。之前用这个数据看主力合约的挂单变化,对判断短期压力支撑有点用。 为了方便对比,我列了个简单的表,是我自己平时会关注的几个点: 数据类别 大概长什么样 我的使用感受 分钟线 时间,O, H, L, C, V 省地方,回测必备,新手友好。 Tick逐笔 精确时间,价格,成交量,方向 数据狂魔,盘口重建靠它,硬盘杀手。 十档行情 时间,买1-10价/量,卖1-10价/量 看盘口深度,算盘口厚度,做市商可能更关心。 最后提一嘴,这些数据在数据库的下载页都能找到,有打包好的历史数据。用的时候注意一下数据字段的说明,别把买卖方向搞反了。数据清洗也挺费时间的,他们那边有处理好的版本,能省点事。 刚开始建议从分钟线玩起,Tick数据真的庞杂,容易处理到崩溃。有同样在折腾高频数据的朋友,欢迎交流啊,有啥压缩数据的好方法也求分享!