全部
文章&策略
学习干货
问答
官方

notebook中调用其他文件

用户头像sh_*874qop
2026-02-01 发布
%run stock_process.ipynb 这个会出错误是因为 %run 是 Jupyter 的魔法命令,如何在supermind中的notebook中使用,还是文件地址怎么写
浏览15
评论1
收藏0
用户头像神盾局量子研究部
2023-05-15 发布
背景与目的 之前我们有了策略回测代码,到实盘要经过熟悉实盘API、写代码、调试代码的环节,大概还需要1-2周的时间才能实盘,有非常多的用户到这一步 会束手无策,甚至放弃! 现在,有了回测代码直接实盘的功能,可以省去这个步骤,让刚入门的朋友也可以直接拿回测代码进行实盘了。很棒!为我们的工程师点赞! 此外还新增了一些接口,方便实盘交易。 不断降低实盘的门槛是我们的目标,如果您有任何好的想法意见请随时留言! 本功能需要重启研究环境才能生效! 策略实盘交易(回测代码1分钟实盘) ?调用方法: research_trade( name, source_code, capital_base=100000, frequency='DAILY', stock_market='STOCK', benchmark=None, trade_api=None, signal_mode=True, dry_run=False, recover_dt=False, ) ​ ?参数说明: name:str,策略名称,会在./persist/下生成一个同名目录,用于存放持久化的策略信息 source_code:str,策略代码,可从策略研究模块中直接复制,代码置于"""..."""中 capital_base: float,初始资金量 如果接入了TradeAPI对象,且 signal_mode=False,那么此参数无意义 frequency: str,策略频率,'DAILY'或'MINUTE' stock_market: str,策略类型,默认'STOCK' benchmark: str,基准指数 trade_api: TradeAPI对象,绑定需要仿真交易的资金账号 如果不传入TradeAPI对象,即 trade_api=None,此时为模拟交易 如果传入TradeAPI对象,此时为仿真交易 signal_mode: bool,(新增)默认为 True signal_mode=True,此时策略实际上运行的时初始资金为capital_base的模拟交易,context、get_orders等方法返回的结果均为模拟交易中计算的数据,与资金账号的数据无关;策略下单在模拟交易撮合成交后,才会通过trade_api下单至柜台 signal_mode=False,此时策略中context、get_orders等方法返回的结果均为从 柜台查询,策略下单也会直接下至柜台 dry_run: bool,试运行,立即返回,默认为 False recover_dt: bool或 str,(新增)是否断点运行,默认为 False recover_dt=False,从当前时点开始执行,不从断定运行 recover_dt=True,从上次策略结束时点开始运行 recover_dt='today',从当日开始运行,此模式下只会补执行 before_trading与 open_auction,handle_bar依旧从当前时间开始执行 recover_dt='yyyyMMdd HH:mm',从指定时间开始运行 ?️ 返回值: RealtimeService类 ?作用: 模拟交易:撮合机制与回测相同 仿真交易:通过仿真柜台撮合,更贴近真实交易环境 ❗注意事项: 策略需在9:00前开启运行,否则在未设置recover_dt的情况下,会跳过before_trading等步骤 初始化TradeAPI时需要指定下单策略order_policy,MarketPolicy为市价下单;LimitPolicy为限价下单。如未指定,由于策略下单时使用均价,可能存在多位小数,最终实盘账户下单的时候可能产生废单 signal_mode=True时,如想在context中获得仿真账号的持仓、资金等数据,可以使用同步函数 sync_trade_api() ?示例: from tick_trade_api import TradeAPI #初始化TradeAPI时需要指定下单策略,MarketPolicy为市价下单;LimitPolicy为限价下单 trade_api=TradeAPI('69271711',order_policy=MarketPolicy) source_code=""" # 股票策略模版 def init(context): pass # 盘前执行 def before_trading(context): pass # 开盘时运行函数 def handle_bar(context, bar_dict): order_id = order('000001.SZ', 100) print(get_orders()) try: cancel_order(order_id) except: print('撤单失败') print(get_open_orders()) print(get_tradelogs()) print(context.portfolio.stock_account) print(context.portfolio.positions) """ rtrade = research_trade( '研究环境策略', source_code, frequency='MINUTE', trade_api=trade_api, signal_mode=False, recover_dt='today' ) ​ trade_api=TradeAPI('69271711',order_policy=MarketPolicy) 中的账号是模拟资金账号或者是实盘资金账号。 其他更新 这次还增加了几个功能 策略框架中增加 : cancel_order_all() 全撤 get_tradelogs()获取当日全部成交订单 get_orders() 获取委托,和get_order()一致,主要时和tradeapi中函数名对齐 tradeapi增加: get_open_orders() 获取当日未成订单 cancel_order_all() 全撤
浏览38624
评论12
收藏73

如何获取十年国债收益率

