分享一个基于实时行情数据API的监控脚本

用户头像sh_***5125ml
2025-06-26 发布

功能描述

  • 订阅 AAPL 的 1 分钟 K 线;
  • 提取最新的收盘价;
  • 检测价格变动是否大于一定阈值(例如 1%);
  • 推送价格更新到 Telegram。

2. 准备工作

  1. 获取 Infoway 的 API Keyhttps://infoway.io
  2. 创建 Telegram Bot:向 @BotFather 创建一个 bot,获取 TELEGRAM_BOT_TOKEN
  3. 获取 Chat ID:与 bot 聊天,然后访问 https://api.telegram.org/bot<YOUR_BOT_TOKEN>/getUpdates 获取你的 chat_id

3. 监控股价变动

import asyncio
import json
import websockets
import aiohttp

WS_URL = "wss://data.infoway.io/ws?business=stock&apikey=yourApiKey"
TELEGRAM_BOT_TOKEN = "yourTelegramBotToken"
TELEGRAM_CHAT_ID = "yourTelegramChatID"

# 这个脚本依赖实时行情数据,需要先在infoway的官网获取免费的API key

last_close_price = None
PRICE_CHANGE_THRESHOLD = 0.01  # 1%

async def send_telegram_message(message: str):
    url = f"//api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
    payload = {
        "chat_id": TELEGRAM_CHAT_ID,
        "text": message
    }
    async with aiohttp.ClientSession() as session:
        async with session.post(url, data=payload) as response:
            if response.status != 200:
                print("Failed to send Telegram message:", await response.text())

async def connect_and_monitor():
    global last_close_price

    async with websockets.connect(WS_URL) as websocket:
        init_message = {
            "code": 10004,
            "trace": "unique-trace-id-001",
            "data": {
                "arr": [{"type": 1, "codes": "AAPL"}]
            }
        }
        await websocket.send(json.dumps(init_message))

        async def send_ping():
            while True:
                await asyncio.sleep(30)
                ping_message = {"code": 10010, "trace": "ping-001"}
                await websocket.send(json.dumps(ping_message))

        ping_task = asyncio.create_task(send_ping())

        try:
            while True:
                raw_message = await websocket.recv()
                message = json.loads(raw_message)

                # 处理K线数据
                if message.get("code") == 10004 and "data" in message:
                    kline_data = message["data"].get("AAPL", [])
                    if kline_data:
                        latest = kline_data[-1]
                        close_price = latest[4]

                        if last_close_price:
                            change = abs(close_price - last_close_price) / last_close_price
                            if change >= PRICE_CHANGE_THRESHOLD:
                                msg = f"AAPL最新价格: {close_price:.2f},变动率: {change * 100:.2f}%"
                                print(msg)
                                await send_telegram_message(msg)

                        last_close_price = close_price

        except websockets.exceptions.ConnectionClosed:
            print("WebSocket连接关闭")
        finally:
            ping_task.cancel()

asyncio.run(connect_and_monitor())

  • latest[4] 是收盘价,K线数据数组格式一般是 [时间戳, 开, 高, 低, 收, 成交量]
  • PRICE_CHANGE_THRESHOLD 可根据需要调整为其他百分比;
  • 可扩展支持更多股票、不同周期等。

4. 监控个股RSI指标

我们也可以在监控脚本中加入RSI指标的判断,当股票的RSI大于70,或者小于30时发出预警,下面是代码示例:

import asyncio
import json
import websockets
import aiohttp
import numpy as np
from collections import deque

WS_URL = "wss://data.infoway.io/ws?business=stock&apikey=yourApiKey"
TELEGRAM_BOT_TOKEN = "yourTelegramBotToken"
TELEGRAM_CHAT_ID = "yourTelegramChatID"

# RSI 参数
RSI_PERIOD = 14
PRICE_HISTORY = deque(maxlen=RSI_PERIOD + 1)

async def send_telegram_message(message: str):
    url = f"//api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
    payload = {
        "chat_id": TELEGRAM_CHAT_ID,
        "text": message
    }
    async with aiohttp.ClientSession() as session:
        async with session.post(url, data=payload) as response:
            if response.status != 200:
                print("Failed to send Telegram message:", await response.text())

def calculate_rsi(prices: list) -> float:
    if len(prices) < RSI_PERIOD + 1:
        return None

    deltas = np.diff(prices)
    gains = np.where(deltas > 0, deltas, 0)
    losses = np.where(deltas < 0, -deltas, 0)

    avg_gain = np.mean(gains)
    avg_loss = np.mean(losses)

    if avg_loss == 0:
        return 100.0  # 避免除以0,说明价格只涨不跌

    rs = avg_gain / avg_loss
    rsi = 100 - (100 / (1 + rs))
    return rsi

async def connect_and_monitor():
    async with websockets.connect(WS_URL) as websocket:
        init_message = {
            "code": 10004,
            "trace": "unique-trace-id-002",
            "data": {
                "arr": [{"type": 1, "codes": "AAPL"}]
            }
        }
        await websocket.send(json.dumps(init_message))

        async def send_ping():
            while True:
                await asyncio.sleep(30)
                ping_message = {"code": 10010, "trace": "ping-002"}
                await websocket.send(json.dumps(ping_message))

        ping_task = asyncio.create_task(send_ping())

        try:
            while True:
                raw_message = await websocket.recv()
                message = json.loads(raw_message)

                if message.get("code") == 10004 and "data" in message:
                    kline_data = message["data"].get("AAPL", [])
                    if kline_data:
                        latest = kline_data[-1]
                        close_price = latest[4]

                        # 收集价格用于计算 RSI
                        PRICE_HISTORY.append(close_price)

                        # RSI计算与推送
                        rsi = calculate_rsi(list(PRICE_HISTORY))
                        if rsi:
                            print(f"当前RSI: {rsi:.2f}")
                            if rsi > 70:
                                await send_telegram_message(f"AAPL超买警告⚠️:当前RSI = {rsi:.2f} (>70)")
                            elif rsi < 30:
                                await send_telegram_message(f"AAPL超卖警告📉:当前RSI = {rsi:.2f} (<30)")

        except websockets.exceptions.ConnectionClosed:
            print("WebSocket连接关闭")
        finally:
            ping_task.cancel()

asyncio.run(connect_and_monitor())

  • 一开始需要收集够 15 条价格数据(RSI_PERIOD+1)才开始计算 RSI;
  • 你可以将 RSI_PERIOD 设置为其他值,比如 6、9、21,根据策略需求调整;
  • 为了避免重复推送相同方向的 RSI 信号,可以加入“信号状态”缓存,如有需要我可以帮你加。

评论