请高手帮忙指点一下啊,多谢

用户头像afterborn
2025-02-14 发布

写了一个基于财务和技术指标的策略,一直运行不起来,不知道哪里出错了,请高手帮忙看看,,多谢多谢!

import numpy as np
import pandas as pd
from datetime import timedelta

def init(context):

配置股票池(支持多股票扩展)

context.stock_pool = ['600519.SH'] # 以茅台为例
context.current_stock = context.stock_pool[0]

财务因子配置

context.financial_metrics = {
'valuation': ['pe_ratio', 'pb_ratio', 'ps_ratio', 'market_cap'],
'profitability': ['roe', 'roa', 'gross_profit_margin'],
'growth': ['revenue_growth_rate', 'net_profit_growth_rate'],
'dividend': ['dividend_yield', 'divident_payout_ratio']
}

技术指标参数

context.technical_params = {
'MA': [5, 20, 60],
'RSI': 14,
'MACD': (12, 26, 9),
'BOLL': 20
}

数据缓存设置

context.data_cache = {
'financials': {},
'prices': pd.DataFrame()
}

注册定时任务

run_monthly(update_financials, 15) # 每月15日更新财务数据
run_daily(update_technical, time='09:30') # 每日更新技术指标
def update_financials(context):
"""财务数据更新函数"""
for metric_type in context.financial_metrics:
try:

获取最新财务数据

df = get_fundamentals(
table='valuation' if metric_type == 'valuation' else 'income',
symbols=context.stock_pool,
start_date=context.previous_date - timedelta(days=90),
end_date=context.previous_date,
fields=','.join(context.financial_metrics[metric_type])
)

    # 数据清洗处理
    df = df.drop_duplicates(subset=['symbol', 'pub_date'], keep='last')
    df['report_date'] = pd.to_datetime(df['report_date'])
  
    # 更新缓存
    context.data_cache['financials'].update(
        {row['symbol']: row for _, row in df.iterrows()}
    )
    log.info("财务数据更新完成:{}".format(metric_type))  # 修改点
  
except Exception as e:
    log.error("财务数据获取失败:{}".format(str(e)))
    notify("财务数据异常:{}".format(metric_type))

def update_technical(context):
"""技术指标更新函数"""
try:

获取历史价格数据(包含3年历史)

price_data = history_bars(
context.current_stock,
count=252*3,
frequency='1d',
fields=['close', 'high', 'low', 'volume']
)

# 转换为DataFrame处理
df = pd.DataFrame(
    price_data,
    columns=['date', 'close', 'high', 'low', 'volume']
)
df['date'] = pd.to_datetime(df['date'])
df.set_index('date', inplace=True)

# 计算移动平均(保持与平台Python版本兼容)
for period in context.technical_params['MA']:
    df['MA{}'.format(period)] = df['close'].rolling(period).mean()

# 计算RSI
delta = df['close'].diff()
gain = delta.where(delta > 0, 0)
loss = -delta.where(delta < 0, 0)
avg_gain = gain.ewm(alpha=1.0/context.technical_params['RSI']).mean()
avg_loss = loss.ewm(alpha=1.0/context.technical_params['RSI']).mean()
rs = avg_gain / avg_loss
df['RSI'] = 100 - (100 / (1 + rs))

# 计算MACD
short_ema = df['close'].ewm(span=context.technical_params['MACD'][0]).mean()
long_ema = df['close'].ewm(span=context.technical_params['MACD'][1]).mean()
df['MACD'] = short_ema - long_ema
df['Signal'] = df['MACD'].ewm(span=context.technical_params['MACD'][2]).mean()
df['Histogram'] = df['MACD'] - df['Signal']

# 计算布林线
ma = df['close'].rolling(context.technical_params['BOLL']).mean()
std = df['close'].rolling(context.technical_params['BOLL']).std()
df['BOLL_Upper'] = ma + 2*std
df['BOLL_Middle'] = ma
df['BOLL_Lower'] = ma - 2*std

# 更新缓存
context.data_cache['prices'] = df
log.info("技术指标计算完成")

except Exception as e:
log.error("技术指标计算失败:{}".format(str(e)))
notify("技术指标数据异常")
def handle_bar(context, bar_dict):
try:

获取当前股票数据

current_stock = context.current_stock
current_date = get_datetime().strftime('%Y-%m-%d')

# 提取最新财务数据
financials = context.data_cache['financials'].get(current_stock, {})
latest_financial = {
    'pe_ratio': financials.get('pe_ratio', np.nan),
    'roe': financials.get('roe', np.nan),
    'dividend_yield': financials.get('dividend_yield', np.nan)
}

# 获取最新技术指标
tech_data = context.data_cache['prices'].iloc[-1] if not context.data_cache['prices'].empty else {}

# 生成信号
generate_signals(context, latest_financial, tech_data)

# 监控数据质量
if len(context.data_cache['prices']) < 60:
    log.warn("技术数据不足60日,建议检查数据源")
  
# 记录关键数据
record(
    PE=latest_financial['pe_ratio'],
    ROE=latest_financial['roe'],
    RSI=tech_data.get('RSI', np.nan),
    MACD=tech_data.get('MACD', np.nan)
)

except Exception as e:
log.error("主循环异常:{}".format(str(e)))
notify("策略执行异常")
def generate_signals(context, financials, tech):
"""信号生成引擎"""

估值信号

pe_signal = '低估' if financials['pe_ratio'] < 15 else '高估' if financials['pe_ratio'] > 30 else '中性'

动量信号

rsi_signal = '超买' if tech['RSI'] > 70 else '超卖' if tech['RSI'] < 30 else '中性'

MACD金叉死叉

macd_signal = '买入' if tech['MACD'] > tech['Signal'] else '卖出'

组合信号

if pe_signal == '低估' and rsi_signal == '超卖':
context.signal = '强烈买入'
elif pe_signal == '高估' and rsi_signal == '超买':
context.signal = '强烈卖出'
else:
context.signal = macd_signal

log.info("""
信号生成报告:
估值状态:{pe}(PE={pe_val:.1f})
动量状态:{rsi}(RSI={rsi_val:.1f})
MACD信号:{macd}
最终决策:{signal}
""".format(
pe=pe_signal,
pe_val=financials.get('pe_ratio', np.nan),
rsi=rsi_signal,
rsi_val=tech.get('RSI', np.nan),
macd=macd_signal,
signal=context.signal
))

评论