用户头像sh_**084h43
2026-01-31 发布
想回测股债平衡, 如何获取到十年国债收益率呢
浏览14
评论1
收藏0
用户头像sh_***174w0d
2026-01-31 发布
引言:为什么高手的方法总是如此简单? 在瞬息万变的股市中,为什么我们散户总是激情满满地冲进加速上涨的股票,买在情绪的最高点,最终却往往亏损离场?我们不禁会问,那些真正叱咤风云的顶级操盘手,他们究竟藏着什么不为人知的交易秘诀? 答案或许会颠覆你的认知:真正的顶级高手,其买点往往“普通到不能再普通”。他们不会在万众瞩目的加速阶段去追高,而是选择在一个关键的转折点悄然入场。本文将为你揭示这个颠覆常识的顶级交易心法——“弱转强”,你将学到如何摒弃无效的追涨,精准识别那个阻力最小、爆发力最强的黄金买点。 1.交易的真谛:纯粹,只在“转强”的瞬间出手 顶级交易策略的核心,就在于**“纯粹”****。** 高手会主动放弃那些看似诱人的加速上涨阶段,因为他们深知,那里的风险远大于机会。他们会将全部的精力,专注于“弱转强”这一个关键节点。这种专注,让交易变得极为纯粹。 你只要别做加速,专注弱转强,只在那个转强的瞬间出手,那你的交易就会很纯粹,纯粹的东西才能达到极致啊。 为什么“纯粹”如此重要?因为它能帮助交易者过滤掉市场的噪音和情绪的干扰。当你的眼中只有一个目标、一个模式时,你就能摆脱追涨杀跌的冲动,冷静地等待最具爆发力的转折点出现,一击即中。 2.颠覆认知:如何用成交量看懂真正的“强”与“弱” 要掌握“弱转强”,首先必须重新定义你对“强”与“弱”的理解。在实战中,它们往往和你直觉的判断恰恰相反。 ●真正的**“强”****(实际是陷阱):** 这里的“强”,指的是前一天出现“缩量加速”或“高开秒板”的股票。这种形态看似强势逼人,但背后隐藏着巨大的风险。缩量意味着里面的获利筹码几乎没有卖出,筹码锁定度极高。 ●真正的**“弱”****(实际是机会):** 这里的“弱”,指的是前一天是“放量板”,无论它的形态是摇摇欲坠的“烂板”,还是充满争议的“分歧板”,其核心特征只有一个——“换手充分”。这才是机会的温床。 总结来说,普通人眼中一字封板的“强”,在高手看来是风险;而普通人眼中充满分歧、换手巨大的“弱”,反而是酝酿下一波行情的绝佳土壤。 3.黄金买点:为什么“弱转强”才是阻力最小的方向? “弱转强”策略背后的核心逻辑,在于寻找市场中阻力最小的方向。 当前一天经历“放量”(弱)时,实际上已经完成了一次彻底的换手。那些不坚定的筹码、早期的获利盘,都在巨大的成交量中被清洗出局。这使得大量的资金成本都集中在了前一天的涨停板附近。这个充满分歧的放量过程,正是将弹簧奋力压缩的动作。 试想一下,第二天这只股票如果高开,这些前一天刚进场的资金,仅仅赚取了一个高开的利润,他们会有强烈的卖出欲望吗?答案是否定的。此刻,盘中的抛压非常轻,拉升的阻力极小。这个高开转强的瞬间,就是弹簧被压缩到极致后猛然释放的那一刻,充满了爆发力。 记住你做交易一定是要去阻力最小的地方。 成功的交易从来不是硬碰硬地对抗抛压,而是要顺势而为,找到市场合力最强、阻力最小的路径。这正是“弱转强”模式的精髓所在。 4.从“术”到“道”:策略虽好,但只能用于“人气核心票” 到这里,我们讨论的成交量分析、买点判断,都还停留在“术”的层面,即具体的技术方法。然而,真正能让交易者做大做强的,是更高阶的“道”的层面——回归股票本身,明白这个技术应该用在什么地方。 请务必记住,“弱转强”这个策略的适用范围极为苛刻,它只能聚焦于两市的“人气核心票”,绝对不能用于其他的“杂毛”股。 为什么?因为只有万众瞩目的人气核心股,才能聚集起足够强大的市场合力,去完成这种从巨大分歧(弱)到重新一致(强)的高难度K线形态转换。这股“合力”正是创造“阻力最小路径”的关键。而在那些无人问津的杂毛股上,类似形态大概率不是市场共识的体现,而更可能是单个庄家设下的陷阱。 结语:成为一个专注且专一的交易者 真正的交易秘诀,从来不是什么复杂的公式或神奇的指标。它就是将正确的“术”(弱转强),运用在正确的“道”(人气核心股)之上。 成功的关键,最终归结于交易者本身。正如文中所说,你需要努力成为一个“专注且专一的人”。 在信息爆炸、各种交易理论层出不穷的今天,我们是该不断寻找下一个“圣杯”,还是应该将一个简单、纯粹的策略反复打磨,修炼到极致?这个问题,值得每一位交易者深思。
浏览72
评论1
收藏0
用户头像Fxdund
2026-01-31 发布
在量化交易和个人投资管理中,实时监控股票价格并设置自动预警是提升决策效率的关键。今天,我将手把手教你如何使用 Python 对接实时股票 API,并构建一个功能完整的股票价格预警系统。 系统架构设计 我们的预警系统将包含以下核心模块: 数据获取模块 - 通过 WebSocket 连接实时股票 API 数据处理模块 - 解析和处理实时行情数据 预警规则引擎 - 根据预设条件触发预警 通知模块 - 通过多种渠道发送预警信息 技术栈选择 Python 3.8+ - 主要编程语言 WebSocket 客户端 - 实时数据接收 Pandas - 数据处理和分析 SQLite/Redis - 预警规则和状态存储 SMTP/Telegram API - 预警通知发送 第一步:配置开发环境 # requirements.txt # 实时股票API客户端和数据处理库 websocket-client>=1.3.0 requests>=2.28.0 pandas>=1.5.0 numpy>=1.24.0 python-dotenv>=0.21.0 schedule>=1.1.0 redis>=4.5.0 # 可选的通知库 python-telegram-bot>=20.0.0 twilio>=8.0.0 第二步:实现基于 iTick WebSocket的实时数据连接器 # tick_data_connector.py import websocket import json import threading import time import logging from datetime import datetime import pandas as pd from typing import Dict, List, Callable, Optional class ITickWebSocketClient: """ iTick Stocks WebSocket 实时行情客户端 """ def __init__(self, token: str, symbols: List[str]): self.token = token self.symbols = symbols # 例如 ["AAPL$US", "TSLA$US"] self.ws_url = "wss://api.itick.org/stock" self.ws = None self.is_connected = False self.is_authenticated = False self.data_callbacks = [] self.error_callbacks = [] # 数据缓存(以 symbol 如 "AAPL$US" 为 key) self.price_cache: Dict[str, Dict] = {} self.last_update: Dict[str, datetime] = {} logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) self.logger = logging.getLogger(__name__) def add_data_callback(self, callback: Callable): self.data_callbacks.append(callback) def add_error_callback(self, callback: Callable): self.error_callbacks.append(callback) def on_message(self, ws, message): try: data = json.loads(message) # 连接成功 if data.get("msg") == "Connected Successfully": self.logger.info("WebSocket 连接成功") # 认证结果 elif data.get("resAc") == "auth": if data.get("code") == 1: self.is_authenticated = True self.logger.info("认证成功") self._subscribe() else: self.logger.error("认证失败") ws.close() # 订阅结果 elif data.get("resAc") == "subscribe": if data.get("code") == 1: self.logger.info("订阅成功") else: self.logger.error(f"订阅失败: {data.get('msg')}") # 心跳响应 elif data.get("resAc") == "pong": self.logger.debug("收到 pong") # 实际行情数据 elif data.get("data"): market_data = data["data"] data_type = market_data.get("type", "") symbol = market_data.get("s") or market_data.get("s") # tick/quote 使用 s if data_type == "tick": tick = self._parse_tick_data(market_data) self.price_cache[symbol] = tick self.last_update[symbol] = datetime.now() for callback in self.data_callbacks: callback(tick) # 如需处理 quote、depth,可在此扩展 except Exception as e: self.logger.error(f"消息处理异常: {e}") def _parse_tick_data(self, raw: Dict) -> Dict: """解析 iTick tick 数据""" return { 'symbol': raw.get('s'), 'price': raw.get('ld'), # 最新成交价 'volume': raw.get('v', 0), 'timestamp': datetime.fromtimestamp(raw.get('t', 0) / 1000).isoformat(), 'received_at': datetime.now().isoformat() } def on_error(self, ws, error): self.logger.error(f"WebSocket 错误: {error}") for cb in self.error_callbacks: cb(error) def on_close(self, ws, close_status_code, close_msg): self.is_connected = False self.logger.info("WebSocket 连接关闭") def on_open(self, ws): self.is_connected = True self.logger.info("WebSocket 已打开,等待认证...") def _subscribe(self): """发送订阅消息""" subscribe_msg = { "ac": "subscribe", "params": ",".join(self.symbols), "types": "tick,quote,depth" # 可根据需要添加 K 数据 } self.ws.send(json.dumps(subscribe_msg)) self.logger.info(f"已发送订阅: {subscribe_msg['params']}") def _send_ping(self): """心跳线程""" while self.is_connected: time.sleep(30) if self.is_connected: ping_msg = { "ac": "ping", "params": str(int(time.time() * 1000)) } self.ws.send(json.dumps(ping_msg)) self.logger.debug("发送 ping") def connect(self): headers = {"token": self.token} self.ws = websocket.WebSocketApp( self.ws_url, header=headers, on_open=self.on_open, on_message=self.on_message, on_error=self.on_error, on_close=self.on_close ) # 主线程运行 WebSocket wst = threading.Thread(target=self.ws.run_forever) wst.daemon = True wst.start() # 启动心跳线程 ping_thread = threading.Thread(target=self._send_ping) ping_thread.daemon = True ping_thread.start() # 等待连接建立 for _ in range(30): if self.is_authenticated: break time.sleep(1) def disconnect(self): if self.ws: self.ws.close() self.is_connected = False def get_current_price(self, symbol: str) -> Optional[float]: if symbol in self.price_cache: return self.price_cache[symbol].get('price') return None 第三步:构建预警规则引擎 # alert_engine.py import sqlite3 from datetime import datetime, timedelta from typing import Dict, List, Any import json import logging class AlertEngine: """预警规则引擎""" def __init__(self, db_path: str = "alerts.db"): self.db_path = db_path self.active_alerts = {} self.logger = logging.getLogger(__name__) self._init_database() def _init_database(self): """初始化数据库""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # 创建预警规则表 cursor.execute(''' CREATE TABLE IF NOT EXISTS alert_rules ( id INTEGER PRIMARY KEY AUTOINCREMENT, symbol TEXT NOT NULL, rule_type TEXT NOT NULL, condition TEXT NOT NULL, threshold REAL, value TEXT, is_active INTEGER DEFAULT 1, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, last_triggered TIMESTAMP ) ''') # 创建预警历史表 cursor.execute(''' CREATE TABLE IF NOT EXISTS alert_history ( id INTEGER PRIMARY KEY AUTOINCREMENT, alert_rule_id INTEGER, symbol TEXT NOT NULL, trigger_price REAL, trigger_value TEXT, triggered_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (alert_rule_id) REFERENCES alert_rules (id) ) ''') conn.commit() conn.close() # 从数据库加载活动预警 self._load_active_alerts() def _load_active_alerts(self): """从数据库加载活动预警规则""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute( "SELECT * FROM alert_rules WHERE is_active = 1" ) for row in cursor.fetchall(): rule_id = row[0] self.active_alerts[rule_id] = { 'symbol': row[1], 'rule_type': row[2], 'condition': row[3], 'threshold': row[4], 'value': row[5], 'last_triggered': row[7] } conn.close() self.logger.info(f"加载了 {len(self.active_alerts)} 个活动预警规则") def add_alert_rule(self, rule_data: Dict) -> int: """添加预警规则""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(''' INSERT INTO alert_rules (symbol, rule_type, condition, threshold, value) VALUES (?, ?, ?, ?, ?) ''', ( rule_data['symbol'], rule_data['rule_type'], rule_data['condition'], rule_data.get('threshold'), json.dumps(rule_data.get('value', {})) )) rule_id = cursor.lastrowid conn.commit() conn.close() # 添加到活动预警 self.active_alerts[rule_id] = { 'symbol': rule_data['symbol'], 'rule_type': rule_data['rule_type'], 'condition': rule_data['condition'], 'threshold': rule_data.get('threshold'), 'value': rule_data.get('value', {}), 'last_triggered': None } self.logger.info(f"添加预警规则: {rule_data['symbol']} - {rule_data['rule_type']}") return rule_id def check_price_alert(self, symbol: str, price: float) -> List[Dict]: """检查价格预警""" triggered_alerts = [] for rule_id, rule in self.active_alerts.items(): if rule['symbol'] != symbol: continue if rule['rule_type'] == 'price': triggered = self._evaluate_price_condition( price, rule['condition'], rule['threshold'] ) if triggered: # 检查是否在冷却期内(避免频繁触发) if self._is_in_cooldown(rule['last_triggered']): continue alert_info = { 'rule_id': rule_id, 'symbol': symbol, 'rule_type': 'price', 'condition': rule['condition'], 'threshold': rule['threshold'], 'trigger_price': price, 'message': self._generate_alert_message( symbol, price, rule['condition'], rule['threshold'] ) } triggered_alerts.append(alert_info) # 更新最后触发时间 self._update_last_triggered(rule_id) return triggered_alerts def _evaluate_price_condition(self, price: float, condition: str, threshold: float) -> bool: """评估价格条件""" if condition == 'above': return price > threshold elif condition == 'below': return price < threshold elif condition == 'cross_above': # 需要历史价格数据,这里简化为单次比较 return price >= threshold elif condition == 'cross_below': return price <= threshold elif condition == 'percentage_change': # 需要历史价格计算百分比变化 # 实际应用中需要更复杂的实现 return False return False def _is_in_cooldown(self, last_triggered, cooldown_minutes: int = 5) -> bool: """检查是否在冷却期内""" if not last_triggered: return False if isinstance(last_triggered, str): last_triggered = datetime.fromisoformat(last_triggered) cooldown_end = last_triggered + timedelta(minutes=cooldown_minutes) return datetime.now() < cooldown_end def _update_last_triggered(self, rule_id: int): """更新最后触发时间""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(''' UPDATE alert_rules SET last_triggered = CURRENT_TIMESTAMP WHERE id = ? ''', (rule_id,)) conn.commit() conn.close() # 更新内存中的记录 if rule_id in self.active_alerts: self.active_alerts[rule_id]['last_triggered'] = datetime.now().isoformat() def _generate_alert_message(self, symbol: str, price: float, condition: str, threshold: float) -> str: """生成预警消息""" if condition == 'above': return f"🚨 {symbol} 价格突破 {threshold},当前价格: {price:.2f}" elif condition == 'below': return f"⚠️ {symbol} 价格跌破 {threshold},当前价格: {price:.2f}" else: return f"📈 {symbol} 触发预警条件,当前价格: {price:.2f}" 第四步:实现通知发送器 # notifier.py import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart import logging from typing import Dict class AlertNotifier: """预警通知器""" def __init__(self, config: Dict): self.config = config self.logger = logging.getLogger(__name__) def send_email(self, to_email: str, subject: str, body: str) -> bool: """发送邮件通知""" try: msg = MIMEMultipart() msg['From'] = self.config.get('email_from') msg['To'] = to_email msg['Subject'] = subject msg.attach(MIMEText(body, 'plain')) with smtplib.SMTP( self.config.get('smtp_server'), self.config.get('smtp_port', 587) ) as server: server.starttls() server.login( self.config.get('email_user'), self.config.get('email_password') ) server.send_message(msg) self.logger.info(f"邮件发送成功: {to_email}") return True except Exception as e: self.logger.error(f"邮件发送失败: {e}") return False def send_telegram(self, chat_id: str, message: str) -> bool: """发送Telegram通知(简化版)""" try: # 实际应用中需要使用python-telegram-bot库 # 这里使用requests模拟 import requests bot_token = self.config.get('telegram_bot_token') if not bot_token: self.logger.warning("未配置Telegram Bot Token") return False url = f"https://api.telegram.org/bot{bot_token}/sendMessage" payload = { 'chat_id': chat_id, 'text': message, 'parse_mode': 'HTML' } response = requests.post(url, json=payload) if response.status_code == 200: self.logger.info(f"Telegram消息发送成功: {chat_id}") return True else: self.logger.error(f"Telegram发送失败: {response.text}") return False except ImportError: self.logger.warning("未安装requests库,无法发送Telegram消息") return False except Exception as e: self.logger.error(f"Telegram发送错误: {e}") return False def send_console_notification(self, alert_info: Dict): """控制台通知(开发调试用)""" print("\n" + "="*50) print("🚨 股票预警触发 🚨") print(f"股票: {alert_info['symbol']}") print(f"价格: {alert_info['trigger_price']:.2f}") print(f"条件: {alert_info['condition']} {alert_info['threshold']}") print(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") print("="*50 + "\n") 第五步:整合主应用程序 # main.py import json import time import signal import sys from datetime import datetime from typing import List import logging from tick_data_connector import TickDataAPIClient from alert_engine import AlertEngine from notifier import AlertNotifier class StockAlertSystem: """股票预警系统主程序""" def __init__(self, config_path: str = "config.json"): # 加载配置 with open(config_path, 'r') as f: self.config = json.load(f) # 设置日志 self._setup_logging() self.logger = logging.getLogger(__name__) # 初始化组件 self.api_client = TickDataAPIClient( api_key=self.config['itick']['token'], symbols=self.config['monitor_symbols'] ) self.alert_engine = AlertEngine( db_path=self.config.get('database_path', 'alerts.db') ) self.notifier = AlertNotifier(self.config['notifications']) # 添加预定义预警规则 self._setup_default_alerts() # 运行标志 self.running = False def _setup_logging(self): """配置日志系统""" log_config = self.config.get('logging', {}) log_level = getattr(logging, log_config.get('level', 'INFO')) logging.basicConfig( level=log_level, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(log_config.get('file', 'stock_alert.log')), logging.StreamHandler() ] ) def _setup_default_alerts(self): """设置默认预警规则""" default_alerts = self.config.get('default_alerts', []) for alert_rule in default_alerts: self.alert_engine.add_alert_rule(alert_rule) def handle_tick_data(self, tick_data: Dict): """处理实时tick数据""" symbol = tick_data['symbol'] price = tick_data['price'] # 检查是否触发预警 triggered_alerts = self.alert_engine.check_price_alert(symbol, price) # 发送通知 for alert in triggered_alerts: self._send_alert_notifications(alert) def _send_alert_notifications(self, alert_info: Dict): """发送预警通知""" message = alert_info['message'] # 控制台通知(开发用) self.notifier.send_console_notification(alert_info) # 邮件通知 if self.config['notifications'].get('enable_email', False): for email in self.config['notifications'].get('email_recipients', []): self.notifier.send_email( email, f"股票预警: {alert_info['symbol']}", message ) # Telegram通知 if self.config['notifications'].get('enable_telegram', False): for chat_id in self.config['notifications'].get('telegram_chat_ids', []): self.notifier.send_telegram(chat_id, message) # 记录到日志 self.logger.info(f"预警触发: {message}") def run(self): """运行主程序""" self.running = True # 设置信号处理 signal.signal(signal.SIGINT, self.shutdown) signal.signal(signal.SIGTERM, self.shutdown) # 注册数据回调 self.api_client.add_data_callback(self.handle_tick_data) # 连接API self.logger.info("正在连接Tick API...") self.api_client.connect() # 主循环 self.logger.info("股票预警系统已启动") try: while self.running: # 检查系统状态 self._monitor_system_health() time.sleep(1) except KeyboardInterrupt: self.shutdown() except Exception as e: self.logger.error(f"系统运行错误: {e}") self.shutdown() def _monitor_system_health(self): """监控系统健康状态""" # 这里可以添加更多的健康检查逻辑 # 例如:检查API连接状态、数据库连接、磁盘空间等 # 示例:检查最近数据更新时间 for symbol in self.config['monitor_symbols']: last_update = self.api_client.last_update.get(symbol) if last_update: time_diff = (datetime.now() - last_update).total_seconds() if time_diff > 60: # 超过60秒没有数据 self.logger.warning(f"{symbol} 数据更新延迟: {time_diff:.0f}秒") def shutdown(self, signum=None, frame=None): """关闭系统""" self.logger.info("正在关闭系统...") self.running = False # 断开API连接 self.api_client.disconnect() self.logger.info("系统已关闭") sys.exit(0) if __name__ == "__main__": # 创建配置文件(示例) sample_config = { "itick": { "token": "your_iTick_api_key_here", }, "monitor_symbols": ["AAPL$US", "TSLA$US", "GOOGL$US", "MSFT$US"], "default_alerts": [ { "symbol": "AAPL$US", "rule_type": "price", "condition": "above", "threshold": 230.0, "value": {} }, { "symbol": "TSLA$US", "rule_type": "price", "condition": "below", "threshold": 300.0, "value": {} } ], "notifications": { "enable_email": False, "email_user": "your_email@gmail.com", "email_password": "your_app_password", "smtp_server": "smtp.gmail.com", "smtp_port": 587, "email_from": "your_email@gmail.com", "email_recipients": ["recipient@example.com"], "enable_telegram": False, "telegram_bot_token": "your_bot_token", "telegram_chat_ids": ["your_chat_id"] }, "logging": { "level": "INFO", "file": "stock_alert.log" }, "database_path": "alerts.db" } # 保存示例配置 with open("config_sample.json", "w") as f: json.dump(sample_config, f, indent=2) print("示例配置文件已生成: config_sample.json") print("请复制并修改为 config.json,然后运行系统") # 实际运行代码(取消注释以下行) # system = StockAlertSystem("config.json") # system.run() 第六步:扩展功能和优化建议 1. 添加更多预警规则类型 # 技术指标预警 def add_technical_alert(self, symbol: str, indicator: str, condition: str, value: float): """添加技术指标预警""" pass # 实现MACD、RSI、布林带等指标预警 # 成交量异常预警 def add_volume_alert(self, symbol: str, volume_multiplier: float = 2.0): """添加成交量异常预警""" pass 2. 实现数据持久化和分析 def save_tick_data_to_database(self, tick_data: Dict): """保存tick数据到数据库""" conn = sqlite3.connect("tick_data.db") cursor = conn.cursor() cursor.execute(''' INSERT INTO tick_data (symbol, price, volume, timestamp, bid, ask, bid_size, ask_size) VALUES (?, ?, ?, ?, ?, ?, ?, ?) ''', ( tick_data['symbol'], tick_data['price'], tick_data['volume'], tick_data['timestamp'], tick_data['bid'], tick_data['ask'], tick_data['bid_size'], tick_data['ask_size'] )) conn.commit() conn.close() 3. 添加 Web 界面(使用 Flask 或 FastAPI) # web_interface.py from flask import Flask, render_template, request, jsonify app = Flask(__name__) @app.route('/') def dashboard(): """监控仪表板""" return render_template('dashboard.html') @app.route('/api/alerts', methods=['GET']) def get_alerts(): """获取预警列表API""" # 从数据库查询预警 return jsonify({"alerts": []}) @app.route('/api/add_alert', methods=['POST']) def add_alert(): """添加预警规则API""" rule_data = request.json # 添加到预警引擎 return jsonify({"success": True}) 部署和运维建议 服务器选择:使用云服务器(AWS EC2、阿里云 ECS)确保 24 小时运行 进程管理:使用 Supervisor 或 systemd 管理进程 日志监控:配置日志轮转和监控告警 备份策略:定期备份数据库和配置文件 安全考虑: API 密钥使用环境变量存储 数据库加密 定期更新依赖库 故障排除和调试 # debug_utils.py def check_api_connection(): """检查API连接状态""" pass def simulate_tick_data(): """模拟tick数据用于测试""" pass def validate_alert_rules(): """验证预警规则配置""" pass 总结 通过本教程,你已经学会了: 如何使用 Python 连接实时股票 API 构建一个可扩展的预警规则引擎 实现多种通知方式(邮件、Telegram 等) 设计一个完整的股票监控系统架构 下一步改进方向: 性能优化:使用异步 IO 提高并发处理能力 机器学习集成:添加基于机器学习的智能预警 多市场支持:扩展支持加密货币、外汇等市场 移动应用:开发手机 App 实时接收预警 参考文档:https://blog.itick.org/quant-trading/stock-api-integration-with-telegram-alerts GitHub 项目地址:https://github.com/itick-org/
浏览21
评论0
收藏0
用户头像sh_*219t3e
2025-10-11 发布
亲测最好用的AI编写量化策略工具,可以让 AI 直接写各个平台的策略代码,直接生成可运行的策略代码,代码质量远高于直接使用 DeepSeek、Trae 等平台。 大家可以直接用描述策略,然后一键生成可运行的完整策略代码,也可以把它当做一个API 查询工具。 最新消息,已经支持SuperMind等主流量化平台啦,并且实盘亲测过了,很适合小白用户,上线之后获得了非常多朋友的好评。 **🚀️ AI工具平台:https://iris.findtruman.io/ai/tool/ai-quantitative-trading/**
浏览1789
评论22
收藏5
用户头像sh_*219t3e
2025-11-06 发布
最近我专门针对 Supermind 平台的AI 量化代码生成平台进行了优化改进,现在效果比市面上的 DS、豆包等工具好很多。 👉 SuperMind AI量化代码生成平台 这个工具最大的特点是直接和 AI 对话就能生成完整可运行的Supermind量化策略代码。你不需要懂 Python、C# 或策略 API,只要用自然语言描述你的交易逻辑,比如:“当5日均线向上突破20日均线时买入,反向时卖出。” AI 就会自动帮你生成完整策略代码,并能直接在平台上运行。 相比于通用大模型的输出,这个平台针对量化交易进行了专门优化生成的代码结构更清晰,逻辑更准确,对策略逻辑的理解更接近量化开发者的思路,并且可用作 API 查询或策略自动生成工具 之前上线后,很多朋友反馈代码质量和可运行性都非常高,几乎不需要再手动修改。现在我们的AI量化代码生成平台已经全面支持 Supermind,你可以直接体验。如果你之前在用 DS、豆包等平台,不妨试试看这个版本,可能会刷新你对AI 写量化策略的想象。
浏览1337
评论25
收藏0
用户头像mx_*92566r
2026-01-30 发布
在高频或多品种量化交易的日常研究中,数据的获取方式往往决定了整个策略迭代的效率。过去,我们常通过不同渠道来收集行情数据,但分散的数据源会带来接口不同步、格式不一致、以及历史数据补全困难等问题。 随着交易逻辑和回测模型的复杂度提升,稳定、标准化的数据接口逐渐成为量化研究流程中不可或缺的一环。 数据接口的需求与基础功能 对策略研究者而言,理想的数据接口通常需要满足以下几方面要求: 实时与历史并行:既能获取最新市场行情,也能直接用于历史回测和信号验证。 精度与稳定性:数据粒度足够细、延迟可控,能真实反映市场波动特征。 调用便捷:通过API即可灵活请求,不依赖复杂的数据清洗流程。 以 AllTick API 为例,它覆盖外汇、股票、加密资产等多类市场数据,参数化接口调用方式可以无缝接入到研究框架中,大大减少了手动采集或格式转换的时间成本。 回测环节中的关键痛点 策略回测是量化研究的核心环节,数据的连续性与执行效率直接影响结论的可靠性。我们在实际研究中主要会关注: 数据延迟:数据传输是否足够快,回测环境能否近似实时条件。 历史完整性:历史行情是否存在缺口,撮合价和成交量是否同步。 计算性能:当策略需要处理高频、多资产样本时,回测框架的吞吐效率是否充足。 数据接口能够显著减少这些环节中的摩擦成本。通过结构化数据输入,研究者能快速在不同时间段、不同市场间复现策略表现,并便于做参数敏感性分析。 实战示例:外汇行情的获取与简单回测 以下示例展示使用API获取EUR/USD分钟数据并进行均线策略验证的基本流程。 import requests import pandas as pd import numpy as np import matplotlib.pyplot as plt # 这里定义了API的地址 url = "https://api.alltick.co/v1/forex/quotes" # 请求外汇数据 response = requests.get(url, params={'pair': 'EURUSD', 'interval': '1m'}) data = response.json() # 把数据转换成DataFrame格式,方便后续处理 df = pd.DataFrame(data['quotes']) # 这里做了一个简单的均线策略回测 df['ma_5'] = df['close'].rolling(window=5).mean() df['ma_20'] = df['close'].rolling(window=20).mean() # 策略:当5分钟均线突破20分钟均线时买入 df['signal'] = np.where(df['ma_5'] > df['ma_20'], 1, 0) # 可视化回测结果 plt.figure(figsize=(10, 6)) plt.plot(df['timestamp'], df['close'], label='EUR/USD Close Price') plt.plot(df['timestamp'], df['ma_5'], label='5-period Moving Average') plt.plot(df['timestamp'], df['ma_20'], label='20-period Moving Average') plt.title('EUR/USD Price with Moving Averages') plt.legend() plt.show() 该示例展示了一个常见的流程: 利用API拉取行情 → 进行技术指标计算 → 输出信号 → 在历史样本上验证策略逻辑。 回测效率与可扩展性 在策略开发中,我们通常会进一步关注回测系统的可扩展性。以下做法能显著提升研究流程效率: 数据预处理:拉取后立即去除无效样本、异常点,减少无意义的计算量; 并行化:利用多线程或分布式框架并行执行多参数回测,提高总体吞吐; 计算复杂度控制:在验证初期先采用简化模型,锁定核心逻辑后再扩展精细结构。 结构化的数据接口可作为统一的数据入口,使这些优化策略更易实现,也方便在不同策略间保持一致的数据基线。 总结 一个高效的量化研究体系,往往建立在高质量、可复现的数据基础之上。 通过使用标准化的数据接口,研究者能够降低数据接入与维护成本,把精力集中在模型设计与性能验证上。 在策略日益复杂化的当下,提升数据获取与回测效率,也意味着能更快验证假设、优化模型,并最终加速策略迭代的闭环。
浏览42
评论0
收藏0
用户头像sh_***174w0d
2026-01-30 发布
简介:为何牛市里,多数人依旧亏钱? A股市场从不缺少机会,甚至可以说“隔三差五就有翻倍股”,即便是指数下跌的熊市里也不乏结构性行情。但一个残酷的现实是:为什么在这样一个机会遍地的市场,绝大多数散户依然不赚钱? 或许答案就藏在这句市场老话里: 赚钱的人千篇一律,亏钱的人五花八门。 问题的根源究竟在哪里?这篇文章将为你揭示,所有顶级高手身上都具备的一个共同特征,也是他们在股市里持续盈利的核心心法。 1.核心原罪:不是市场无情,而是你“太贪心” 经过十多年的市场观察,可以发现导致亏损的最主要原因,往往不是技术不行或运气不佳,而是两个字——“太贪心”。 就像最近的行情,商业航天、AI、半导体、有色、金融全面开花,指数也一度冲高到4100点附近。很多人的心态是,看到哪个板块涨就想追哪个,生怕错过任何一个机会,甚至“恨不得把账户里面那几百块钱余额都要找个低价股给买进去”。 这种心态,其实是交易的大忌。我自己也曾走过弯路,对此深有体会: 因为我自己也经过这个暂停也想打,那个主弹呢也想抓的阶段,就是什么都想要,结果什么都得不到。 这段话的意思是,我曾经也想抓住每一个涨停板,参与每一波主升浪,但当你的眼中全是机会,想抓住所有热点时,贪婪就会蒙蔽你的双眼,让你失去最宝贵的判断力和纪律性。最终的结果往往是什么都想抓,却什么都抓不住,甚至被市场反复收割。可以说,如果管不住贪心,给你十个牛市也改变不了亏钱的命运。 2.第一个行动:给你的自选股“断舍离” 那么,如何从行动上对抗“贪心”这个天性呢?一个极其有效且可操作的方法就是:将你的自选池股票数量控制在五个以内。 这个方法看似简单,其背后的逻辑却十分深刻。它不仅仅是为了让你能够“聚焦核心票”,更深层的目的,是在训练你“不贪心”的肌肉。当你主动放弃那些看似诱人的“其他机会”,强迫自己只关注少数几只精选个股时,你的心态会变得更加专注和沉稳。 这其实是一种投资中的生活哲学: 其实炒股的道理和人生是一样的,要有断舍离的智慧,烂人烂事烂股票,该割就割了。 人的精力、时间和福报都是有限的。学会主动减少选择,才能在真正属于你的关键机会出现时,集中全部火力,精准出击。 3.第二个行动:用“耐心”对抗市场浮躁 当你通过“断舍离”战胜了广撒网的贪婪之后,下一个挑战便是如何在一个点上深耕。这就引出了顶级高手的另一个相辅相成的特质:极度的“耐心”。 他们从不被市场的短期波动所干扰,更不会因为某个板块大涨而心浮气躁。他们只会雷打不动地执行自己的交易系统,耐心等待那个完全符合自己系统标准的买点信号出现。 这个信号可以有很多种形式,例如: ●放量突破关键压力位 ●创出新高后的缩量回踩 ●个股走势出现弱转强信号 ●五日线向上穿越20日均线 关键在于,在信号出现之前,“不论市场多亢奋都要管住自己的手”。这种纪律性,是风格不飘移的体现,更是对抗内心贪婪和浮躁的终极防火墙。 耐心不是无所作为的“等死”,而是一种基于策略的主动选择,是等待最佳出击时机的高手智慧。这正是专业交易者与业余散户的根本区别所在。 结语:少即是多,做一个不贪心的人 总结来看,想在股市里持续赚钱,秘诀不在于预测市场的每一次涨跌,而在于向内求索——克制自己的贪婪,通过“断舍离”保持聚焦,并用“耐心”等待属于自己的机会。 成功的投资,最终是一场心性的修炼。 人的能量和福报是有限的,做一个不贪心的人可能会有意想不到的结果。 从今天起,审视一下你的投资组合和交易习惯,你能为自己“断舍离”掉什么?
浏览72
评论0
收藏0
用户头像Jacktick
2026-01-30 发布
在开发交易工具或量化策略时,选择一个靠谱的数据源(Data Provider)往往是第一道坎。市面上的选择浩如烟海,从老牌的 Alpha Vantage 到行业标杆 Polygon.io,各有所长。但在 2026 年的今天,对于独立开发者和中小型量化团队来说,“开发者体验”(DX)和** **“性价比” 正成为选型的决定性因素。 今天,我们站在工程落地的角度,对三款主流美股数据 API 进行一次深度盘点。 1. Polygon.io:行业的“黄金标准” 定位:机构级、低延迟。 优势: Polygon 直接连接美国交易所的数据流(SIP),提供极致的低延迟。其 WebSocket 稳定性极高,几乎是高频交易团队的首选。其 API 文档被誉为行业教科书,规范且详尽。 适用场景: 预算充足、服务器部署在北美(AWS us-east)、对毫秒级延迟极其敏感的机构团队。 考量点:** **价格门槛。如果要获取 Level 2 (盘口深度) 数据或解锁全市场权限,每月的订阅费对于独立开发者来说是一笔不小的开支。 2. Alpha Vantage:经典的入门之选 定位:技术分析、初学者友好。 优势: 它是无数 Python 教程的常客。AV 最大的特色是内置了大量** **技术指标 (Technical Indicators) 计算,比如直接返回 RSI、MACD 的值,省去了开发者在客户端手写公式的麻烦。 适用场景: 做策略回测、技术分析研究、不需要高频实盘数据的学生或研究员。 考量点: 近年来其免费版的限流策略(Rate Limit)日益严格,且主要侧重于日线/分钟线级别的聚合数据,在** **Tick 级实盘推送 能力上相对较弱。 3. TickDB:专为开发者打造的“全能新秀” 定位:高性价比、全球聚合、极客友好。 优势: TickDB 是近年在 GitHub 社区活跃起来的新兴力量,其架构设计非常符合现代全栈开发者的直觉。 All-in-One (万能转接头):它打破了市场壁垒。你只需维护一套代码,就能同时接入** ****美股 (US)、港股 (HK)、加密货币 (CRYPTO) 和 ****外汇 (FOREX)**。对于做跨市场套利的团队来说,这能极大降低系统复杂度。 极简集成 (RESTful):如果你喜欢 Polygon 的设计风格,你会对 TickDB 感到亲切。标准的 JSON 格式,不依赖臃肿的 SDK。 亚洲优化:针对亚洲地区(中国大陆、香港、新加坡)的开发者,TickDB 优化了边缘节点的连接速度,缓解了跨洋传输的高延迟痛点。 下放高级权益:它向普通开发者开放了** Level 2 (订单簿深度) 和 **WebSocket 推送,这在其他平台通常是企业级套餐的专属。 💻 代码体验:Talk is Cheap TickDB 的接入方式非常 "Pythonic",没有任何多余的动作。 注意:与部分 API 不同,TickDB 的聚合查询参数名为复数 symbols ,这允许你一次请求同时拉取** AAPL.US 和 **BTCUSDT 的最新报价。 import requests # 目标:获取 AAPL (美股) 和 BTC (加密货币) 的实时快照 url = "https://api.tickdb.ai/v1/market/ticker" # ✅ 关键点:参数名为 'symbols' (复数),支持逗号分隔 params = { "symbols": "AAPL.US,BTCUSDT" } # 🔑 极简鉴权:只需 Header 带个 Key headers = { "X-API-Key": "YOUR_REAL_KEY" } try: resp = requests.get(url, headers=headers, params=params) data = resp.json() if data['code'] == 0: for item in data['data']: print(f"Symbol: {item['symbol']}, Price: {item['price']}") else: print(f"Error: {data['message']}") except Exception as e: print(f"Request failed: {e}") (代码逻辑基于 TickDB OpenAPI v1.0.0 标准校验) 📊 选型建议 需求场景 推荐 API 核心理由 机构高频 / 就在华尔街 Polygon.io 物理距离最近,SIP 直连,不差钱。 纯技术指标研究 / 教学 Alpha Vantage 内置指标丰富,教程多。 独立开发 / 量化实盘 / 跨市场 TickDB 性价比之王。一套接口搞定美股+Crypto,且支持 WebSocket 实盘与 L2 深度。
浏览34
评论0
收藏0