动量轮动策略简介:
动量轮动是一种基于股票价格的趋势跟随策略,即选择近期表现较好的股票进行买入,然后在趋势反转时卖出。这种策略的核心是利用市场的惯性,即股票的价格趋势会在一定时间内保持不变或持续上涨或下跌。
动量轮动策略可以帮助投资者识别股票价格的短期趋势,并在趋势反转时及时卖出。这种策略可以帮助投资者降低风险,避免错失投资机会。
计算动量
计算动量可以使用股票价格的历史数据。一般来说,动量指标可以使用以下公式进行计算:
动量 = (当前价格 / N天前的价格 - 1)* 100%
其中,N表示计算动量的时间窗口,可以根据投资者的需求进行调整。例如,如果N=20,那么动量指标就是当前价格与20天前的价格之间的比率,再减去1,最后乘以100%。如果动量指标为正数,表示股票价格在过去N天内上涨;如果动量指标为负数,表示股票价格在过去N天内下跌。
卡尔曼滤波策略详细介绍:
卡尔曼滤波是一种常用的信号处理技术,可以用来估计系统的状态。在股票交易中,卡尔曼滤波可以用来预测股票价格的走势。卡尔曼滤波的核心是利用系统的动态模型和观测数据来估计系统的状态。具体来说,卡尔曼滤波将系统的状态表示为一个向量,然后通过观测数据来更新这个向量。
卡尔曼滤波的计算过程包括两个步骤:预测和更新。
-
预测步骤: 预测步骤用于估计系统的状态。具体来说,预测步骤根据系统的动态模型和上一次的状态估计,预测出当前时刻的状态估计和状态协方差矩阵。预测步骤的公式如下:预测状态 = 系统动态模型 * 上一次状态估计 预测协方差矩阵 = 系统动态模型 * 上一次协方差矩阵 * 系统动态模型转置 + 过程噪声协方差矩阵
-
更新步骤: 更新步骤用于根据观测数据来更新状态估计和状态协方差矩阵。具体来说,更新步骤根据观测数据和预测的状态估计,计算出当前时刻的状态估计和状态协方差矩阵。更新步骤的公式如下:
卡尔曼增益 = 预测协方差矩阵 * 观测矩阵转置 * (观测矩阵 * 预测协方差矩阵 * 观测矩阵转置 + 观测噪声协方差矩阵)^(-1) 更新状态 = 预测状态 + 卡尔曼增益 * (观测数据 - 观测矩阵 * 预测状态) 更新协方差矩阵 = (单位矩阵 - 卡尔曼增益 * 观测矩阵)* 预测协方差矩阵
其中,系统动态模型、过程噪声协方差矩阵、观测矩阵和观测噪声协方差矩阵需要根据具体的股票交易场景进行定义和调整。
RSRS因子择时:
RSRS因子择时是一种基于股票价格的择时策略,即根据股票价格的变化趋势来进行买卖决策。具体来说,RSRS因子择时使用斜率作为指标,计算出斜率的标准化值,然后根据标准化值的大小来判断是否买入、卖出或持有股票。
RSRS因子的计算过程包括以下几个步骤:
-
计算股票价格的对数收益率:对数收益率可以用来度量股票价格的变化幅度。具体来说,对数收益率可以使用以下公式进行计算:
对数收益率 = ln(当前价格 / 前一个价格),其中,ln表示自然对数,当前价格是指当前时刻的股票价格,前一个价格是指前一个时刻的股票价格。
-
计算滑动窗口内的斜率:斜率可以用来度量股票价格的变化趋势。具体来说,斜率可以使用线性回归的方法来计算。假设滑动窗口的大小为N,那么可以使用以下公式来计算斜率:
斜率 = cov(X, Y) / var(X),其中,X表示时间序列,Y表示对数收益率序列,cov表示协方差,var表示方差。
-
计算标准化斜率:标准化斜率可以用来判断股票价格的变化趋势是否显著。具体来说,标准化斜率可以使用以下公式进行计算:
标准化斜率 = (斜率 - 均值) / 标准差, 其中,均值和标准差可以使用滑动窗口内的数据来计算。
-
根据标准化斜率的大小进行买卖决策:如果标准化斜率大于一定的阈值,表示股票价格的变化趋势显著,可以考虑买入股票;如果标准化斜率小于一定的阈值,表示股票价格的变化趋势不显著,可以考虑卖出股票;如果标准化斜率在阈值之间,可以考虑持有股票。阈值的大小可以根据具体的股票交易场景进行调整。
代码
# 初始化函数,全局只运行一次
def init(context):
# 设置基准收益:沪深300指数
set_benchmark('000300.SH')
# 打印日志
log.info('策略开始运行,初始化函数全局只运行一次')
# 设置股票每笔交易的手续费为万分之二(手续费在买卖成交后扣除,不包括税费,税费在卖出成交后扣除)
set_commission(PerShare(type='stock',cost=0.0002))
# 设置股票交易滑点0.5%,表示买入价为实际价格乘1.005,卖出价为实际价格乘0.995
set_slippage(PriceSlippage(0.005))
# 设置日级最大成交比例25%,分钟级最大成交比例50%
# 日频运行时,下单数量超过当天真实成交量25%,则全部不成交
# 分钟频运行时,下单数量超过当前分钟真实成交量50%,则全部不成交
set_volume_limit(0.25,0.5)
g.stock_pool = [
'510050.SH', # 上证50ETF
'159928.SZ', # 中证消费ETF
'510300.SH', # 沪深300ETF
'159949.SZ', # 创业板500
]
# 备选池:用流动性和市值更大的50ETF分别代替宽指ETF,500与300ETF保留一个
g.stock_num = 1 #买入评分最高的前stock_num只股票
g.momentum_day = 20 #最新动量参考最近momentum_day的
g.ref_stock = '000300.SH' #用ref_stock做择时计算的基础数据
g.N = 18 # 计算最新斜率slope,拟合度r2参考最近N天
g.M = 600 # 计算最新标准分zscore,rsrs_score参考最近M天
g.score_threshold = 0.7 # rsrs标准分指标阈值
g.slope_series = initial_slope_series()[:-1] # 除去回测第一天的slope,避免运行时重复加入
set_holding_stocks({'000001.SZ': 200,
'000002.SZ': 500,
'600519.SH': 700,
'300033.SZ': 800})
## 开盘时运行函数
def handle_bar(context, bar_dict):
# 获取时间
time = get_datetime().strftime('%Y-%m-%d %H:%M:%S')
# 打印时间
log.info('{} 盘中运行'.format(time))
my_trade(context)
check_lose(context)
# 卡尔曼滤波
def kalman_filter(observations,damping=1):
from pykalman import KalmanFilter
observation_covariance = damping
initial_value_guess = observations[0]
transition_matrix = 1
transition_covariance = 0.1
kf = KalmanFilter(
initial_state_mean=initial_value_guess,
initial_state_covariance=observation_covariance,
transition_matrices=transition_matrix,
observation_covariance=observation_covariance,
transition_covariance=transition_covariance,
transition_offsets=None)
pre, cov = kf.smooth(observations)
return pre
# 20日收益率动量拟合取斜率最大的
def get_rank(context,stock_pool):
import numpy as np
rank = []
for stock in g.stock_pool:
data = history(stock, bar_count = g.momentum_day, fre_step = '1d', fields = ['close'], skip_paused = True, fq = 'pre', is_panel = 1)
data.close = data.close/data.close[0]
pred_data = kalman_filter(np.array(data.close),0.15) # 调用卡尔曼滤波
score = np.polyfit(np.arange(20),pred_data[-20:],1)[0]
rank.append([stock, score])
rank.sort(key=lambda x: x[-1],reverse=True)
# log.info(rank)
return rank[0]
# 这里求R2的式子有点问题,但是这个效果更好,原因未找到!
def get_ols(x, y):
import numpy as np
slope, intercept = np.polyfit(x, y, 1)
r2 = 1 - (sum((y - (slope * x + intercept))**2) / ((len(y) - 1) * np.var(y, ddof=1)))
return (intercept, slope, r2)
def initial_slope_series():
data = get_price(g.ref_stock, bar_count = g.N + g.M, end_date = get_datetime().strftime('%Y%m%d'), fre_step = '1d', fields = ['high', 'low'], skip_paused = True, fq = 'pre', is_panel = 1)
return [get_ols(data.low[i:i+g.N], data.high[i:i+g.N])[1] for i in range(g.M)]
# 因子标准化
def get_zscore(slope_series):
import numpy as np
mean = np.mean(slope_series)
std = np.std(slope_series)
return (slope_series[-1] - mean) / std
# 只看RSRS因子值作为买入、持有和清仓依据,前版本还加入了移动均线的上行作为条件
def get_timing_signal(context,stock):
data = history(g.ref_stock, bar_count = 18, fre_step = '1d', fields = ['high', 'low','volume'], skip_paused = True, fq = 'pre', is_panel = 1)
intercept, slope, r2 = get_ols(data.low, data.high)
g.slope_series.append(slope)
rsrs_score = get_zscore(g.slope_series[-g.M:]) * r2
log.info('rsrs_score {:.3f}'.format(rsrs_score))
if rsrs_score > g.score_threshold: return "BUY"
elif rsrs_score < -g.score_threshold: return "SELL"
else: return "KEEP"
#4-1 交易模块-自定义下单
#报单成功返回报单(不代表一定会成交),否则返回None,应用于
def order_target_value_(security, value):
if value == 0:
log.error("Selling out %s" % (security))
else:
log.error("Order %s to value %f" % (security, value))
# 如果股票停牌,创建报单会失败,order_target_value 返回None
# 如果股票涨跌停,创建报单会成功,order_target_value 返回Order,但是报单会取消
# 部成部撤的报单,聚宽状态是已撤,此时成交量>0,可通过成交量判断是否有成交
return order_target_value(security, value)
#4-2 交易模块-开仓
#买入指定价值的证券,报单成功并成交(包括全部成交或部分成交,此时成交量大于0)返回True,报单失败或者报单成功但被取消(此时成交量等于0),返回False
def open_position(security, value):
o_id = order_target_value_(security, value)
order = get_order(o_id)
if order != None and order.filled_amound > 0:
return True
return False
#4-3 交易模块-平仓
#卖出指定持仓,报单成功并全部成交返回True,报单失败或者报单成功但被取消(此时成交量等于0),或者报单非全部成交,返回False
def close_position(position):
security = position.symbol
o_id = order_target_value_(security, 0) # 可能会因停牌失败
order = get_order(o_id)
if order != None:
if order.status == OrderStatus.held and order.filled_amount == order.amount:
return True
return False
def adjust_position(context, buy_stocks):
for stock in context.portfolio.positions:
if stock not in buy_stocks:
log.info("[%s]已不在应买入列表中" % (stock))
position = context.portfolio.positions[stock]
close_position(position)
else:
log.info("[%s]已经持有无需重复买入" % (stock))
position_count = len(context.portfolio.positions)
if g.stock_num > position_count:
value = context.portfolio.available_cash / (g.stock_num - position_count)
for stock in buy_stocks:
if context.portfolio.positions[stock].amount == 0:
if open_position(stock, value):
if len(context.portfolio.positions) == g.stock_num:
break
# 交易主函数,先确定ETF最强的是谁,然后再根据择时信号判断是否需要切换或者清仓
def my_trade(context):
check_out_list = get_rank(context,g.stock_pool)
timing_signal = get_timing_signal(context,g.ref_stock)
print('今日自选及择时信号:{} {}'.format(check_out_list,timing_signal))
if timing_signal == 'SELL':
for stock in context.portfolio.positions:
position = context.portfolio.positions[stock]
close_position(position)
elif timing_signal == 'BUY' or timing_signal == 'KEEP':
adjust_position(context, check_out_list)
else: pass
def check_lose(context):
for stock in context.portfolio.positions:
securities=position
cost=context.portfolio.positions[stock].cost_basis
price=context.portfolio.positions[stock].last_price
ret=100*(price/cost-1)
value=context.portfolio.positions[stock].market_value
amount=context.portfolio.positions[stock].amount
if ret <=-90:
order_target_value(stock, 0)
print("!!!!!!触发止损信号: 标的={},标的价值={},浮动盈亏={}% !!!!!!"
.format(securities,format(value,'.2f'),format(ret,'.2f')))