全球股票行情API:如何高效获取实时与逐笔成交数据

用户头像Fxdund
2026-03-10 发布

在量化交易、跨境投研、行情监控等场景中,毫秒级实时行情+细粒度逐笔成交数据是核心生产资料。相比于传统 K 线、盘口快照,逐笔成交(Tick 数据)能还原每一笔交易的真实细节,而实时行情则保证数据时效性,两者结合才能支撑精准策略与实时决策。但全球市场分散、交易所规则各异、数据延迟与稳定性要求严苛,本文将参考经典教程的框架,手把手教你如何通过 REST API 和 WebSocket 高效对接全球股票行情数据。
75f9ca1174529e68cd7f3af17bb2e5a6.png

一、REST API 实战:快速获取行情数据

REST API 通过 HTTP GET 请求访问,适合单次查询、批量获取或低频率的数据采集场景。所有请求的基址均为 https://api.itick.org,需要在 Headers 中携带 Token。

1.1 获取实时报价

实时报价接口返回股票的最新价、涨跌幅、成交量等核心数据。

import requests

# 配置API密钥和请求参数
API_TOKEN = 'your_token_here'  # 替换为你的实际Token
region = 'US'  # 市场区域:US美股,HK港股,SH沪市,SZ深市
code = 'AAPL'  # 股票代码,美股无需后缀

url = f"//api.itick.org/stock/quote?region={region}&code={code}"
headers = {
    "accept": "application/json",
    "token": API_TOKEN
}

response = requests.get(url, headers=headers)
if response.status_code == 200:
    data = response.json()
    if data.get('code') == 0:  # 业务状态码,0表示成功
        quote = data['data']
        print(f"股票: {quote['s']}")
        print(f"最新价: {quote['ld']}")
        print(f"涨跌幅: {quote['chp']}%")
        print(f"成交量: {quote['v']}")
    else:
        print("API错误:", data.get('msg'))
else:
    print("HTTP请求失败:", response.status_code)

响应字段说明

  • s: 股票代码(如 AAPL.US)
  • ld: 最新价(Last Price)
  • chp: 涨跌幅(Change Percent)
  • v: 成交量(Volume)
  • o/h/l/c: 开盘价/最高价/最低价/收盘价

1.2 获取逐笔成交明细

逐笔成交数据(Tick Data)记录每一笔真实的成交,包含成交时间、价格、成交量等信息,是分析市场微观结构的基础。

import requests

API_TOKEN = 'your_token_here'
region = 'HK'
code = '00700'  # 腾讯控股

url = f"//api.itick.org/stock/tick?region={region}&code={code}"
headers = {
    "accept": "application/json",
    "token": API_TOKEN
}

response = requests.get(url, headers=headers)
if response.status_code == 200:
    data = response.json()
    if data.get('code') == 0:
        ticks = data['data']  # 返回最近的逐笔数据列表
        for tick in ticks[:5]:  # 打印前5条
            print(f"时间: {tick['t']}, 价格: {tick['p']}, 成交量: {tick['v']}")
    else:
        print("API错误:", data.get('msg'))

注意:REST 接口的/tick通常返回最近的若干条成交记录,适合用于初始化展示或离线分析。如果需要实时流式逐笔数据,请使用 WebSocket。

1.3 获取盘口深度数据

盘口深度(Depth)展示当前的买卖挂单情况,iTick 支持多档盘口查询。

import requests

API_TOKEN = 'your_token_here'
region = 'SH'
code = '600519'  # 贵州茅台

url = f"//api.itick.org/stock/depth?region={region}&code={code}"
headers = {
    "accept": "application/json",
    "token": API_TOKEN
}

response = requests.get(url, headers=headers)
if response.status_code == 200:
    data = response.json()
    if data.get('code') == 0:
        depth = data['data']
        print("买盘(Bids):")
        for bid in depth['b']:  # b代表买盘,包含价格p和挂单量v
            print(f"  价格: {bid['p']}, 数量: {bid['v']}")
        print("卖盘(Asks):")
        for ask in depth['a']:  # a代表卖盘
            print(f"  价格: {ask['p']}, 数量: {ask['v']}")
    else:
        print("API错误:", data.get('msg'))

1.4 获取历史 K 线数据

