开篇:你的移动平均线,可能一直算错了
做美股量化交易的朋友,大概率遇到过这些场景:
- 用 Polygon.io 的实时行情,算出来的 RSI 总是比 TradingView 慢一拍;
- 用 yfinance 拉历史数据回测,收益曲线很美,一上实盘就拉胯;
- 股票突然拆股(比如 1:2),你的 MA5 瞬间“腰斩”,触发了错误卖出信号;
- 明明盯着盘口,等指标发出买入指令,价格已经冲高回落。
这些问题的根源,往往不是策略本身,而是实时指标计算的技术细节——窗口选错了、乱序没处理、复权没对齐、状态管理爆炸。
今天这篇文章,我会尽量用通俗的语言,把实时计算技术指标的核心原理讲清楚。同时,我会给出可运行的代码(Flink 示例),让有技术背景的读者能直接拿去用。没有技术背景的朋友,理解原理和避坑点就足够了。
本文核心内容:
- 美股主流数据源对比(Polygon、Alpaca、TickDB 等)——延迟、精度、成交量差异
- 滚动窗口 vs 滑动窗口——为什么你的均线总滞后
- 事件时间 vs 处理时间——乱序数据如何毁掉指标
- 增量聚合与状态管理——1000 只股票如何不爆内存
- 拆股与复权——价格“腰斩”,指标不能跳
- 流计算框架选型——Flink、Spark、Kafka Streams 怎么选
- 完整实战示例(Flink + TickDB WebSocket)
一、美股数据源对比:你的实时行情从哪来?
实时指标计算的第一步是数据源。不同数据源的延迟、精度、成交量完整性差异巨大,直接影响指标准确性。
下表对比了主流美股数据源的关键指标(数据来自社区实测和官方文档)。注意 TickDB 不仅覆盖美股,还支持港股、A股、加密货币等多市场,适合全球化量化团队。
| 数据源 | 时间戳精度 | 典型延迟(P50) | 成交量完整性 | 复权支持 | 免费/付费 | 适合场景 |
|---|---|---|---|---|---|---|
| Polygon.io | 纳秒级 | < 10ms | 全 SIP 数据 | 支持(但拆股计算偶有误差) | 付费 $79/月起 | 机构级低延迟交易 |
| Alpaca | 纳秒级 | 50-100ms | 全 SIP(与 Polygon 差异仅 2.7%) | 支持 | 免费版 200次/分钟 | 个人量化、低成本 |
| TickDB | 毫秒级 | 实时推送 | 全球多市场(美股、港股、A股、加密货币等) | 自动前复权 | 订阅制 | 多市场统一接入、AI 应用 |
| TradeStation | 未明确 | 未明确 | 缺 TRF 数据(成交量少 28.5%) | 支持 | 需账户注资 $1万+ | 已有 TradeStation 账户用户 |
| Finnhub | 毫秒级 | < 100ms | 全球聚合 | 支持 | 免费版有限 | 多资产监控 |
关键发现:
- 成交量差异:TradeStation 因不含 TRF(场外交易报告)数据,成交量比 Polygon 少约 28.5%。这意味着如果你用 TradeStation 数据计算 VWAP(成交量加权平均价),结果会严重偏低。
- 延迟影响:P99 延迟 250-1000ms 时,指标会严重“过时”,导致逆向选择(策略方向对,但执行价格已变)。
- TickDB 的独特优势:一套接口覆盖美股、港股、A股、加密货币,且自动处理复权和时间戳对齐,省去多市场接入的清洗工作。
选型建议:
- 追求极低延迟(<10ms)且预算充足 → Polygon.io
- 个人量化、成本敏感 → Alpaca(免费版足够回测和低频实盘)
- 多市场(美股+港股+A股)统一接入 → TickDB
- 已有 TradeStation 账户且不介意成交量缺失 → TradeStation
二、核心概念:移动平均线是怎么算出来的?
假设你已经有了实时行情流(每秒收到 AAPL 的最新成交价),想计算过去5分钟的平均价格,并且每1分钟更新一次(即 MA5)。
2.1 滚动窗口 vs 滑动窗口
| 窗口类型 | 计算方式 | 输出频率 | 滞后性 | 适用场景 |
|---|---|---|---|---|
| 滚动窗口 | 每5分钟算一次,不重叠(如 10:00-10:05,10:05-10:10) | 每5分钟 | 高(指标5分钟才变一次) | 每分钟K线聚合 |
| 滑动窗口 | 每1分钟算一次,每次覆盖过去5分钟(如 10:05 算 10:00-10:05,10:06 算 10:01-10:06) | 每1分钟 | 低(平滑更新) | 移动平均线、RSI |
结论:MA5 必须用滑动窗口,窗口大小 5 分钟,滑动步长 1 分钟。
生活例子:你想知道自己最近5天的平均体重。滚动窗口就是每5天称一次,告诉你这5天的平均值。滑动窗口就是你每天称一次,但每次回顾过去5天。显然,滑动窗口能让你更早发现体重变化趋势。
2.2 事件时间 vs 处理时间
数据可能因为网络延迟而乱序到达。比如一笔交易在 10:01:00 发生,但 10:01:05 才到你的服务器。
| 时间类型 | 定义 | 示例 | 问题 |
|---|---|---|---|
| 处理时间 | 数据到达服务器的时间 | 10:01:05 | 将交易归入错误窗口(10:01:05 之后的窗口) |
| 事件时间 | 交易实际发生的时间(数据自带的时间戳) | 10:01:00 | 正确归入 10:01:00 窗口 |
结论:必须使用事件时间,并设置水印(Watermark) 来容忍乱序。
水印就像“交卷截止时间”:你告诉系统,“我可以容忍最多5秒的乱序,超过5秒还没到的数据我就不要了”。系统会等待一段时间,然后关闭窗口输出结果。
三、状态管理:1000 只股票会吃掉多少内存?
假设你有 1000 只股票,每只股票每秒收到 1 个价格。用滑动窗口(5分钟窗口,每1秒滑动一次,但实际每1分钟输出即可),每个窗口需要存储过去 300 个数据点(5×60=300)。如果每个数据点存完整信息(价格、成交量等),大约 100 字节,那么总内存:
1000 只 × 300 点 × 100 字节 = 30 MB
30 MB 看似不大,但如果同时计算 MA5、MA10、RSI、布林带……状态量会线性增长到 GB 级别。高频场景下(每 100 毫秒一个 tick),状态量再翻几倍。
3.1 增量聚合:只存“总和”和“个数”
| 存储方式 | 每只股票存储内容 | 内存占用(1000只) | 适用场景 |
|---|---|---|---|
| 全量存储 | 300 个价格 | 30 MB | 需要精确中位数、分位数 |
| 增量聚合 | 2 个数字(总和、个数) | 16 KB | 平均值、计数、求和 |
增量聚合的原理:每来一个新价格,就把旧的总和加上新价格,个数加 1。需要输出时,用总和除以个数。
生活例子:你想知道过去5天的平均消费金额。你不需要记住每一天花了多少钱,只需要记住“总花费”和“天数”。每天结束,把今天的花费加到总花费里,天数加1。这就是增量聚合。
3.2 状态后端选型
| 状态后端 | 存储位置 | 适用场景 | 优缺点 |
|---|---|---|---|
| HashMapStateBackend | JVM 堆内存 | 小状态(<1GB)、低延迟 | 速度快,但 GC 压力大 |
| RocksDBStateBackend | 本地磁盘 + 内存缓存 | 大状态(>1GB)、生产环境 | 稳定,支持增量 checkpoint,但延迟稍高 |
建议:1000+ 股票实时计算,强制使用 RocksDB。
四、乱序处理:网络延迟怎么破?
4.1 乱序的产生原因
- 网络抖动:A 交易 10:01:00 发生,10:01:05 到;B 交易 10:01:02 发生,10:01:03 到 → B 先到,A 后到。
- 多数据源合并:不同交易所的撮合时间不同。
- 暗池交易:FINRA TRF 在早上 8:00 集中报告前一夜的交易,时间戳显示 8:00,但实际发生在凌晨。
4.2 水印(Watermark)与允许乱序时间
| 参数 | 含义 | 推荐值 | 依据 |
|---|---|---|---|
| 允许乱序时间 | 系统等待迟到数据的最大时长 | 数据源 P99 延迟的 1.5 倍 | Polygon P99 约 50-100ms,设 500ms;免费源可能需 5 秒 |
| 空闲检测 | 某只股票多久无数据后标记为空闲 | 1 分钟 | 防止停牌股票阻塞整个系统 |
Flink 代码示例(设置水印):
DataStream<MarketData> withTimestamps = source
.assignTimestampsAndWatermarks(
WatermarkStrategy.<MarketData>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, ts) -> event.getParticipantTimestamp())
.withIdleness(Duration.ofMinutes(1))
);
生活例子:你组织一场考试,要求 10:00 交卷。你知道有些学生会迟到,所以宣布:“我等到 10:05,还没交的就不收了。”这里的 10:05 就是水印(允许乱序 5 分钟)。
五、拆股与复权:价格“腰斩”,指标不能跟着跳
5.1 问题重现
某股票 1:2 拆股,公告生效日当天,价格从 200 元变成 100 元。如果你用原始价格计算 MA5,会在拆股那一刻看到均线瞬间“腰斩”,触发错误的卖出信号。
实际上,你的资产并没有变少(1 股变成 2 股,总价值不变)。技术指标应该反映连续的价值变化,而不是人为的价格调整。
5.2 复权处理方案
| 方案 | 历史数据 | 实时价格 | 一致性 | 实现复杂度 |
|---|---|---|---|---|
| Raw(原始) | 原始 | 原始 | 差(拆股时跳变) | 低 |
| Adjusted(复权) | 复权 | 复权 | 好 | 中(需复权因子) |
| ScaledRaw | 复权 | 原始 | 中(实时与历史不一致) | 中 |
| Total Return | 含分红再投资 | 含分红再投资 | 好 | 高 |
推荐:使用 Adjusted 模式,选择数据源自动返回复权价格。例如 TickDB 的 K 线接口默认返回前复权数据,实时 WebSocket 流也推送复权价格,直接使用即可。
5.3 自己实现“复位与预热”的复杂性
如果数据源不支持自动复权,你需要:
- 监听拆股事件通知(如 Polygon 的
/reference/splits接口)。 - 清空该股票的所有指标状态(累加器、窗口缓存)。
- 通过 REST API 拉取拆股后的历史数据(ScaledRaw 模式)。
- 将历史数据重放给指标,重建状态。
- 切回实时 WebSocket 流。
这套流程极易出错,强烈建议选用自带复权的数据源。
六、流计算框架选型:Flink、Spark、Kafka Streams 怎么选?
| 框架 | 延迟 | 状态支持 | 事件时间 | 运维复杂度 | 适合场景 |
|---|---|---|---|---|---|
| Apache Flink | 亚秒级(<10ms P50) | RocksDB(大状态) | 原生支持 | 高 | 高频交易、低延迟、大状态 |
| Spark Structured Streaming | 秒级(微批 500ms-几秒) | 内存 + checkpoint | 支持有限 | 中 | 对延迟不敏感、已用 Spark 生态 |
| Kafka Streams | 亚秒级 | 本地 RocksDB | 支持 | 低(嵌入应用) | 轻量级、数据已在 Kafka |
| Databricks RTM | 毫秒级 | 云原生 | 支持 | 低(托管) | 统一离线/实时代码 |
选型建议:
- 量化团队,要求低延迟(<100ms)、大状态、事件时间 → Flink
- 已有 Spark 生态,延迟要求秒级 → Spark Structured Streaming
- 不想维护独立集群,数据在 Kafka → Kafka Streams
- 使用 Databricks 平台,希望训练与推理代码统一 → Databricks RTM
七、实战:从 TickDB 实时行情到 MA5 计算(完整示例)
本节省略了非技术细节,提供可运行的 Flink Java 代码框架。
7.1 整体架构
TickDB WebSocket (wss://api.tickdb.ai/v1/realtime?api_key=YOUR_KEY)
↓
Flink Custom Source (心跳、API Key、重连)
↓
DataStream<MarketData>
↓
KeyBy(symbol) + SlidingEventTimeWindow(5min, 1min) + AggregateFunction
↓
DataStream<MA5Result>
↓
Redis Sink (供 API 查询)
7.2 TickDB WebSocket Source(核心)
public class TickDBSource extends RichSourceFunction<MarketData> {
private WebSocketClient client;
private final String apiKey;
private final List<String> symbols;
@Override
public void run(SourceContext<MarketData> ctx) throws Exception {
// 注意:必须在 URL 中拼接 api_key 参数
client = new WebSocketClient(new URI("wss://api.tickdb.ai/v1/realtime?api_key=" + this.apiKey)) {
@Override
public void onOpen(ServerHandshake handshake) {
// 订阅股票
JSONObject sub = new JSONObject();
sub.put("action", "subscribe");
sub.put("symbols", symbols);
send(sub.toString());
// 启动心跳(每秒发送 {"cmd":"ping"},保持连接活跃)
scheduleHeartbeat();
}
@Override
public void onMessage(String message) {
JSONObject json = JSON.parseObject(message);
if (json.containsKey("data")) {
JSONObject data = json.getJSONObject("data");
MarketData event = new MarketData();
event.setSymbol(data.getString("symbol"));
event.setTimestamp(data.getLong("timestamp")); // UTC 毫秒
event.setPrice(data.getDoubleValue("price"));
ctx.collect(event);
}
}
@Override
public void onClose(int code, String reason, boolean remote) {
// 触发重连(需实现指数退避逻辑)
}
@Override
public void onError(Exception ex) {
// 触发重连
}
};
client.connect();
while (running) Thread.sleep(1000);
}
private void scheduleHeartbeat() {
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
executor.scheduleAtFixedRate(() -> {
if (client.isOpen()) {
client.send("{\"cmd\":\"ping\"}");
}
}, 1, 1, TimeUnit.SECONDS); // 每秒一次
}
}
💡 架构师笔记:生产环境中,除了配置每秒的 ping 心跳,务必在
onClose或onError回调中加入指数退避的重连逻辑(Exponential Backoff),防止网络抖动导致系统彻底宕机。
7.3 增量聚合实现
public class AveragePriceAggregator
implements AggregateFunction<MarketData, Tuple2<Double, Integer>, Double> {
@Override
public Tuple2<Double, Integer> createAccumulator() {
return Tuple2.of(0.0, 0);
}
@Override
public Tuple2<Double, Integer> add(MarketData value, Tuple2<Double, Integer> acc) {
return Tuple2.of(acc.f0 + value.getPrice(), acc.f1 + 1);
}
@Override
public Double getResult(Tuple2<Double, Integer> acc) {
return acc.f1 == 0 ? 0 : acc.f0 / acc.f1;
}
@Override
public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b) {
return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
}
}
7.4 主作业
public class RealTimeMA5Job {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<MarketData> source = env.addSource(
new TickDBSource("YOUR_API_KEY", Arrays.asList("AAPL.US", "MSFT.US")));
DataStream<MarketData> withWatermarks = source
.assignTimestampsAndWatermarks(
WatermarkStrategy.<MarketData>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, ts) -> event.getTimestamp())
.withIdleness(Duration.ofMinutes(1))
);
DataStream<MA5Result> ma5 = withWatermarks
.keyBy(MarketData::getSymbol)
.window(SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(1)))
.aggregate(new AveragePriceAggregator(), new WindowResultProcessor());
ma5.addSink(new RedisSink());
env.execute("Real-time MA5 Calculator");
}
}
八、总结:避坑指南(小白版)
| 问题 | 一句话解释 | 解决方案 |
|---|---|---|
| 指标滞后 | 每5分钟才更新一次,跟不上行情 | 用滑动窗口,每1分钟更新 |
| 数据乱序 | 网络延迟导致数据到达顺序错乱 | 用事件时间 + 水印,允许迟到5秒 |
| 内存爆炸 | 存储了每个价格明细 | 用增量聚合,只存总和和个数 |
| 停牌阻塞 | 停牌股票卡住整个系统 | 设置空闲检测,1分钟无数据就忽略 |
| 拆股跳变 | 拆股后价格腰斩,指标失真 | 用复权价格,优先选数据源自动复权 |
| 选错框架 | 延迟太高或运维太复杂 | 低延迟选Flink,秒级延迟可选 Spark |
最后,如果你不想自己维护 Flink 集群和复杂的复权逻辑,可以考虑像 TickDB 这样的数据源——它内置了标准化字段、自动复权、WebSocket 心跳保活,还提供了 AI Skill 可以直接用自然语言查技术指标。对于量化团队来说,能省去大量底层工作,专注策略本身。另外,如果只是需要定时轮询当前正在形成的 K 线(例如获取这一分钟的最新价格),可以直接使用 TickDB 的实时 K 线接口 /v1/market/kline/latest,无需搭建 Flink 集群。
本文纯技术分享,提到的数据源均为公开服务,不构成任何投资建议。市场有风险,投资需谨慎。

