功能描述
- 订阅 AAPL 的 1 分钟 K 线;
- 提取最新的收盘价;
- 检测价格变动是否大于一定阈值(例如 1%);
- 推送价格更新到 Telegram。
2. 准备工作
- 获取 Infoway 的 API Key:https://infoway.io
- 创建 Telegram Bot:向 @BotFather 创建一个 bot,获取
TELEGRAM_BOT_TOKEN
; - 获取 Chat ID:与 bot 聊天,然后访问
https://api.telegram.org/bot<YOUR_BOT_TOKEN>/getUpdates
获取你的chat_id
。
3. 监控股价变动
import asyncio
import json
import websockets
import aiohttp
WS_URL = "wss://data.infoway.io/ws?business=stock&apikey=yourApiKey"
TELEGRAM_BOT_TOKEN = "yourTelegramBotToken"
TELEGRAM_CHAT_ID = "yourTelegramChatID"
# 这个脚本依赖实时行情数据,需要先在infoway的官网获取免费的API key
last_close_price = None
PRICE_CHANGE_THRESHOLD = 0.01 # 1%
async def send_telegram_message(message: str):
url = f"//api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
payload = {
"chat_id": TELEGRAM_CHAT_ID,
"text": message
}
async with aiohttp.ClientSession() as session:
async with session.post(url, data=payload) as response:
if response.status != 200:
print("Failed to send Telegram message:", await response.text())
async def connect_and_monitor():
global last_close_price
async with websockets.connect(WS_URL) as websocket:
init_message = {
"code": 10004,
"trace": "unique-trace-id-001",
"data": {
"arr": [{"type": 1, "codes": "AAPL"}]
}
}
await websocket.send(json.dumps(init_message))
async def send_ping():
while True:
await asyncio.sleep(30)
ping_message = {"code": 10010, "trace": "ping-001"}
await websocket.send(json.dumps(ping_message))
ping_task = asyncio.create_task(send_ping())
try:
while True:
raw_message = await websocket.recv()
message = json.loads(raw_message)
# 处理K线数据
if message.get("code") == 10004 and "data" in message:
kline_data = message["data"].get("AAPL", [])
if kline_data:
latest = kline_data[-1]
close_price = latest[4]
if last_close_price:
change = abs(close_price - last_close_price) / last_close_price
if change >= PRICE_CHANGE_THRESHOLD:
msg = f"AAPL最新价格: {close_price:.2f},变动率: {change * 100:.2f}%"
print(msg)
await send_telegram_message(msg)
last_close_price = close_price
except websockets.exceptions.ConnectionClosed:
print("WebSocket连接关闭")
finally:
ping_task.cancel()
asyncio.run(connect_and_monitor())
latest[4]
是收盘价,K线数据数组格式一般是[时间戳, 开, 高, 低, 收, 成交量]
;PRICE_CHANGE_THRESHOLD
可根据需要调整为其他百分比;- 可扩展支持更多股票、不同周期等。
4. 监控个股RSI指标
我们也可以在监控脚本中加入RSI指标的判断,当股票的RSI大于70,或者小于30时发出预警,下面是代码示例:
import asyncio
import json
import websockets
import aiohttp
import numpy as np
from collections import deque
WS_URL = "wss://data.infoway.io/ws?business=stock&apikey=yourApiKey"
TELEGRAM_BOT_TOKEN = "yourTelegramBotToken"
TELEGRAM_CHAT_ID = "yourTelegramChatID"
# RSI 参数
RSI_PERIOD = 14
PRICE_HISTORY = deque(maxlen=RSI_PERIOD + 1)
async def send_telegram_message(message: str):
url = f"//api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
payload = {
"chat_id": TELEGRAM_CHAT_ID,
"text": message
}
async with aiohttp.ClientSession() as session:
async with session.post(url, data=payload) as response:
if response.status != 200:
print("Failed to send Telegram message:", await response.text())
def calculate_rsi(prices: list) -> float:
if len(prices) < RSI_PERIOD + 1:
return None
deltas = np.diff(prices)
gains = np.where(deltas > 0, deltas, 0)
losses = np.where(deltas < 0, -deltas, 0)
avg_gain = np.mean(gains)
avg_loss = np.mean(losses)
if avg_loss == 0:
return 100.0 # 避免除以0,说明价格只涨不跌
rs = avg_gain / avg_loss
rsi = 100 - (100 / (1 + rs))
return rsi
async def connect_and_monitor():
async with websockets.connect(WS_URL) as websocket:
init_message = {
"code": 10004,
"trace": "unique-trace-id-002",
"data": {
"arr": [{"type": 1, "codes": "AAPL"}]
}
}
await websocket.send(json.dumps(init_message))
async def send_ping():
while True:
await asyncio.sleep(30)
ping_message = {"code": 10010, "trace": "ping-002"}
await websocket.send(json.dumps(ping_message))
ping_task = asyncio.create_task(send_ping())
try:
while True:
raw_message = await websocket.recv()
message = json.loads(raw_message)
if message.get("code") == 10004 and "data" in message:
kline_data = message["data"].get("AAPL", [])
if kline_data:
latest = kline_data[-1]
close_price = latest[4]
# 收集价格用于计算 RSI
PRICE_HISTORY.append(close_price)
# RSI计算与推送
rsi = calculate_rsi(list(PRICE_HISTORY))
if rsi:
print(f"当前RSI: {rsi:.2f}")
if rsi > 70:
await send_telegram_message(f"AAPL超买警告⚠️:当前RSI = {rsi:.2f} (>70)")
elif rsi < 30:
await send_telegram_message(f"AAPL超卖警告📉:当前RSI = {rsi:.2f} (<30)")
except websockets.exceptions.ConnectionClosed:
print("WebSocket连接关闭")
finally:
ping_task.cancel()
asyncio.run(connect_and_monitor())
- 一开始需要收集够 15 条价格数据(RSI_PERIOD+1)才开始计算 RSI;
- 你可以将
RSI_PERIOD
设置为其他值,比如 6、9、21,根据策略需求调整; - 为了避免重复推送相同方向的 RSI 信号,可以加入“信号状态”缓存,如有需要我可以帮你加。