导入模块
import numpy as np
import pandas as pd
import datetime as dt
from collections import defaultdict
import random
def init(context):
"""
增强科技版PEG策略 v3.0 - 全面升级
"""
当前持仓数
context.num = 0
最大持仓股票数:增加到15个
context.stock_max_num = 15
最小持仓股票数:8个
context.stock_min_num = 8
已持仓天数
context.hold_days = 0
持仓股票跟踪信息
context.holdings_info = {}
策略版本标记
context.strategy_version = "3.0_Enhanced_Tech_Expanded"
交易参数
context.trade_params = {
'min_stock_price': 5, # 最低价要求5元,过滤低价股
'max_stock_price': 500, # 最高价限制适当降低
'target_position': 0.90, # 目标仓位90%
'stop_loss_pct': 0.10, # 止损比例10%
'take_profit_pct': 0.30, # 止盈比例30%
'trailing_stop_pct': 0.15, # 移动止盈15%
'max_drawdown_pct': 0.20, # 最大回撤放大到20%
'force_trade': True,
'min_position_value': 50000, # 最小持仓市值
}
科技股重点配置参数 - 放宽限制
context.tech_focus_params = {
'tech_weight': 0.65, # 科技股总权重65%
'star_market_weight': 0.20, # 科创板权重20%
'gem_weight': 0.30, # 创业板权重30%
'beijing_weight': 0.15, # 京交所权重15%
}
板块配置(根据市场环境动态调整)
context.sector_weights = {
'technology': 0.60, # 科技(60%)
'consumer': 0.10, # 消费(10%)
'healthcare': 0.10, # 医疗(10%)
'finance': 0.05, # 金融(5%)
'energy': 0.05, # 能源(5%)
'military': 0.10, # 军工(10%)
}
各板块默认权重(用于动态调整)
context.default_sector_weights = context.sector_weights.copy()
科创板和创业板股票池
context.tech_board_stocks = {
'star_market': [], # 科创板
'gem': [], # 创业板
'beijing': [] # 京交所
}
科技龙头股池(扩展版)
context.tech_leaders = [
'300750.SZ', # 宁德时代
'002415.SZ', # 海康威视
'000977.SZ', # 浪潮信息
'603259.SH', # 药明康德
'300059.SZ', # 东方财富
'300124.SZ', # 汇川技术
'002475.SZ', # 立讯精密
'002049.SZ', # 紫光国微
'603501.SH', # 韦尔股份
'688981.SH', # 中芯国际
'300014.SZ', # 亿纬锂能
'300782.SZ', # 卓胜微
'688111.SH', # 金山办公
'688036.SH', # 传音控股
'688012.SH', # 中微公司
]
金融、白酒、黄金股池(用于熊市防御)
context.defensive_stocks = {
'finance': ['600036.SH', '601318.SH', '601166.SH', '000001.SZ'], # 招商银行、中国平安、兴业银行、平安银行
'liquor': ['600519.SH', '000858.SZ', '002304.SZ'], # 贵州茅台、五粮液、洋河股份
'gold': ['600547.SH', '002155.SZ', '600489.SH'], # 山东黄金、湖南黄金、中金黄金
}
军工股池(地缘政治紧张时配置)
context.military_stocks = [
'600760.SH', # 中航沈飞
'000768.SZ', # 中航西飞
'002179.SZ', # 中航光电
'600893.SH', # 航发动力
'000738.SZ', # 航发控制
'600316.SH', # 洪都航空
'002389.SZ', # 航天彩虹
'600372.SH', # 中航电子
'000547.SZ', # 航天发展
'600685.SH', # 中船防务
]
市场状态跟踪
context.market_state = {
'trend': 'neutral',
'volatility': 'normal',
'tech_sector_performance': 0.0,
'defensive_sector_performance': 0.0,
'market_sentiment': 0.5, # 0-1,越高越乐观
'geopolitical_tension': 0.3, # 地缘政治紧张程度 0-1
}
国际政治形势研判
context.geopolitical_analysis = {
'taiwan_strait': 0.3, # 台海局势紧张程度 0-1
'asia_pacific': 0.4, # 亚太局势紧张程度 0-1
'us_china_relations': 0.5, # 中美关系紧张程度 0-1
}
研报数据模拟(实际应用中应从数据库获取)
context.research_reports = defaultdict(lambda: {
'rating_score': random.uniform(0.5, 1.0), # 研报评级得分
'target_price_premium': random.uniform(-0.1, 0.3), # 目标价溢价
'analyst_coverage': random.randint(1, 10), # 分析师覆盖数量
})
政策扶持领域配置
context.policy_support_areas = {
# 新一代信息技术
'new_generation_it': ['半导体', '芯片', '集成电路', '人工智能', 'AI', '云计算', '大数据',
'物联网', '5G', '6G', '工业互联网', '区块链', '量子技术'],
# 高端装备制造
'high_end_equipment': ['机器人', '智能制造', '数控机床', '航空航天', '卫星', '海洋工程',
'轨道交通', '高端装备', '精密制造'],
# 新材料
'new_materials': ['新材料', '石墨烯', '碳纤维', '纳米材料', '先进陶瓷', '特种金属'],
# 新能源
'new_energy': ['新能源', '光伏', '风电', '储能', '锂电池', '氢能', '核能',
'新能源汽车', '充电桩', '智能电网'],
# 生物医药
'biomedicine': ['生物医药', '创新药', '医疗器械', '基因技术', '细胞治疗', '疫苗',
'中医药', '医疗健康'],
# 当前政策热点
'current_hotspots': ['数字经济', '东数西算', '数据要素', '算力', '信创',
'一带一路', '中特估', '国企改革', '乡村振兴', '专精特新']
}
科创板/京交所特殊政策
context.special_board_policy = {
'star_market': ['硬科技', '科技创新', '前沿技术', '突破卡脖子'],
'beijing': ['专精特新', '小巨人', '创新层', '北交所']
}
记录日志
log.info("=" * 70)
log.info("🚀 增强科技版PEG策略 v3.0")
log.info(f"📊 最大持仓: {context.stock_max_num}只,最小持仓: {context.stock_min_num}只")
log.info(f"📈 科技股配置: {context.tech_focus_params['tech_weight']:.0%}")
log.info("🎯 策略重点: 科技成长股 + 科创/创业/京交所")
log.info("⚔️ 新增: 军工板块 + 地缘政治研判")
log.info("⚖️ 风险控制: 止损10% + 止盈30% + 最大回撤20%")
log.info("=" * 70)
日级回测模式下使用简化的run_daily
run_daily(fun_main)
========== A股正股过滤函数 ==========
def filter_a_shares(stock_list):
"""过滤A股正股,剔除非股票证券 - 简化版"""
a_shares = []
for stock in stock_list:
try:
# 检查代码格式
if not stock or '.' not in stock:
continue
code_part, exchange_part = stock.split('.')
# 只保留上交所、深交所和北交所
if exchange_part not in ['SH', 'SZ', 'BJ']:
continue
# 检查是否为6位数字代码
if not code_part.isdigit() or len(code_part) != 6:
continue
# 根据交易所和代码前缀判断
if exchange_part == 'SH':
# 上交所A股:600,601,603,605,688开头
if code_part.startswith(('600', '601', '603', '605', '688')):
a_shares.append(stock)
elif exchange_part == 'SZ':
# 深交所A股:000,001,002,003,300开头
if code_part.startswith(('000', '001', '002', '003', '300')):
a_shares.append(stock)
elif exchange_part == 'BJ':
# 北交所股票:43,83,87开头
if code_part.startswith(('43', '83', '87')):
a_shares.append(stock)
except:
continue
return a_shares
def is_a_share(stock):
"""判断是否为A股正股"""
try:
if not stock or '.' not in stock:
return False
code_part, exchange_part = stock.split('.')
# 上交所A股
if exchange_part == 'SH':
return code_part.startswith(('600', '601', '603', '605', '688'))
# 深交所A股
elif exchange_part == 'SZ':
return code_part.startswith(('000', '001', '002', '003', '300'))
# 北交所
elif exchange_part == 'BJ':
return code_part.startswith(('43', '83', '87'))
return False
except:
return False
========== 主交易函数 ==========
def fun_main(context, bar_dict):
"""主交易函数 - 每日开盘时运行"""
try:
记录上一交易日结束的情况
log_previous_day(context, bar_dict)
# 更新市场状态和国际政治形势
update_market_state(context)
update_geopolitical_analysis(context)
# 根据市场状态动态调整板块权重
adjust_sector_weights_based_on_market(context)
# 更新持仓信息
update_holdings_info(context, bar_dict)
# 检查是否需要卖出(基于价格和回撤)
stocks_to_sell = check_sell_conditions_v3(context, bar_dict)
# 卖出触发条件的股票
if stocks_to_sell:
sell_stocks(context, bar_dict, stocks_to_sell)
# 判断是否需要调仓(放宽科技股调仓条件)
flag = fun_needRebalance_v3(context)
# 如果持仓不足最小数量或需要调仓
if flag or len(context.portfolio.positions) < context.stock_min_num:
last_date = get_last_datetime().strftime('%Y%m%d')
# 1. 获取全市场股票并过滤为A股正股
log.info("🔍 开始获取全市场股票...")
try:
all_stocks_df = get_all_securities(date=last_date)
all_stocks = list(all_stocks_df.index)
log.info(f"📊 获取到全市场证券: {len(all_stocks)}只")
# 过滤为A股正股
all_a_shares = filter_a_shares(all_stocks)
log.info(f"📊 过滤后A股正股: {len(all_a_shares)}只")
if len(all_a_shares) == 0:
# 如果过滤后为0,尝试简化过滤条件
log.info("⚠️ 过滤后股票为0,尝试简化过滤条件")
all_a_shares = [s for s in all_stocks if '.' in s and
(s.endswith('.SH') or s.endswith('.SZ') or s.endswith('.BJ'))]
log.info(f"📊 简化过滤后A股: {len(all_a_shares)}只")
except Exception as e:
log.error(f"获取全市场股票失败: {e}")
all_a_shares = []
if len(all_a_shares) == 0:
log.error("无法获取A股正股,跳过本次调仓")
return
# 2. 获取科创板和创业板、京交所股票
update_tech_boards_v3(context, all_a_shares)
# 3. 初步筛选(科技股优先)
log.info("🔍 开始初步筛选...")
filtered_stocks = preliminary_screening_with_tech_focus_v3(context, all_a_shares, bar_dict, last_date)
log.info(f"📊 初步筛选后: {len(filtered_stocks)}只")
if len(filtered_stocks) < 30:
log.info("⚠️ 初步筛选后股票太少,放宽条件")
filtered_stocks = relaxed_screening_v3(context, all_a_shares, bar_dict, last_date)
log.info(f"📊 放宽筛选后: {len(filtered_stocks)}只")
# 4. 如果筛选后股票不足,使用科技龙头股池和防御股池
if len(filtered_stocks) < 20:
log.info("⚠️ 筛选后股票不足,使用备用股池")
filtered_stocks = supplement_with_backup_pool(context, filtered_stocks)
log.info(f"📊 补充后: {len(filtered_stocks)}只")
# 5. 深度筛选(科技股优先)
log.info("🔍 开始深度筛选...")
if len(filtered_stocks) > 0:
buy_list = tech_focused_selection_v3(context, bar_dict, filtered_stocks, last_date)
# 如果科技股不足,根据市场状态补充其他板块
if len(buy_list) < context.stock_min_num:
log.info(f"⚠️ 科技选股仅选出{len(buy_list)}只,补充其他板块")
buy_list = supplement_based_on_market_state(context, buy_list, filtered_stocks, last_date)
else:
buy_list = []
# 6. 最终调整
if len(buy_list) < context.stock_min_num:
log.info(f"⚠️ 选股数量不足{context.stock_min_num}只,使用紧急选股方案")
buy_list = emergency_selection_v3(context, bar_dict, last_date)
# 限制持仓数量
if len(buy_list) > context.stock_max_num:
buy_list = buy_list[:context.stock_max_num]
# 排除已持仓的股票
current_positions = list(context.portfolio.positions.keys())
buy_list = [stock for stock in buy_list if stock not in current_positions]
# 记录选股结果
log.info(f"🎯 最终选股 ({len(buy_list)}只):")
sector_count = defaultdict(int)
for i, stock in enumerate(buy_list):
try:
stock_name = get_security_info(stock).display_name
sector = get_stock_sector_v3(context, stock)
sector_count[sector] += 1
log.info(f" {i+1}. {stock} ({stock_name[:10]}) [{sector}]")
except:
log.info(f" {i+1}. {stock}")
# 统计板块分布
log.info("📊 板块分布:")
for sector, count in sector_count.items():
log.info(f" {sector}: {count}只 ({count/len(buy_list)*100:.0f}%)")
# 7. 计算权重(重点配置科技股)
stock_weight = calculate_tech_focused_weights_v3(context, buy_list, bar_dict, last_date)
# 8. 计算最终权重
trade_ratio = fun_cal_position_with_risk_v3(context, bar_dict, stock_weight)
# 记录交易权重
log.info('📊 交易权重:')
total_weight = 0
for stock, weight in trade_ratio.items():
total_weight += weight
try:
stock_name = get_security_info(stock).display_name[:10]
sector = get_stock_sector_v3(context, stock)
log.info(f" {stock[:6]} ({stock_name}): {weight:.2%} [{sector}]")
except:
log.info(f" {stock[:6]}: {weight:.2%}")
log.info(f"📊 总仓位: {total_weight:.2%}")
# 9. 执行交易
fun_do_trade_v3(context, bar_dict, trade_ratio)
# 初始化新买入股票的持仓信息
for stock in trade_ratio:
if stock not in context.holdings_info:
init_holding_info(context, stock, bar_dict[stock].last)
# 更新持仓数量
context.num = len(context.portfolio.positions)
# 记录当前持仓情况
log_current_holdings_v3(context)
except Exception as e:
log.error(f"主函数执行错误: {e}")
========== 市场状态和地缘政治函数 ==========
def update_market_state(context):
"""更新市场状态"""
try:
检查科技板块表现
tech_indices = ['399006.SZ', '000688.SH'] # 创业板指,科创50
tech_performance = 0.0
for index in tech_indices:
try:
prices = get_price(index, count=5, end_date=get_last_datetime(),
fields=['close'], skip_paused=True)
if len(prices) >= 2:
perf = (prices['close'].iloc[-1] / prices['close'].iloc[-2] - 1) * 100
tech_performance += perf
except:
pass
context.market_state['tech_sector_performance'] = tech_performance / len(tech_indices)
# 检查防御板块表现
defensive_indices = ['000001.SH', '000300.SH'] # 上证指数,沪深300
defensive_performance = 0.0
for index in defensive_indices:
try:
prices = get_price(index, count=5, end_date=get_last_datetime(),
fields=['close'], skip_paused=True)
if len(prices) >= 2:
perf = (prices['close'].iloc[-1] / prices['close'].iloc[-2] - 1) * 100
defensive_performance += perf
except:
pass
context.market_state['defensive_sector_performance'] = defensive_performance / len(defensive_indices)
# 判断市场趋势
if tech_performance > 2.0:
context.market_state['trend'] = 'tech_bull'
context.market_state['market_sentiment'] = 0.7
elif tech_performance < -2.0 and defensive_performance > 0:
context.market_state['trend'] = 'tech_bear_defensive_bull'
context.market_state['market_sentiment'] = 0.3
elif tech_performance < -3.0 and defensive_performance < -1.0:
context.market_state['trend'] = 'bear_market'
context.market_state['market_sentiment'] = 0.2
else:
context.market_state['trend'] = 'neutral'
context.market_state['market_sentiment'] = 0.5
except Exception as e:
log.info(f"更新市场状态失败: {e}")
def update_geopolitical_analysis(context):
"""更新地缘政治分析"""
try:
模拟地缘政治紧张程度的变化
context.geopolitical_analysis['taiwan_strait'] = min(1.0, max(0,
context.geopolitical_analysis['taiwan_strait'] + random.uniform(-0.05, 0.05)))
context.geopolitical_analysis['asia_pacific'] = min(1.0, max(0,
context.geopolitical_analysis['asia_pacific'] + random.uniform(-0.03, 0.03)))
context.geopolitical_analysis['us_china_relations'] = min(1.0, max(0,
context.geopolitical_analysis['us_china_relations'] + random.uniform(-0.04, 0.04)))
# 综合地缘政治紧张程度
avg_tension = (context.geopolitical_analysis['taiwan_strait'] +
context.geopolitical_analysis['asia_pacific'] +
context.geopolitical_analysis['us_china_relations']) / 3
context.market_state['geopolitical_tension'] = avg_tension
# 记录地缘政治状态
if avg_tension > 0.6:
log.info(f"⚠️ 地缘政治紧张程度: {avg_tension:.1%} (高)")
elif avg_tension > 0.4:
log.info(f"📊 地缘政治紧张程度: {avg_tension:.1%} (中)")
else:
log.info(f"✅ 地缘政治紧张程度: {avg_tension:.1%} (低)")
except Exception as e:
log.info(f"更新地缘政治分析失败: {e}")
def adjust_sector_weights_based_on_market(context):
"""根据市场状态动态调整板块权重"""
重置为默认权重
context.sector_weights = context.default_sector_weights.copy()
根据市场趋势调整
if context.market_state['trend'] == 'tech_bull':
# 科技牛市中增加科技股权重
context.sector_weights['technology'] = min(0.75, context.sector_weights['technology'] + 0.15)
# 减少防御板块权重
context.sector_weights['finance'] = max(0.02, context.sector_weights['finance'] - 0.03)
context.sector_weights['consumer'] = max(0.05, context.sector_weights['consumer'] - 0.05)
elif context.market_state['trend'] == 'tech_bear_defensive_bull':
# 科技熊市,防御板块走强
context.sector_weights['technology'] = max(0.40, context.sector_weights['technology'] - 0.20)
context.sector_weights['finance'] = min(0.15, context.sector_weights['finance'] + 0.10)
context.sector_weights['consumer'] = min(0.20, context.sector_weights['consumer'] + 0.10)
elif context.market_state['trend'] == 'bear_market':
# 全面熊市,增加防御板块
context.sector_weights['technology'] = max(0.30, context.sector_weights['technology'] - 0.30)
context.sector_weights['finance'] = min(0.20, context.sector_weights['finance'] + 0.15)
context.sector_weights['consumer'] = min(0.25, context.sector_weights['consumer'] + 0.15)
context.sector_weights['energy'] = min(0.10, context.sector_weights['energy'] + 0.05)
根据地缘政治紧张程度调整军工权重
if context.market_state['geopolitical_tension'] > 0.6:
# 高度紧张,大幅增加军工权重
context.sector_weights['military'] = min(0.25, context.sector_weights['military'] + 0.15)
# 从科技和消费中减少权重
context.sector_weights['technology'] = max(0.50, context.sector_weights['technology'] - 0.10)
context.sector_weights['consumer'] = max(0.05, context.sector_weights['consumer'] - 0.05)
elif context.market_state['geopolitical_tension'] > 0.4:
# 中度紧张,适当增加军工权重
context.sector_weights['military'] = min(0.15, context.sector_weights['military'] + 0.05)
确保权重总和为1
total_weight = sum(context.sector_weights.values())
if total_weight != 1.0:
for sector in context.sector_weights:
context.sector_weights[sector] /= total_weight
记录调整后的权重
log.info("📊 动态板块权重:")
for sector, weight in context.sector_weights.items():
log.info(f" {sector}: {weight:.1%}")
========== 股票筛选和评分函数 ==========
def preliminary_screening_with_tech_focus_v3(context, stock_list, bar_dict, last_date):
"""初步筛选 v3 - 只处理A股正股"""
filtered = []
首先确保只处理A股正股
a_shares = filter_a_shares(stock_list)
for i, stock in enumerate(a_shares):
if i >= 3000: # 限制处理数量,但确保能覆盖所有A股
break
try:
# 检查是否停牌
if bar_dict[stock].is_paused:
continue
# 检查价格
price = bar_dict[stock].last
if price < context.trade_params['min_stock_price'] or price > context.trade_params['max_stock_price']:
continue
# 检查ST
try:
name = get_security_info(stock).display_name
if name and ('ST' in name or '*ST' in name or '退' in name):
continue
except:
pass
# 优先纳入科技股和京交所股票
is_priority = False
if (stock.startswith('688') or stock.startswith('300') or
stock.startswith('83') or stock.startswith('87') or stock.startswith('43') or
stock.endswith('.BJ')):
is_priority = True
elif stock in context.tech_leaders:
is_priority = True
elif stock in context.military_stocks:
is_priority = True
# 如果是优先股,直接加入
if is_priority:
filtered.append(stock)
continue
# 非优先股需要额外检查
try:
# 检查市值
q = query(valuation.market_cap).filter(valuation.code == stock)
df = get_fundamentals(q, date=last_date)
if not df.empty:
market_cap = df['market_cap'].iloc[0]
if market_cap < 2000000000: # 20亿市值以下过滤
continue
else:
continue
filtered.append(stock)
except Exception as e:
# 如果查询失败,仍然加入,后面评分会处理
filtered.append(stock)
continue
except Exception as e:
continue
return filtered
def tech_focused_selection_v3(context, bar_dict, stock_list, last_date):
"""科技股优先选股 v3 - 简化版评分"""
tech_stocks = []
military_stocks = []
other_stocks = []
分类筛选
for stock in stock_list:
try:
# 判断是否为科技股
if (stock.startswith('688') or stock.startswith('300') or
stock.startswith('83') or stock.startswith('87') or stock.startswith('43') or
stock.endswith('.BJ') or stock in context.tech_leaders):
tech_stocks.append(stock)
elif stock in context.military_stocks:
military_stocks.append(stock)
else:
other_stocks.append(stock)
except:
continue
log.info(f"📊 识别出科技股: {len(tech_stocks)}只,军工股: {len(military_stocks)}只,其他股票: {len(other_stocks)}只")
简化评分:科技股优先选择
selected_stocks = []
科技股(简化选择,不进行复杂评分)
tech_target = min(int(context.sector_weights['technology'] * context.stock_max_num), len(tech_stocks))
selected_tech = tech_stocks[:tech_target]
selected_stocks.extend(selected_tech)
log.info(f"🎯 选中科技股: {len(selected_tech)}只")
军工股(根据地缘政治紧张程度调整)
if context.market_state['geopolitical_tension'] > 0.4:
military_target = min(int(context.sector_weights['military'] * context.stock_max_num), len(military_stocks))
selected_military = military_stocks[:military_target]
selected_stocks.extend(selected_military)
log.info(f"🎯 选中军工股: {len(selected_military)}只")
其他板块(消费、医疗、金融、能源)
current_count = len(selected_stocks)
other_target = max(context.stock_min_num - current_count, 0)
if other_target > 0:
# 根据市场状态选择防御性或进攻性股票
if context.market_state['trend'] in ['tech_bear_defensive_bull', 'bear_market']:
# 熊市中选择防御性股票
defensive_stocks = get_defensive_stocks(context, other_target)
selected_stocks.extend(defensive_stocks)
log.info(f"🎯 选中防御性股票: {len(defensive_stocks)}只")
else:
# 正常市中选择其他优质股票
selected_other = other_stocks[:other_target]
selected_stocks.extend(selected_other)
log.info(f"🎯 选中其他股票: {len(selected_other)}只")
去重
selected_stocks = list(set(selected_stocks))
log.info(f"📊 最终选股结果: {len(selected_stocks)}只")
return selected_stocks
def calculate_stock_score_simple(context, stock, last_date):
"""简化版股票评分 - 只考虑技术面和政策扶持"""
score = 0
try:
# 1. 技术面指标
prices = get_price(stock, end_date=last_date, count=40,
fields=['close', 'volume'], skip_paused=True)
if len(prices) >= 20:
closes = prices['close'].values
# 动量得分
if len(closes) >= 20:
momentum = (closes[-1] / closes[-20] - 1) * 100
if momentum > 10:
score += 30
elif momentum > 5:
score += 20
elif momentum > 0:
score += 10
elif momentum > -5:
score += 5
# 成交量得分
volumes = prices['volume'].values
if len(volumes) >= 10:
volume_ratio = volumes[-1] / np.mean(volumes[-10:])
if volume_ratio > 1.5:
score += 20
elif volume_ratio > 1.2:
score += 10
# 2. 板块和交易所加分
if stock.startswith('688'): # 科创板
score += 25 # 提高科创板加分
elif stock.startswith('300'): # 创业板
score += 20
elif stock.startswith('83') or stock.startswith('87') or stock.startswith('43') or stock.endswith('.BJ'): # 京交所
score += 30 # 提高京交所加分
# 3. 是否为龙头股(额外加分)
if stock in context.tech_leaders:
score += 25
if stock in context.military_stocks:
score += 20
except Exception as e:
log.debug(f"简化评分{stock}失败: {e}")
return 0
return score
def get_defensive_stocks(context, count):
"""获取防御性股票"""
defensive_stocks = []
根据市场状态选择防御性板块
if context.market_state['trend'] == 'tech_bear_defensive_bull':
# 科技熊市但防御板块走强,增加金融和消费
defensive_stocks.extend(context.defensive_stocks['finance'][:2])
defensive_stocks.extend(context.defensive_stocks['liquor'][:2])
elif context.market_state['trend'] == 'bear_market':
# 全面熊市,增加金融、消费、黄金
defensive_stocks.extend(context.defensive_stocks['finance'][:2])
defensive_stocks.extend(context.defensive_stocks['liquor'][:1])
defensive_stocks.extend(context.defensive_stocks['gold'][:2])
如果数量不足,添加一些其他防御性股票
if len(defensive_stocks) < count:
# 添加一些公用事业、必需消费等防御性股票
additional_defensive = ['600028.SH', '601857.SH', '600104.SH', '600887.SH'] # 中石化、中石油、上汽、伊利
defensive_stocks.extend(additional_defensive[:count - len(defensive_stocks)])
return defensive_stocks[:count]
def supplement_based_on_market_state(context, current_list, all_stocks, last_date):
"""根据市场状态补充股票"""
selected = list(current_list)
if len(selected) >= context.stock_max_num:
return selected[:context.stock_max_num]
需要补充的数量
need_count = max(context.stock_min_num - len(selected), 0)
if need_count == 0:
return selected
根据市场趋势选择补充方向
if context.market_state['trend'] == 'tech_bull':
# 科技牛市中继续补充科技股
log.info("📈 科技牛市中,补充科技股")
tech_candidates = [s for s in all_stocks if s.startswith(('688', '300', '83', '87', '43')) or s.endswith('.BJ')]
tech_candidates = [s for s in tech_candidates if s not in selected]
# 简化选择
supplement = tech_candidates[:need_count]
elif context.market_state['trend'] in ['tech_bear_defensive_bull', 'bear_market']:
# 熊市中补充防御性股票
log.info("📉 熊市中,补充防御性股票")
supplement = get_defensive_stocks(context, need_count)
# 过滤掉已在列表中的股票
supplement = [s for s in supplement if s not in selected]
else:
# 中性市场中补充其他优质股票
log.info("📊 中性市场中,补充其他优质股票")
other_candidates = [s for s in all_stocks if s not in selected]
# 简化选择
supplement = other_candidates[:need_count]
selected.extend(supplement)
return selected[:context.stock_max_num]
def calculate_tech_focused_weights_v3(context, stock_list, bar_dict, last_date):
"""计算科技股重点配置的权重 v3 - 简化版"""
if len(stock_list) == 0:
return {}
weights = {}
基础权重
base_weight = 1.0 / len(stock_list)
根据市场状态调整
market_factor = 1.0
if context.market_state['trend'] == 'tech_bull':
market_factor = 1.2 # 科技股牛市,增加权重
elif context.market_state['trend'] == 'tech_bear_defensive_bull':
market_factor = 0.8 # 科技股熊市,降低科技股权重
根据地缘政治紧张程度调整
if context.market_state['geopolitical_tension'] > 0.6:
market_factor *= 0.9 # 高度紧张时降低总体仓位
根据个股属性调整权重
for stock in stock_list:
weight = base_weight * market_factor
# 根据板块调整权重
sector = get_stock_sector_v3(context, stock)
if sector == 'star_market': # 科创板
weight *= 1.5
elif sector == 'gem': # 创业板
weight *= 1.4
elif sector == 'beijing': # 京交所
weight *= 1.6 # 京交所权重更高
elif sector == 'technology': # 其他科技股
weight *= 1.3
elif sector == 'military': # 军工股
# 根据地缘政治紧张程度调整军工股权重
tension_factor = 1.0 + context.market_state['geopolitical_tension'] * 0.5
weight *= tension_factor
elif sector in ['finance', 'consumer']: # 防御性板块
# 在熊市中增加防御性板块权重
if context.market_state['trend'] in ['tech_bear_defensive_bull', 'bear_market']:
weight *= 1.3
# 是否为龙头股
if stock in context.tech_leaders:
weight *= 1.4
if stock in context.military_stocks:
weight *= 1.3
# 单只股票最大权重限制
weight = min(weight, 0.25) # 不超过25%
weights[stock] = weight
归一化权重
total_weight = sum(weights.values())
if total_weight > 0:
weights = {k: v/total_weight for k, v in weights.items()}
return weights
def get_stock_sector_v3(context, stock):
"""获取股票所属板块 v3 - 简化版"""
try:
if '.' in stock:
code_part, exchange_part = stock.split('.')
else:
code_part = stock
exchange_part = ''
# 京交所股票
if code_part.startswith(('83', '87', '43')) or stock.endswith('.BJ'):
return 'beijing'
# 科创板
elif code_part.startswith('688') and exchange_part == 'SH':
return 'star_market'
# 创业板
elif code_part.startswith('300') and exchange_part == 'SZ':
return 'gem'
# 科技龙头股
elif stock in context.tech_leaders:
return 'technology'
# 军工股
elif stock in context.military_stocks:
return 'military'
# 金融股
elif stock in context.defensive_stocks['finance']:
return 'finance'
# 白酒股
elif stock in context.defensive_stocks['liquor']:
return 'consumer'
# 黄金股
elif stock in context.defensive_stocks['gold']:
return 'energy'
# 根据代码前缀判断主板
if exchange_part == 'SH':
if code_part.startswith(('600', '601', '603', '605')):
return 'main_board'
elif exchange_part == 'SZ':
if code_part.startswith(('000', '001', '002', '003')):
return 'sme_board'
return 'other'
except:
return 'other'
========== 持仓管理函数 ==========
def update_holdings_info(context, bar_dict):
"""更新持仓股票信息"""
for stock in context.portfolio.positions:
if stock not in context.holdings_info:
position = context.portfolio.positions[stock]
buy_price = get_position_cost(position, bar_dict[stock].last)
init_holding_info(context, stock, buy_price)
current_price = bar_dict[stock].last
position = context.portfolio.positions[stock]
buy_price = get_position_cost(position, current_price)
info = context.holdings_info[stock]
if info['buy_price'] <= 0 or (buy_price > 0 and info['buy_price'] != buy_price):
info['buy_price'] = buy_price
if info['highest_price'] <= buy_price:
info['highest_price'] = buy_price
# 更新最高价
info['highest_price'] = max(info['highest_price'], current_price)
# 计算收益
if info['buy_price'] > 0:
info['return_pct'] = (current_price - info['buy_price']) / info['buy_price'] * 100
else:
info['return_pct'] = 0
# 计算回撤
if info['highest_price'] > 0:
info['drawdown_pct'] = (info['highest_price'] - current_price) / info['highest_price'] * 100
else:
info['drawdown_pct'] = 0
# 更新持仓天数
info['hold_days'] += 1
# 更新最新价格
info['last_price'] = current_price
def get_position_cost(position, current_price):
"""获取持仓成本 - 兼容不同平台的属性名"""
try:
if hasattr(position, 'avg_cost') and position.avg_cost > 0:
return position.avg_cost
elif hasattr(position, 'cost_price') and position.cost_price > 0:
return position.cost_price
elif hasattr(position, 'avg_price') and position.avg_price > 0:
return position.avg_price
elif hasattr(position, 'cost') and position.cost > 0:
return position.cost
else:
return current_price
except:
return current_price
def init_holding_info(context, stock, buy_price):
"""初始化持仓信息"""
context.holdings_info[stock] = {
'buy_price': buy_price if buy_price > 0 else 1.0,
'highest_price': buy_price if buy_price > 0 else 1.0,
'last_price': buy_price if buy_price > 0 else 1.0,
'return_pct': 0,
'drawdown_pct': 0,
'hold_days': 0,
'buy_date': dt.datetime.now().strftime('%Y-%m-%d')
}
def check_sell_conditions_v3(context, bar_dict):
"""检查卖出条件 v3 - 放宽科技股持有条件"""
stocks_to_sell = []
for stock in context.portfolio.positions:
if stock in context.holdings_info:
info = context.holdings_info[stock]
# 判断是否为科技股
is_tech = (stock.startswith('688') or stock.startswith('300') or
stock.startswith('83') or stock.startswith('87') or stock.startswith('43') or
stock.endswith('.BJ') or stock in context.tech_leaders)
# 判断是否为军工股
is_military = stock in context.military_stocks
# 条件1: 止损 - 亏损超过止损比例
if info['return_pct'] < -context.trade_params['stop_loss_pct'] * 100:
log.info(f"🚨 {stock}: 触发止损,亏损{info['return_pct']:.1f}%")
stocks_to_sell.append(stock)
continue
# 条件2: 止盈 - 收益超过止盈比例
if info['return_pct'] > context.trade_params['take_profit_pct'] * 100:
log.info(f"🎯 {stock}: 触发止盈,收益{info['return_pct']:.1f}%")
stocks_to_sell.append(stock)
continue
# 条件3: 移动止盈 - 从最高点回撤超过移动止盈比例
if info['drawdown_pct'] > context.trade_params['trailing_stop_pct'] * 100:
# 科技股和军工股放宽移动止盈条件
if is_tech or is_military:
if info['drawdown_pct'] > context.trade_params['trailing_stop_pct'] * 100 * 1.5:
log.info(f"📉 {stock}: 触发放宽的移动止盈,回撤{info['drawdown_pct']:.1f}%")
stocks_to_sell.append(stock)
else:
log.info(f"📉 {stock}: 触发移动止盈,回撤{info['drawdown_pct']:.1f}%")
stocks_to_sell.append(stock)
continue
# 条件4: 最大回撤 - 整体回撤超过最大回撤比例
if info['drawdown_pct'] > context.trade_params['max_drawdown_pct'] * 100:
log.info(f"⚠️ {stock}: 超过最大回撤限制,回撤{info['drawdown_pct']:.1f}%")
stocks_to_sell.append(stock)
continue
# 条件5: 非科技股在科技牛市中表现不佳
if (context.market_state['trend'] == 'tech_bull' and not is_tech and
info['return_pct'] < 5 and info['hold_days'] > 30):
log.info(f"📊 {stock}: 科技牛市中非科技股表现不佳,收益{info['return_pct']:.1f}%")
stocks_to_sell.append(stock)
continue
# 条件6: 军工股在地缘政治缓和时考虑卖出
if (is_military and context.market_state['geopolitical_tension'] < 0.3 and
info['return_pct'] > 15):
log.info(f"🕊️ {stock}: 地缘政治缓和,军工股锁定收益{info['return_pct']:.1f}%")
stocks_to_sell.append(stock)
continue
return stocks_to_sell
def sell_stocks(context, bar_dict, stocks_to_sell):
"""卖出股票"""
for stock in stocks_to_sell:
try:
log.info(f"🗑️ 卖出: {stock}")
order_target(stock, 0)
# 记录卖出信息
if stock in context.holdings_info:
info = context.holdings_info[stock]
log.info(f" 买入价: {info['buy_price']:.2f}, 卖出价: {info['last_price']:.2f}, "
f"收益: {info['return_pct']:.1f}%, 持仓天数: {info['hold_days']}")
# 从持仓信息中移除
del context.holdings_info[stock]
except Exception as e:
log.error(f"卖出{stock}失败: {str(e)}")
========== 股票池管理函数 ==========
def update_tech_boards_v3(context, all_stocks):
"""更新科创板和创业板、京交所股票池 v3"""
star_market = []
gem = []
beijing = []
首先过滤A股正股
a_shares = filter_a_shares(all_stocks)
for stock in a_shares:
try:
if '.' in stock:
code_part, exchange_part = stock.split('.')
# 科创板(688开头,上交所)
if code_part.startswith('688') and exchange_part == 'SH':
star_market.append(stock)
# 创业板(300开头,深交所)
elif code_part.startswith('300') and exchange_part == 'SZ':
gem.append(stock)
# 京交所(83,87,43开头,或.BJ后缀)
elif code_part.startswith(('83', '87', '43')) or exchange_part == 'BJ':
beijing.append(stock)
except:
continue
context.tech_board_stocks['star_market'] = star_market
context.tech_board_stocks['gem'] = gem
context.tech_board_stocks['beijing'] = beijing
log.info(f"📊 A股正股总数: {len(a_shares)}只")
log.info(f"📊 科创板股票: {len(context.tech_board_stocks['star_market'])}只")
log.info(f"📊 创业板股票: {len(context.tech_board_stocks['gem'])}只")
log.info(f"📊 京交所股票: {len(context.tech_board_stocks['beijing'])}只")
def supplement_with_backup_pool(context, current_list):
"""使用备用股池补充"""
selected = list(current_list)
添加科技龙头
selected.extend(context.tech_leaders)
根据市场状态添加防御性股票
if context.market_state['trend'] in ['tech_bear_defensive_bull', 'bear_market']:
for sector in ['finance', 'liquor', 'gold']:
selected.extend(context.defensive_stocks[sector][:2])
根据地缘政治紧张程度添加军工股
if context.market_state['geopolitical_tension'] > 0.4:
selected.extend(context.military_stocks[:5])
去重
selected = list(set(selected))
return selected
def relaxed_screening_v3(context, stock_list, bar_dict, last_date):
"""放宽条件的筛选 v3"""
filtered = []
首先确保只处理A股正股
a_shares = filter_a_shares(stock_list)
for i, stock in enumerate(a_shares):
if i >= 2000:
break
try:
if bar_dict[stock].is_paused:
continue
try:
name = get_security_info(stock).display_name
if name and ('ST' in name or '*ST' in name):
continue
except:
pass
# 价格条件放宽
price = bar_dict[stock].last
if price < 3 or price > 1000: # 放宽价格范围
continue
filtered.append(stock)
except:
continue
return filtered
def emergency_selection_v3(context, bar_dict, last_date):
"""紧急选股方案 v3 - 只选择A股正股"""
selected = []
log.info(" 紧急方案: 优先选择A股科技龙头股和京交所股票")
首先过滤科技龙头股中的A股正股
tech_leaders_a = []
for stock in context.tech_leaders:
if is_a_share(stock):
tech_leaders_a.append(stock)
tech_leaders_selected = tech_leaders_a[:min(8, context.stock_max_num//2)]
selected.extend(tech_leaders_selected)
科创板A股
if context.tech_board_stocks['star_market']:
star_stocks = context.tech_board_stocks['star_market'][:3]
selected.extend(star_stocks)
创业板A股
if context.tech_board_stocks['gem']:
gem_stocks = context.tech_board_stocks['gem'][:3]
selected.extend(gem_stocks)
京交所股票
if context.tech_board_stocks['beijing']:
beijing_stocks = context.tech_board_stocks['beijing'][:3]
selected.extend(beijing_stocks)
根据地缘政治紧张程度添加A股军工股
military_a = [s for s in context.military_stocks if is_a_share(s)]
if context.market_state['geopolitical_tension'] > 0.4 and military_a:
military_stocks = military_a[:3]
selected.extend(military_stocks)
根据市场状态添加防御性股票(确保是A股)
if context.market_state['trend'] in ['tech_bear_defensive_bull', 'bear_market']:
for sector in ['finance', 'liquor']:
sector_stocks = [s for s in context.defensive_stocks[sector] if is_a_share(s)]
selected.extend(sector_stocks[:2])
去重并限制数量
selected = list(set(selected))
return selected[:context.stock_max_num]
========== 调仓和交易函数 ==========
def fun_needRebalance_v3(context):
"""判断是否需要调仓 v3 - 放宽科技股调仓条件"""
try:
条件1: 持仓为空
if context.num == 0:
return True
# 条件2: 持仓数量小于最小持仓数
if context.num < context.stock_min_num:
log.info(f"📊 调仓条件: 持仓{context.num}只 < 最小持仓{context.stock_min_num}只")
return True
# 条件3: 市场状态发生重大变化
if (hasattr(context, 'last_market_trend') and
context.last_market_trend != context.market_state['trend']):
log.info(f"📊 调仓条件: 市场趋势从{context.last_market_trend}变为{context.market_state['trend']}")
context.last_market_trend = context.market_state['trend']
return True
# 条件4: 地缘政治紧张程度大幅变化
if (hasattr(context, 'last_geopolitical_tension') and
abs(context.last_geopolitical_tension - context.market_state['geopolitical_tension']) > 0.2):
log.info(f"📊 调仓条件: 地缘政治紧张程度变化超过20%")
context.last_geopolitical_tension = context.market_state['geopolitical_tension']
return True
# 条件5: 距离上次调仓超过20个交易日(放宽)
if hasattr(context, 'rebalance_days'):
context.rebalance_days += 1
# 科技股牛市中可以持有更久
max_days = 25 if context.market_state['trend'] == 'tech_bull' else 20
if context.rebalance_days > max_days:
log.info(f"📊 调仓条件: 距离上次调仓{context.rebalance_days}天")
return True
else:
context.rebalance_days = 1
# 初始化市场状态记录
if not hasattr(context, 'last_market_trend'):
context.last_market_trend = context.market_state['trend']
if not hasattr(context, 'last_geopolitical_tension'):
context.last_geopolitical_tension = context.market_state['geopolitical_tension']
return False
except Exception as e:
log.error(f"调仓判断错误: {e}")
return False
def fun_cal_position_with_risk_v3(context, bar_dict, stock_weight):
"""计算仓位 v3"""
trade_ratio = {}
if stock_weight:
# 根据市场状态调整总体仓位
total_position = context.trade_params['target_position']
# 市场情绪影响
sentiment_factor = context.market_state['market_sentiment']
total_position *= (0.8 + sentiment_factor * 0.4) # 在0.8-1.2倍之间调整
# 地缘政治紧张程度影响
geopolitical_factor = 1.0 - context.market_state['geopolitical_tension'] * 0.3
total_position *= geopolitical_factor
total_position = min(max(total_position, 0.5), 0.95) # 限制在50%-95%之间
log.info(f"📊 动态仓位调整: 基础{context.trade_params['target_position']:.0%} -> 实际{total_position:.0%}")
for stock, weight in stock_weight.items():
trade_ratio[stock] = weight * total_position
elif context.trade_params['force_trade']:
log.info("⚠️ 没有选到股票,强制买入优质股票")
# 根据市场状态选择强制买入的股票
if context.market_state['trend'] == 'tech_bull':
emergency_stocks = context.tech_leaders[:3]
elif context.market_state['geopolitical_tension'] > 0.5:
emergency_stocks = context.military_stocks[:2] + context.tech_leaders[:1]
else:
emergency_stocks = list(context.defensive_stocks['finance'][:2]) + context.tech_leaders[:1]
for stock in emergency_stocks:
try:
if not bar_dict[stock].is_paused:
# 根据股票数量调整仓位
position_per_stock = 0.25 if len(emergency_stocks) <= 2 else 0.15
trade_ratio[stock] = position_per_stock
try:
stock_name = get_security_info(stock).display_name
log.info(f"🧪 强制买入: {stock} ({stock_name}) ({position_per_stock:.0%}仓位)")
except:
log.info(f"🧪 强制买入: {stock} ({position_per_stock:.0%}仓位)")
except:
continue
return trade_ratio
def fun_do_trade_v3(context, bar_dict, stock_position):
"""执行交易 v3"""
log.info("💼 开始执行交易")
current_positions = list(context.portfolio.positions.keys())
total_value = context.portfolio.total_value
cash_available = context.portfolio.cash
log.info(f"💰 总资产: {total_value:,.2f}, 可用现金: {cash_available:,.2f}")
卖出不在目标列表中的股票
stocks_to_sell = [s for s in current_positions if s not in stock_position]
if stocks_to_sell:
log.info(f"🗑️ 需要卖出的股票: {len(stocks_to_sell)}只")
for stock in stocks_to_sell:
try:
# 检查是否为科技股或军工股(这些股票卖出更谨慎)
is_priority = (stock.startswith('688') or stock.startswith('300') or
stock.startswith('83') or stock.startswith('87') or stock.startswith('43') or
stock.endswith('.BJ') or stock in context.tech_leaders or stock in context.military_stocks)
if is_priority and stock in context.holdings_info:
info = context.holdings_info[stock]
# 如果科技股/军工股只是小幅亏损,可以再持有观察
if info['return_pct'] > -5 and info['hold_days'] < 30:
log.info(f" ⏸️ {stock}: 科技/军工股小幅亏损且持仓时间短,暂不卖出")
continue
log.info(f" 卖出: {stock}")
order_target(stock, 0)
# 从持仓信息中移除
if stock in context.holdings_info:
del context.holdings_info[stock]
except Exception as e:
log.error(f"卖出{stock}失败: {str(e)}")
买入或调整目标股票
successful_trades = 0
for stock in stock_position:
try:
target_percent = stock_position[stock]
current_price = bar_dict[stock].last
if current_price <= 0:
log.info(f"⚠️ {stock}价格无效,跳过")
continue
target_value = total_value * target_percent
# 确保不超过可用现金,并保留少量现金
max_invest = cash_available * 0.95
if target_value > max_invest:
target_value = max_invest
# 计算目标股数(整百股)
target_amount = int(target_value / current_price / 100) * 100
# 确保至少100股(对小市值股票可以放宽)
if target_amount < 100 and current_price < 50:
target_amount = 100
current_amount = 0
if stock in context.portfolio.positions:
current_amount = context.portfolio.positions[stock].quantity
trade_amount = target_amount - current_amount
if abs(trade_amount) >= 100 or (abs(trade_amount) > 0 and current_amount == 0):
try:
stock_name = get_security_info(stock).display_name
sector = get_stock_sector_v3(context, stock)
log.info(f"📊 {stock} ({stock_name[:10]}) [{sector}]: 当前{current_amount}股, 目标{target_amount}股")
except:
log.info(f"📊 {stock}: 当前{current_amount}股, 目标{target_amount}股")
if trade_amount > 0:
log.info(f"✅ 买入 {stock}: {trade_amount}股,约{target_value:,.0f}元")
order(stock, trade_amount)
successful_trades += 1
# 初始化持仓信息
if stock not in context.holdings_info:
init_holding_info(context, stock, current_price)
elif trade_amount < 0:
log.info(f"✅ 卖出 {stock}: {abs(trade_amount)}股")
order(stock, trade_amount)
successful_trades += 1
else:
log.info(f"📊 {stock}: 持仓已接近目标,无需调整")
except Exception as e:
log.error(f"交易{stock}失败: {str(e)}")
if successful_trades > 0:
log.info(f"✅ 成功执行{successful_trades}笔交易")
else:
log.info("⚠️ 没有成功执行任何交易")
重置调仓天数计数器
if hasattr(context, 'rebalance_days'):
context.rebalance_days = 0
========== 日志记录函数 ==========
def log_previous_day(context, bar_dict):
"""记录上一交易日的情况"""
try:
total_value = context.portfolio.total_value
cash = context.portfolio.cash
position_value = total_value - cash
try:
current_date = get_current_datetime().strftime('%Y-%m-%d')
except:
current_date = dt.datetime.now().strftime('%Y-%m-%d')
log.info("=" * 60)
log.info(f"📅 交易日开始: {current_date}")
log.info(f"💰 总资产: {total_value:,.2f}")
log.info(f"💵 可用现金: {cash:,.2f}")
if total_value > 0:
log.info(f"📈 股票仓位: {position_value/total_value*100:.1f}%")
else:
log.info("📈 股票仓位: 0.0%")
# 记录市场状态
log.info(f"📊 市场趋势: {context.market_state['trend']}")
log.info(f"🌍 地缘政治紧张度: {context.market_state['geopolitical_tension']:.0%}")
# 记录持仓详情
positions = context.portfolio.positions
if len(positions) > 0:
sector_count = defaultdict(int)
log.info("📦 持仓详情:")
for stock, position in positions.items():
try:
stock_name = get_security_info(stock).display_name
buy_price = get_position_cost(position, position.price)
profit_pct = (position.price - buy_price) / buy_price * 100 if buy_price > 0 else 0
# 判断板块
sector = get_stock_sector_v3(context, stock)
sector_count[sector] += 1
log.info(f" {stock} ({stock_name[:10]}) [{sector}]: {position.quantity}股, "
f"成本{buy_price:.2f}, 现价{position.price:.2f}, "
f"盈亏{profit_pct:.1f}%")
except:
log.info(f" {stock}: {position.quantity}股")
# 统计板块分布
log.info("📊 持仓板块分布:")
for sector, count in sector_count.items():
log.info(f" {sector}: {count}只 ({count/len(positions)*100:.0f}%)")
log.info("=" * 60)
except Exception as e:
log.error(f"记录上一交易日情况错误: {e}")
def log_current_holdings_v3(context):
"""记录当前持仓情况 v3"""
try:
current_holdings = list(context.portfolio.positions.keys())
if current_holdings:
log.info(f"📦 当前持仓 ({len(current_holdings)}只):")
# 按板块统计
sector_stats = defaultdict(lambda: {'count': 0, 'value': 0, 'profit': 0})
star_beijing_count = 0
total_value = 0
for stock in current_holdings:
try:
position = context.portfolio.positions[stock]
stock_name = get_security_info(stock).display_name
buy_price = get_position_cost(position, position.price)
profit_pct = (position.price - buy_price) / buy_price * 100 if buy_price > 0 else 0
# 判断板块
sector = get_stock_sector_v3(context, stock)
# 特别标识科创板/京交所科技股
special_mark = ""
if stock.startswith('688'):
special_mark = "⭐" # 科创板
star_beijing_count += 1
elif stock.startswith('83') or stock.startswith('87') or stock.startswith('43') or stock.endswith('.BJ'):
special_mark = "🏛️" # 京交所
star_beijing_count += 1
sector_stats[sector]['count'] += 1
sector_stats[sector]['value'] += position.value
sector_stats[sector]['profit'] += profit_pct
total_value += position.value
log.info(f" {stock} {special_mark} ({stock_name[:10]}): {position.quantity}股, "
f"成本{buy_price:.2f}, 现价{position.price:.2f}, "
f"盈亏{profit_pct:.1f}% [{sector}]")
except:
position = context.portfolio.positions[stock]
log.info(f" {stock}: {position.quantity}股")
# 统计板块分布
if total_value > 0:
log.info("📊 板块持仓统计:")
for sector, stats in sector_stats.items():
weight = stats['value'] / total_value * 100
avg_profit = stats['profit'] / stats['count'] if stats['count'] > 0 else 0
log.info(f" {sector}: {stats['count']}只, 权重{weight:.1f}%, 平均盈亏{avg_profit:.1f}%")
# 科创板/京交所特别统计
if star_beijing_count > 0:
log.info(f"🚀 科创板/京交所股票: {star_beijing_count}只")
# 计算总体盈亏
if len(current_holdings) > 0:
total_profit = sum(sector_stats[s]['profit'] for s in sector_stats)
avg_total_profit = total_profit / len(current_holdings)
log.info(f"📈 持仓股票平均盈亏: {avg_total_profit:.1f}%")
else:
log.info("📦 当前无持仓")
except Exception as e:
log.error(f"记录当前持仓错误: {e}")
========== 分钟级处理函数 ==========
def handle_bar(context, bar_dict):
"""分钟级处理函数"""
pass
主函数入口
if name == "main":
log.info("🚀 增强科技版PEG策略 v3.0 启动")
log.info("📈 主要改进:")
log.info(" 1. 最大持仓增加到15只,最小8只")
log.info(" 2. 简化过滤逻辑,确保能获取A股正股")
log.info(" 3. 取消科技股基本面评分,允许PE为负值")
log.info(" 4. 简化评分系统,只考虑技术面和板块属性")
log.info(" 5. 放宽科技股调仓条件,最大回撤20%")
log.info(" 6. 科技股熊市增加金融、白酒、黄金权重")
log.info(" 7. 加入军工板块,根据地缘政治调整")
log.info(" 8. 动态仓位管理和板块轮动")

