作为一名全球金融行业的开发者,近些年把目光投向了澳大利亚股票市场时,最先遇到的难题就是“数据获取”——想要做实时行情分析、搭建自己的量化策略,没有靠谱的实时行情 API 接入方式,一切都是空谈。踩过不少坑后,总算摸索出了一套从 API 对接、实时数据获取到简单量化策略落地的完整流程,今天就把这些实战经验分享给同样感兴趣的朋友。

澳洲股市 API 的核心要点
和港、美、A股不同,澳洲股市(ASX)有自己的交易规则和数据体系,对接 API 前先理清两个关键:
- 数据类型:我们最关心的无非是实时成交价、买卖盘口、成交量、涨跌幅这些核心字段,优质的 API 会提供毫秒级的实时推送,而非延迟的快照数据;
- 接入方式:多数澳洲股市 API 支持 RESTful 接口(拉取数据)和 WebSocket(推送实时数据),前者适合按需查询,后者适合实时行情监控;
- 权限与成本:个人开发者通常先从免费/轻量付费套餐入手,验证功能后再升级,注意查看 API 的调用频率限制、数据覆盖范围(是否包含全市场股票,还是仅头部标的)。
二、获取实时行情
先说明前置条件:你需要先在 iTick 官网完成注册,获取 API Key(通常在开发者后台生成),确认你的账号有实时行情访问权限。
1. 环境准备
首先安装必要的 Python 库,requests 用于调用 REST 接口,websocket-client 用于实时推送,pandas 用于数据处理:
pip install requests websocket-client pandas
2. 拉取单只股票实时行情
先从最简单的 REST 接口入手,获取某只澳洲股票(比如 CBA,澳洲联邦银行)的实时数据:
import requests
url = "//api.itick.org/stock/quote?region=AU&code=CBA"
headers = {
"accept": "application/json",
"token": "your_token"
}
response = requests.get(url, headers=headers)
print(response.text)
运行这段代码,你会得到如下格式的实时数据(示例):
{
"code": 0,
"msg": null,
"data": {
"s": "CBA",
"ld": 115.25,
"o": 114.8,
"h": 115.5,
"l": 114.5,
"t": 1765526889000,
"v": 452000,
"tu": 52000000,
"ts": 0,
"ch": 0.45,
"chp": 0.39
}
}
3. WebSocket 监听实时行情推送
如果需要持续监控行情(比如做实时交易信号),REST 接口的轮询效率太低,推荐用 WebSocket 接收实时推送:
import websocket
import json
import threading
import time
# WebSocket 连接地址和 token
WS_URL = "wss://api.itick.org/stock"
API_TOKEN = "your_token"
def on_message(ws, message):
"""处理接收到的消息"""
print("Received message:", message)
data = json.loads(message)
# 处理连接成功的消息
if data.get("code") == 1 and data.get("msg") == "Connected Successfully":
print("Connected successfully, waiting for authentication...")
# 处理认证结果
elif data.get("resAc") == "auth":
if data.get("code") == 1:
print("Authentication successful")
# 认证成功后订阅数据
subscribe(ws)
else:
print("Authentication failed")
ws.close()
# 处理订阅结果
elif data.get("resAc") == "subscribe":
if data.get("code") == 1:
print("Subscription successful")
else:
print("Subscription failed:", data.get("msg"))
# 处理市场数据
elif data.get("data"):
# 打印实时行情数据
market_data = data["data"]
data_type = market_data.get("type")
symbol = market_data.get("s")
print(f"{data_type.upper()} data for {symbol}:", market_data)
def on_error(ws, error):
"""处理错误"""
print("Error:", error)
def on_close(ws, close_status_code, close_msg):
"""连接关闭回调"""
print("Connection closed")
def on_open(ws):
"""连接建立后的回调"""
print("WebSocket connection opened")
def subscribe(ws):
"""订阅行情数据"""
subscribe_msg = {
"ac": "subscribe",
"params": "CBA$AU",
"types": "tick,quote,depth"
}
ws.send(json.dumps(subscribe_msg))
print("Subscribe message sent")
def send_ping(ws):
"""定期发送心跳包"""
while True:
time.sleep(30) # 每30秒发送一次心跳
ping_msg = {
"ac": "ping",
"params": str(int(time.time() * 1000))
}
ws.send(json.dumps(ping_msg))
print("Ping sent")
if __name__ == "__main__":
# 创建 WebSocket 连接,通过header传递token
ws = websocket.WebSocketApp(
WS_URL,
header={"token": API_TOKEN},
on_open=on_open,
on_message=on_message,
on_error=on_error,
on_close=on_close
)
# 在单独的线程中启动心跳机制
ping_thread = threading.Thread(target=send_ping, args=(ws,))
ping_thread.daemon = True
ping_thread.start()
# 启动 WebSocket 连接
ws.run_forever()
运行后,你会看到实时推送的行情数据,每一次股价、盘口变动都会即时显示,这也是做实时量化策略的基础。
三、简单的澳洲股市量化示例
拿到实时数据后,就可以搭建基础的量化策略了。这里分享一个“突破买入”的简单策略思路:当股票实时价格突破近 5 分钟均价的 1.01 倍,且成交量放大时,触发买入信号。
import requests
import pandas as pd
import time
API_TOKEN = "your_token"
BASE_URL = "//api.itick.org/stock/quote"
# 存储最近5分钟的价格数据
price_history = {
"CBA": []
}
AVG_WINDOW = 300 # 5分钟(秒)
PRICE_RATIO = 1.01 # 突破比例
def get_realtime_price(symbol):
"""获取实时价格"""
headers = {
"accept": "application/json",
"token": API_TOKEN
}
params = {
"region": "AU",
"code": symbol
}
try:
response = requests.get(BASE_URL, headers=headers, params=params, timeout=5)
data = response.json()
if "data" in data:
quote = data["data"]
return quote["ld"], quote["v"], quote["t"]
except Exception as e:
print(f"获取价格失败:{e}")
return None, None, None
def check_breakout_strategy(symbol):
"""检查突破策略信号"""
price, volume, ts = get_realtime_price(symbol)
if price is None:
return
# 记录价格,只保留最近5分钟的数据
current_time = ts / 1000 # 转换为秒
price_history[symbol].append({"price": price, "volume": volume, "time": current_time})
# 过滤超出5分钟的数据
price_history[symbol] = [x for x in price_history[symbol] if current_time - x["time"] <= AVG_WINDOW]
# 至少有10个数据点才计算均价
if len(price_history[symbol]) >= 10:
prices = [x["price"] for x in price_history[symbol]]
avg_price = pd.Series(prices).mean()
# 成交量放大(对比前一个数据点)
volume_increase = volume > price_history[symbol][-2]["volume"] * 1.2 if len(price_history[symbol]) > 1 else False
# 触发突破信号
if price > avg_price * PRICE_RATIO and volume_increase:
print(f"\n【买入信号】{symbol} - 实时价:{price} | 5分钟均价:{round(avg_price,2)} | 成交量:{volume}")
else:
print(f"{symbol} - 实时价:{price} | 5分钟均价:{round(avg_price,2)} | 无信号", end="\r")
if __name__ == "__main__":
# 持续监控策略信号,每3秒查询一次
while True:
check_breakout_strategy("CBA")
time.sleep(3)
这个策略的逻辑很简单,适合新手入门:通过持续采集实时价格,计算短期均价,结合成交量判断突破有效性,避免单纯的价格假突破。实际使用时,你可以根据澳洲股市的交易时间(澳洲东部时间 9:15-16:00)调整运行时段,也可以加入止损、止盈等风控逻辑。
四、接入澳洲股市 API 的小提醒
- 时区问题:澳洲使用 AEST/AEDT 时区,比北京时间快 2-3 小时,处理时间戳时一定要做时区转换,避免数据时间错乱;
- API 限流:免费/基础套餐通常有调用频率限制(比如每秒 10 次),批量查询时要加延时,避免被限流;
- 数据验证:实时行情偶尔会有异常值(比如价格跳空),策略中要加入数据清洗逻辑,比如过滤超出合理波动范围的价格;
- 测试环境:先用模拟盘/测试 API 验证策略,不要直接对接实盘,澳洲股市的交易规则和费用也要提前了解清楚。
总结
- 接入澳洲股市 API 的核心是先明确数据需求(实时/历史),选择适配的接口类型(REST/WebSocket),iTick API 作为示例,其接入流程具备通用性;
- Python 对接 API 的关键是做好权限验证、异常处理和数据格式化,WebSocket 是获取实时行情的最优方式;
- 量化策略的落地要从简单逻辑入手,结合实时数据做信号验证,同时注意时区、限流、数据清洗等细节。
温馨提示:本文仅供代码参考,不构成任何投资建议。市场有风险,投资需谨慎
参考文档:https://blog.itick.org/stock-api/2026-australian-stock-market-itick-api-guide
GitHub:https://github.com/itick-org/