K 线数据是技术分析和策略回测的核心,iTick 支持多种周期的历史 K 线查询。

import requests

API_TOKEN = 'your_token_here'
region = 'US'
code = 'MSFT'
kType = '2'   # K线类型:1=1分钟,2=5分钟,3=15分钟,4=30分钟,5=1小时,6=2小时,7=4小时,8=日线,9=周线,10=月线
limit = 10    # 返回K线数量

url = f"//api.itick.org/stock/kline?region={region}&code={code}&kType={kType}&limit={limit}"
headers = {
    "accept": "application/json",
    "token": API_TOKEN
}

response = requests.get(url, headers=headers)
if response.status_code == 200:
    data = response.json()
    if data.get('code') == 0:
        klines = data['data']
        for k in klines:
            print(f"时间: {k['t']}, 开: {k['o']}, 高: {k['h']}, 低: {k['l']}, 收: {k['c']}, 量: {k['v']}")
    else:
        print("API错误:", data.get('msg'))

二、WebSocket 实战:实时推送逐笔与盘口数据

如果你的应用场景对延迟要求较高(如高频交易、实时监控),WebSocket 是更优的选择。iTick API 的 WebSocket 接口支持行情数据的实时推送,包括报价、逐笔成交和盘口变化。

2.1 连接与认证

WebSocket 连接地址为:wss://api.itick.org/stock(股票),需要在请求头中携带 Token 进行认证。

2.2 订阅数据

连接成功后,发送订阅消息指定需要监控的标的和数据类型:

  • params: 标的列表,格式为"代码市场",多个用逗号分隔,如`"AAPLUS,00700$HK"`
  • types: 数据类型,支持quote(报价)、tick(逐笔成交)、depth(盘口),多个用逗号分隔

2.3 心跳维持

为防止连接超时断开,客户端需定期(如每 30 秒)发送心跳包。

2.4 完整代码示例(含自动重连)

下面是一个生产级别的 WebSocket 客户端实现,包含认证、订阅、心跳、自动重连和详细日志:

import websocket
import json
import threading
import time
from loguru import logger

class iTickWebSocketClient:
    def __init__(self, token, symbols, types="quote,tick,depth"):
        """
        :param token: API Token
        :param symbols: 标的列表,格式如 ["AAPL$US", "00700$HK"]
        :param types: 订阅的数据类型,如 "quote,tick,depth"
        """
        self.token = token
        self.symbols = symbols
        self.types = types
        self.ws_url = "wss://api.itick.org/stock"
        self.ws = None
        self.is_connected = False
        self.thread = None

    def on_message(self, ws, message):
        """处理接收到的消息"""
        try:
            data = json.loads(message)

            # 处理连接成功消息
            if data.get("code") == 1 and data.get("msg") == "Connected Successfully":
                logger.info("✅ 连接成功,等待认证...")

            # 处理认证结果
            elif data.get("resAc") == "auth":
                if data.get("code") == 1:
                    logger.info("✅ 认证通过,开始订阅数据...")
                    self.subscribe()
                else:
                    logger.error(f"❌ 认证失败:{data.get('msg')}")
                    ws.close()

            # 处理订阅结果
            elif data.get("resAc") == "subscribe":
                if data.get("code") == 1:
                    logger.info(f"✅ 订阅成功!标的:{self.symbols},类型:{self.types}")
                else:
                    logger.error(f"❌ 订阅失败:{data.get('msg')}")

            # 处理实时行情数据
            elif data.get("data"):
                market_data = data["data"]
                data_type = market_data.get("type")  # quote/tick/depth
                symbol = market_data.get("s")

                # 根据数据类型分别处理
                if data_type == "tick":
                    logger.debug(f"📊 逐笔成交 {symbol}: 价格={market_data.get('p')}, 数量={market_data.get('v')}")
                elif data_type == "quote":
                    logger.debug(f"📈 实时报价 {symbol}: 最新价={market_data.get('ld')}, 涨跌幅={market_data.get('chp')}%")
                elif data_type == "depth":
                    logger.debug(f"📚 盘口深度 {symbol}: 买一={market_data.get('b')[0] if market_data.get('b') else 'N/A'}")

        except Exception as e:
            logger.error(f"消息处理异常: {e}")

    def on_error(self, ws, error):
        logger.error(f"❌ WebSocket错误: {error}")
        self.is_connected = False

    def on_close(self, ws, close_status_code, close_msg):
        logger.warning(f"🔌 连接关闭: {close_status_code} - {close_msg}")
        self.is_connected = False
        # 触发自动重连
        logger.info("3秒后尝试重连...")
        time.sleep(3)
        self.start()

    def on_open(self, ws):
        logger.info("🔗 WebSocket连接已打开")
        self.is_connected = True

        # 发送认证消息(Token放在header中,无需单独发送认证消息)
        # 某些版本的iTick WebSocket需要在header中携带token,无需显式auth

    def subscribe(self):
        """发送订阅请求"""
        params_str = ",".join(self.symbols)
        subscribe_msg = {
            "ac": "subscribe",
            "params": params_str,
            "types": self.types
        }
        self.ws.send(json.dumps(subscribe_msg))

    def send_ping(self):
        """心跳线程:每30秒发送ping"""
        while self.is_connected:
            time.sleep(30)
            try:
                ping_msg = {
                    "ac": "ping",
                    "params": str(int(time.time() * 1000))
                }
                self.ws.send(json.dumps(ping_msg))
                logger.debug("📡 发送心跳包")
            except Exception as e:
                logger.error(f"发送心跳包失败: {e}")

    def start(self):
        """启动WebSocket连接"""
        try:
            # 创建WebSocketApp,在header中携带Token
            self.ws = websocket.WebSocketApp(
                self.ws_url,
                header={"token": self.token},
                on_open=self.on_open,
                on_message=self.on_message,
                on_error=self.on_error,
                on_close=self.on_close
            )

            # 启动心跳线程
            ping_thread = threading.Thread(target=self.send_ping, daemon=True)
            ping_thread.start()

            # 运行连接(这会阻塞,所以放到线程中)
            self.thread = threading.Thread(target=self.ws.run_forever, daemon=True)
            self.thread.start()

        except Exception as e:
            logger.error(f"启动WebSocket失败: {e}")

    def stop(self):
        """停止连接"""
        if self.ws:
            self.ws.close()
        self.is_connected = False

# 使用示例
if __name__ == "__main__":
    # 配置日志
    logger.add("itick_ws.log", rotation="500 MB", level="INFO")

    # 初始化客户端
    client = iTickWebSocketClient(
        token="your_token_here",
        symbols=["AAPL$US", "00700$HK", "600519$SH"],
        types="quote,tick"  # 只订阅报价和逐笔成交
    )

    # 启动连接
    client.start()

    # 保持主线程运行
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        logger.info("程序终止")
        client.stop()

三、实战要点与避坑指南

3.1 市场与代码格式

iTick 使用region参数区分市场,不同市场的代码格式如下:

市场 region 值 代码示例 说明
美股 US AAPL 直接使用英文代码
港股 HK 00700 数字代码,无需前缀
沪市 SH 600519 6 位数字代码
深市 SZ 000001 6 位数字代码

3.2 批量操作

iTick 支持批量查询,将单数接口的code改为codes,多个代码用逗号分隔:

  • GET /stock/quotes?region=US&codes=AAPL,MSFT,GOOGL

3.3 连接稳定性

生产环境必须实现自动重连机制。上述 WebSocket 示例已经包含了断线重连,建议根据业务需求调整重连间隔和最大重连次数。

3.4 数据缓存策略

对于历史 K 线等低频变动数据,可以在本地缓存 1 分钟以上,减少不必要的 API 调用。对于实时行情,建议直接使用 WebSocket 推送,避免轮询带来的延迟和资源浪费。

四、总结

本文详细介绍了如何高效获取全球股票的实时行情与逐笔成交数据。通过 REST API,你可以快速实现单次查询和历史数据获取;通过 WebSocket,则可以构建低延迟的实时数据管道,满足量化交易和高频监控的需求。希望本文的代码示例和实战经验能帮助你在量化交易和金融应用开发的道路上少走弯路。

下一步,你可以尝试将实时数据接入自己的策略引擎,或结合数据库构建历史数据仓库,开启你的量化之旅。


温馨提示:本文代码仅供参考,不构成任何投资建议。市场有风险,投资需谨慎。

GitHub 项目地址https://github.com/itick-org/

评论