求写一个简单的问财语句作为选股条件的买入和卖出的策略,本人炒股资金有点少,这个思路适合小资金,求大佬能帮忙写一个
求写一个简单的问财语句作为选股条件的买入和卖出的策略,本人炒股资金有点少,这个思路适合小资金,求大佬能帮忙写一个
from datetime import datetime, time
import pandas as pd
from mindgo_api import *
import numpy as np
import talib
def init(context):
context.holding_period = 2 # 股票持有天数(2天)
context.information = {} # 存储股票额外信息的字典
context.stock_data = {} # 存储股票数据的字典
context.limit_up_stocks = set() # 用集合存储涨停股代码
context.sold_stocks = set() # 已卖出股票集合
set_commission(PerShare(type='stock', cost=0.00022, min_trade_cost=5)) # 设置交易佣金万2.2不免5
set_slippage(PriceSlippage(0.002)) # 设置交易滑点0.2%
def open_auction(context, bar_dict):
try:
sell_previous_stocks(context, bar_dict)
# 筛选并买入符合条件的股票
query_wencai = get_stock_wencai()
result = query_iwencai(query=query_wencai, domain='股票', df=True)
if not result.empty:
# 获取集合竞价数据并筛选符合条件的股票
auction_data = get_call_auction(bar_dict, result['股票代码'].tolist()) # 传入bar_dict
buy_stocks = [stock for stock in auction_data
if auction_data[stock]['condition1'] and auction_data[stock]['condition2']]
# 买入符合条件的股票(满仓)
buy_new_stocks(context, bar_dict, buy_stocks) # 传入bar_dict
except Exception as e:
print(f"集合竞价运行出错: {str(e)[:30]}")
def get_stock_wencai():
y_date = get_previous_trading_date(get_datetime()).strftime('%Y%m%d')
query = (
f"{y_date}的连续涨停天数>=4,"
"非st,非创业板,非科创板,非北证a股"
)
return query
def get_call_auction(bar_dict, stocks_list):
today_dt = get_datetime()
result = {}
start_time = today_dt.replace(hour=9, minute=15, second=0)
end_time = today_dt.replace(hour=9, minute=26, second=0)
for stock in stocks_list:
try:
# 获取今日涨跌停价
high_limit = bar_dict[stock].high_limit
low_limit = bar_dict[stock].low_limit
# 获取集合竞价tick数据 - 使用trade_date作为时间字段
ticks = get_tick(stock, start_time, end_time, ['current', 'trade_date'])
if ticks.empty:
print(f"股票{stock}无集合竞价数据")
continue
# 将trade_date转换为datetime并设置为索引
ticks['datetime'] = pd.to_datetime(ticks['trade_date'])
ticks.set_index('datetime', inplace=True)
# 调试输出数据样例
print(f"{stock} 获取到{len(ticks)}条tick数据,时间范围:{ticks.index.min()} - {ticks.index.max()}")
# 定义时间筛选函数
def time_in_range(t, start, end):
return (t.time() >= start) and (t.time() <= end)
# 条件1:9:15-9:20最低价 <= 跌停价*1.02
mask1 = [time_in_range(t, time(9,15), time(9,20)) for t in ticks.index]
min_price_15_20 = ticks[mask1]['current'].min()
condition1 = min_price_15_20 <= low_limit * 1.02 # 跌停价*1.02
# 条件2:9:18-9:26最高价 >= 跌停价*1.05
mask2 = [time_in_range(t, time(9,18), time(9,26)) for t in ticks.index]
max_price_18_25 = ticks[mask2]['current'].max()
condition2 = max_price_18_25 >= low_limit * 1.05 # 跌停价*1.05
# 存储结果
result[stock] = {
'condition1': condition1,
'condition2': condition2,
'min_price': min_price_15_20,
'max_price': max_price_18_25,
'high_limit': high_limit,
'low_limit': low_limit
}
print(f"{stock} 条件1:{condition1} 条件2:{condition2} "
f"最低价:{min_price_15_20:.2f} 最高价:{max_price_18_25:.2f} "
f"跌停价:{low_limit:.2f} 涨停价:{high_limit:.2f}")
except Exception as e:
print(f"处理股票{stock}出错:{str(e)}")
continue
return result
def buy_new_stocks(context, bar_dict, stocks_list):
if not stocks_list:
print("没有符合条件的股票可买入")
return
available_cash = context.portfolio.total_value
per_stock_amount = available_cash / len(stocks_list) if len(stocks_list) > 0 else 0
for stock in stocks_list:
try:
# 每个股票买入分配金额
order_value(stock, per_stock_amount)
print(f"买入 {stock} 金额: {per_stock_amount:.2f}元")
# 记录买入信息(使用bar_dict获取开盘价)
info = Information()
info.buy_date = get_datetime().strftime('%Y%m%d')
info.buy_price = bar_dict[stock].open
context.information[stock] = info
except Exception as e:
print(f"买入股票{stock}失败: {str(e)}")
def sell_previous_stocks(context, bar_dict):
today = get_datetime()
previous_date = get_previous_trading_date(today).strftime('%Y%m%d')
for stock in list(context.portfolio.positions):
info = context.information.get(stock)
if info and info.buy_date == previous_date: # 如果是两天前买入的股票
try:
order_target(stock, 0) # 全部卖出
print(f"卖出股票 {stock},买入日期: {info.buy_date}")
context.sold_stocks.add(stock)
except Exception as e:
print(f"卖出股票{stock}失败: {str(e)}")
class Information:
def init(self):
self.buy_date = "" # 买入日期
self.buy_price = 0 # 买入价格
from datetime import datetime, time
import pandas as pd
from mindgo_api import *
import numpy as np
import talib
def init(context):
context.holding_period = 2 # 股票持有天数(2天)
context.information = {} # 存储股票额外信息的字典
context.stock_data = {} # 存储股票数据的字典
context.limit_up_stocks = set() # 用集合存储涨停股代码
context.sold_stocks = set() # 已卖出股票集合
set_commission(PerShare(type='stock', cost=0.00022, min_trade_cost=5)) # 设置交易佣金万2.2不免5
set_slippage(PriceSlippage(0.002)) # 设置交易滑点0.2%
def open_auction(context, bar_dict):
try:
sell_previous_stocks(context, bar_dict)
# 筛选并买入符合条件的股票
query_wencai = get_stock_wencai()
result = query_iwencai(query=query_wencai, domain='股票', df=True)
if not result.empty:
# 获取集合竞价数据并筛选符合条件的股票
auction_data = get_call_auction(bar_dict, result['股票代码'].tolist()) # 传入bar_dict
buy_stocks = [stock for stock in auction_data
if auction_data[stock]['condition1'] and auction_data[stock]['condition2']]
# 买入符合条件的股票(满仓)
buy_new_stocks(context, bar_dict, buy_stocks) # 传入bar_dict
except Exception as e:
print(f"集合竞价运行出错: {str(e)[:30]}")
def get_stock_wencai():
y_date = get_previous_trading_date(get_datetime()).strftime('%Y%m%d')
query = (
f"{y_date}的连续涨停天数>=4,"
"非st,非创业板,非科创板,非北证a股"
)
return query
def get_call_auction(bar_dict, stocks_list):
today_dt = get_datetime()
result = {}
start_time = today_dt.replace(hour=9, minute=15, second=0)
end_time = today_dt.replace(hour=9, minute=26, second=0)
for stock in stocks_list:
try:
# 获取今日涨跌停价
high_limit = bar_dict[stock].high_limit
low_limit = bar_dict[stock].low_limit
# 获取集合竞价tick数据 - 使用trade_date作为时间字段
ticks = get_tick(stock, start_time, end_time, ['current', 'trade_date'])
if ticks.empty:
print(f"股票{stock}无集合竞价数据")
continue
# 将trade_date转换为datetime并设置为索引
ticks['datetime'] = pd.to_datetime(ticks['trade_date'])
ticks.set_index('datetime', inplace=True)
# 调试输出数据样例
print(f"{stock} 获取到{len(ticks)}条tick数据,时间范围:{ticks.index.min()} - {ticks.index.max()}")
# 定义时间筛选函数
def time_in_range(t, start, end):
return (t.time() >= start) and (t.time() <= end)
# 条件1:9:15-9:20最低价 <= 跌停价*1.02
mask1 = [time_in_range(t, time(9,15), time(9,20)) for t in ticks.index]
min_price_15_20 = ticks[mask1]['current'].min()
condition1 = min_price_15_20 <= low_limit * 1.02 # 跌停价*1.02
# 条件2:9:18-9:26最高价 >= 跌停价*1.05
mask2 = [time_in_range(t, time(9,18), time(9,26)) for t in ticks.index]
max_price_18_25 = ticks[mask2]['current'].max()
condition2 = max_price_18_25 >= low_limit * 1.05 # 跌停价*1.05
# 存储结果
result[stock] = {
'condition1': condition1,
'condition2': condition2,
'min_price': min_price_15_20,
'max_price': max_price_18_25,
'high_limit': high_limit,
'low_limit': low_limit
}
print(f"{stock} 条件1:{condition1} 条件2:{condition2} "
f"最低价:{min_price_15_20:.2f} 最高价:{max_price_18_25:.2f} "
f"跌停价:{low_limit:.2f} 涨停价:{high_limit:.2f}")
except Exception as e:
print(f"处理股票{stock}出错:{str(e)}")
continue
return result
def buy_new_stocks(context, bar_dict, stocks_list):
if not stocks_list:
print("没有符合条件的股票可买入")
return
available_cash = context.portfolio.total_value
per_stock_amount = available_cash / len(stocks_list) if len(stocks_list) > 0 else 0
for stock in stocks_list:
try:
# 每个股票买入分配金额
order_value(stock, per_stock_amount)
print(f"买入 {stock} 金额: {per_stock_amount:.2f}元")
# 记录买入信息(使用bar_dict获取开盘价)
info = Information()
info.buy_date = get_datetime().strftime('%Y%m%d')
info.buy_price = bar_dict[stock].open
context.information[stock] = info
except Exception as e:
print(f"买入股票{stock}失败: {str(e)}")
def sell_previous_stocks(context, bar_dict):
today = get_datetime()
previous_date = get_previous_trading_date(today).strftime('%Y%m%d')
for stock in list(context.portfolio.positions):
info = context.information.get(stock)
if info and info.buy_date == previous_date: # 如果是两天前买入的股票
try:
order_target(stock, 0) # 全部卖出
print(f"卖出股票 {stock},买入日期: {info.buy_date}")
context.sold_stocks.add(stock)
except Exception as e:
print(f"卖出股票{stock}失败: {str(e)}")
class Information:
def init(self):
self.buy_date = "" # 买入日期
self.buy_price = 0 # 买入价格
from datetime import datetime, time
import pandas as pd
from mindgo_api import *
import numpy as np
import talib
def init(context):
context.holding_period = 2 # 股票持有天数(2天)
context.information = {} # 存储股票额外信息的字典
context.stock_data = {} # 存储股票数据的字典
context.limit_up_stocks = set() # 用集合存储涨停股代码
context.sold_stocks = set() # 已卖出股票集合
set_commission(PerShare(type='stock', cost=0.00022, min_trade_cost=5)) # 设置交易佣金万2.2不免5
set_slippage(PriceSlippage(0.002)) # 设置交易滑点0.2%
def open_auction(context, bar_dict):
try:
sell_previous_stocks(context, bar_dict)
# 筛选并买入符合条件的股票
query_wencai = get_stock_wencai()
result = query_iwencai(query=query_wencai, domain='股票', df=True)
if not result.empty:
# 获取集合竞价数据并筛选符合条件的股票
auction_data = get_call_auction(bar_dict, result['股票代码'].tolist()) # 传入bar_dict
buy_stocks = [stock for stock in auction_data
if auction_data[stock]['condition1'] and auction_data[stock]['condition2']]
# 买入符合条件的股票(满仓)
buy_new_stocks(context, bar_dict, buy_stocks) # 传入bar_dict
except Exception as e:
print(f"集合竞价运行出错: {str(e)[:30]}")
def get_stock_wencai():
y_date = get_previous_trading_date(get_datetime()).strftime('%Y%m%d')
query = (
f"{y_date}的连续涨停天数>=4,"
"非st,非创业板,非科创板,非北证a股"
)
return query
def get_call_auction(bar_dict, stocks_list):
today_dt = get_datetime()
result = {}
start_time = today_dt.replace(hour=9, minute=15, second=0)
end_time = today_dt.replace(hour=9, minute=26, second=0)
for stock in stocks_list:
try:
# 获取今日涨跌停价
high_limit = bar_dict[stock].high_limit
low_limit = bar_dict[stock].low_limit
# 获取集合竞价tick数据 - 使用trade_date作为时间字段
ticks = get_tick(stock, start_time, end_time, ['current', 'trade_date'])
if ticks.empty:
print(f"股票{stock}无集合竞价数据")
continue
# 将trade_date转换为datetime并设置为索引
ticks['datetime'] = pd.to_datetime(ticks['trade_date'])
ticks.set_index('datetime', inplace=True)
# 调试输出数据样例
print(f"{stock} 获取到{len(ticks)}条tick数据,时间范围:{ticks.index.min()} - {ticks.index.max()}")
# 定义时间筛选函数
def time_in_range(t, start, end):
return (t.time() >= start) and (t.time() <= end)
# 条件1:9:15-9:20最低价 <= 跌停价*1.02
mask1 = [time_in_range(t, time(9,15), time(9,20)) for t in ticks.index]
min_price_15_20 = ticks[mask1]['current'].min()
condition1 = min_price_15_20 <= low_limit * 1.02 # 跌停价*1.02
# 条件2:9:18-9:26最高价 >= 跌停价*1.05
mask2 = [time_in_range(t, time(9,18), time(9,26)) for t in ticks.index]
max_price_18_25 = ticks[mask2]['current'].max()
condition2 = max_price_18_25 >= low_limit * 1.05 # 跌停价*1.05
# 存储结果
result[stock] = {
'condition1': condition1,
'condition2': condition2,
'min_price': min_price_15_20,
'max_price': max_price_18_25,
'high_limit': high_limit,
'low_limit': low_limit
}
print(f"{stock} 条件1:{condition1} 条件2:{condition2} "
f"最低价:{min_price_15_20:.2f} 最高价:{max_price_18_25:.2f} "
f"跌停价:{low_limit:.2f} 涨停价:{high_limit:.2f}")
except Exception as e:
print(f"处理股票{stock}出错:{str(e)}")
continue
return result
def buy_new_stocks(context, bar_dict, stocks_list):
if not stocks_list:
print("没有符合条件的股票可买入")
return
available_cash = context.portfolio.total_value
per_stock_amount = available_cash / len(stocks_list) if len(stocks_list) > 0 else 0
for stock in stocks_list:
try:
# 每个股票买入分配金额
order_value(stock, per_stock_amount)
print(f"买入 {stock} 金额: {per_stock_amount:.2f}元")
# 记录买入信息(使用bar_dict获取开盘价)
info = Information()
info.buy_date = get_datetime().strftime('%Y%m%d')
info.buy_price = bar_dict[stock].open
context.information[stock] = info
except Exception as e:
print(f"买入股票{stock}失败: {str(e)}")
def sell_previous_stocks(context, bar_dict):
today = get_datetime()
previous_date = get_previous_trading_date(today).strftime('%Y%m%d')
for stock in list(context.portfolio.positions):
info = context.information.get(stock)
if info and info.buy_date == previous_date: # 如果是两天前买入的股票
try:
order_target(stock, 0) # 全部卖出
print(f"卖出股票 {stock},买入日期: {info.buy_date}")
context.sold_stocks.add(stock)
except Exception as e:
print(f"卖出股票{stock}失败: {str(e)}")
class Information:
def init(self):
self.buy_date = "" # 买入日期
self.buy_price = 0 # 买入价格
from datetime import datetime, time
import pandas as pd
from mindgo_api import *
import numpy as np
import talib
def init(context):
context.holding_period = 2 # 股票持有天数(2天)
context.information = {} # 存储股票额外信息的字典
context.stock_data = {} # 存储股票数据的字典
context.limit_up_stocks = set() # 用集合存储涨停股代码
context.sold_stocks = set() # 已卖出股票集合
set_commission(PerShare(type='stock', cost=0.00022, min_trade_cost=5)) # 设置交易佣金万2.2不免5
set_slippage(PriceSlippage(0.002)) # 设置交易滑点0.2%
def open_auction(context, bar_dict):
try:
sell_previous_stocks(context, bar_dict)
# 筛选并买入符合条件的股票
query_wencai = get_stock_wencai()
result = query_iwencai(query=query_wencai, domain='股票', df=True)
if not result.empty:
# 获取集合竞价数据并筛选符合条件的股票
auction_data = get_call_auction(bar_dict, result['股票代码'].tolist()) # 传入bar_dict
buy_stocks = [stock for stock in auction_data
if auction_data[stock]['condition1'] and auction_data[stock]['condition2']]
# 买入符合条件的股票(满仓)
buy_new_stocks(context, bar_dict, buy_stocks) # 传入bar_dict
except Exception as e:
print(f"集合竞价运行出错: {str(e)[:30]}")
def get_stock_wencai():
y_date = get_previous_trading_date(get_datetime()).strftime('%Y%m%d')
query = (
f"{y_date}的连续涨停天数>=4,"
"非st,非创业板,非科创板,非北证a股"
)
return query
def get_call_auction(bar_dict, stocks_list):
today_dt = get_datetime()
result = {}
start_time = today_dt.replace(hour=9, minute=15, second=0)
end_time = today_dt.replace(hour=9, minute=26, second=0)
for stock in stocks_list:
try:
# 获取今日涨跌停价
high_limit = bar_dict[stock].high_limit
low_limit = bar_dict[stock].low_limit
# 获取集合竞价tick数据 - 使用trade_date作为时间字段
ticks = get_tick(stock, start_time, end_time, ['current', 'trade_date'])
if ticks.empty:
print(f"股票{stock}无集合竞价数据")
continue
# 将trade_date转换为datetime并设置为索引
ticks['datetime'] = pd.to_datetime(ticks['trade_date'])
ticks.set_index('datetime', inplace=True)
# 调试输出数据样例
print(f"{stock} 获取到{len(ticks)}条tick数据,时间范围:{ticks.index.min()} - {ticks.index.max()}")
# 定义时间筛选函数
def time_in_range(t, start, end):
return (t.time() >= start) and (t.time() <= end)
# 条件1:9:15-9:20最低价 <= 跌停价*1.02
mask1 = [time_in_range(t, time(9,15), time(9,20)) for t in ticks.index]
min_price_15_20 = ticks[mask1]['current'].min()
condition1 = min_price_15_20 <= low_limit * 1.02 # 跌停价*1.02
# 条件2:9:18-9:26最高价 >= 跌停价*1.05
mask2 = [time_in_range(t, time(9,18), time(9,26)) for t in ticks.index]
max_price_18_25 = ticks[mask2]['current'].max()
condition2 = max_price_18_25 >= low_limit * 1.05 # 跌停价*1.05
# 存储结果
result[stock] = {
'condition1': condition1,
'condition2': condition2,
'min_price': min_price_15_20,
'max_price': max_price_18_25,
'high_limit': high_limit,
'low_limit': low_limit
}
print(f"{stock} 条件1:{condition1} 条件2:{condition2} "
f"最低价:{min_price_15_20:.2f} 最高价:{max_price_18_25:.2f} "
f"跌停价:{low_limit:.2f} 涨停价:{high_limit:.2f}")
except Exception as e:
print(f"处理股票{stock}出错:{str(e)}")
continue
return result
def buy_new_stocks(context, bar_dict, stocks_list):
if not stocks_list:
print("没有符合条件的股票可买入")
return
available_cash = context.portfolio.total_value
per_stock_amount = available_cash / len(stocks_list) if len(stocks_list) > 0 else 0
for stock in stocks_list:
try:
# 每个股票买入分配金额
order_value(stock, per_stock_amount)
print(f"买入 {stock} 金额: {per_stock_amount:.2f}元")
# 记录买入信息(使用bar_dict获取开盘价)
info = Information()
info.buy_date = get_datetime().strftime('%Y%m%d')
info.buy_price = bar_dict[stock].open
context.information[stock] = info
except Exception as e:
print(f"买入股票{stock}失败: {str(e)}")
def sell_previous_stocks(context, bar_dict):
today = get_datetime()
previous_date = get_previous_trading_date(today).strftime('%Y%m%d')
for stock in list(context.portfolio.positions):
info = context.information.get(stock)
if info and info.buy_date == previous_date: # 如果是两天前买入的股票
try:
order_target(stock, 0) # 全部卖出
print(f"卖出股票 {stock},买入日期: {info.buy_date}")
context.sold_stocks.add(stock)
except Exception as e:
print(f"卖出股票{stock}失败: {str(e)}")
class Information:
def init(self):
self.buy_date = "" # 买入日期
self.buy_price = 0 # 买入价格
from datetime import datetime, time
import pandas as pd
from mindgo_api import *
import numpy as np
import talib
def init(context):
context.holding_period = 2 # 股票持有天数(2天)
context.information = {} # 存储股票额外信息的字典
context.stock_data = {} # 存储股票数据的字典
context.limit_up_stocks = set() # 用集合存储涨停股代码
context.sold_stocks = set() # 已卖出股票集合
set_commission(PerShare(type='stock', cost=0.00022, min_trade_cost=5)) # 设置交易佣金万2.2不免5
set_slippage(PriceSlippage(0.002)) # 设置交易滑点0.2%
def open_auction(context, bar_dict):
try:
sell_previous_stocks(context, bar_dict)
# 筛选并买入符合条件的股票
query_wencai = get_stock_wencai()
result = query_iwencai(query=query_wencai, domain='股票', df=True)
if not result.empty:
# 获取集合竞价数据并筛选符合条件的股票
auction_data = get_call_auction(bar_dict, result['股票代码'].tolist()) # 传入bar_dict
buy_stocks = [stock for stock in auction_data
if auction_data[stock]['condition1'] and auction_data[stock]['condition2']]
# 买入符合条件的股票(满仓)
buy_new_stocks(context, bar_dict, buy_stocks) # 传入bar_dict
except Exception as e:
print(f"集合竞价运行出错: {str(e)[:30]}")
def get_stock_wencai():
y_date = get_previous_trading_date(get_datetime()).strftime('%Y%m%d')
query = (
f"{y_date}的连续涨停天数>=4,"
"非st,非创业板,非科创板,非北证a股"
)
return query
def get_call_auction(bar_dict, stocks_list):
today_dt = get_datetime()
result = {}
start_time = today_dt.replace(hour=9, minute=15, second=0)
end_time = today_dt.replace(hour=9, minute=26, second=0)
for stock in stocks_list:
try:
# 获取今日涨跌停价
high_limit = bar_dict[stock].high_limit
low_limit = bar_dict[stock].low_limit
# 获取集合竞价tick数据 - 使用trade_date作为时间字段
ticks = get_tick(stock, start_time, end_time, ['current', 'trade_date'])
if ticks.empty:
print(f"股票{stock}无集合竞价数据")
continue
# 将trade_date转换为datetime并设置为索引
ticks['datetime'] = pd.to_datetime(ticks['trade_date'])
ticks.set_index('datetime', inplace=True)
# 调试输出数据样例
print(f"{stock} 获取到{len(ticks)}条tick数据,时间范围:{ticks.index.min()} - {ticks.index.max()}")
# 定义时间筛选函数
def time_in_range(t, start, end):
return (t.time() >= start) and (t.time() <= end)
# 条件1:9:15-9:20最低价 <= 跌停价*1.02
mask1 = [time_in_range(t, time(9,15), time(9,20)) for t in ticks.index]
min_price_15_20 = ticks[mask1]['current'].min()
condition1 = min_price_15_20 <= low_limit * 1.02 # 跌停价*1.02
# 条件2:9:18-9:26最高价 >= 跌停价*1.05
mask2 = [time_in_range(t, time(9,18), time(9,26)) for t in ticks.index]
max_price_18_25 = ticks[mask2]['current'].max()
condition2 = max_price_18_25 >= low_limit * 1.05 # 跌停价*1.05
# 存储结果
result[stock] = {
'condition1': condition1,
'condition2': condition2,
'min_price': min_price_15_20,
'max_price': max_price_18_25,
'high_limit': high_limit,
'low_limit': low_limit
}
print(f"{stock} 条件1:{condition1} 条件2:{condition2} "
f"最低价:{min_price_15_20:.2f} 最高价:{max_price_18_25:.2f} "
f"跌停价:{low_limit:.2f} 涨停价:{high_limit:.2f}")
except Exception as e:
print(f"处理股票{stock}出错:{str(e)}")
continue
return result
def buy_new_stocks(context, bar_dict, stocks_list):
if not stocks_list:
print("没有符合条件的股票可买入")
return
available_cash = context.portfolio.total_value
per_stock_amount = available_cash / len(stocks_list) if len(stocks_list) > 0 else 0
for stock in stocks_list:
try:
# 每个股票买入分配金额
order_value(stock, per_stock_amount)
print(f"买入 {stock} 金额: {per_stock_amount:.2f}元")
# 记录买入信息(使用bar_dict获取开盘价)
info = Information()
info.buy_date = get_datetime().strftime('%Y%m%d')
info.buy_price = bar_dict[stock].open
context.information[stock] = info
except Exception as e:
print(f"买入股票{stock}失败: {str(e)}")
def sell_previous_stocks(context, bar_dict):
today = get_datetime()
previous_date = get_previous_trading_date(today).strftime('%Y%m%d')
for stock in list(context.portfolio.positions):
info = context.information.get(stock)
if info and info.buy_date == previous_date: # 如果是两天前买入的股票
try:
order_target(stock, 0) # 全部卖出
print(f"卖出股票 {stock},买入日期: {info.buy_date}")
context.sold_stocks.add(stock)
except Exception as e:
print(f"卖出股票{stock}失败: {str(e)}")
class Information:
def init(self):
self.buy_date = "" # 买入日期
self.buy_price = 0 # 买入价格
from datetime import datetime, time
import pandas as pd
from mindgo_api import *
import numpy as np
import talib
def init(context):
context.holding_period = 2 # 股票持有天数(2天)
context.information = {} # 存储股票额外信息的字典
context.stock_data = {} # 存储股票数据的字典
context.limit_up_stocks = set() # 用集合存储涨停股代码
context.sold_stocks = set() # 已卖出股票集合
set_commission(PerShare(type='stock', cost=0.00022, min_trade_cost=5)) # 设置交易佣金万2.2不免5
set_slippage(PriceSlippage(0.002)) # 设置交易滑点0.2%
def open_auction(context, bar_dict):
try:
sell_previous_stocks(context, bar_dict)
# 筛选并买入符合条件的股票
query_wencai = get_stock_wencai()
result = query_iwencai(query=query_wencai, domain='股票', df=True)
if not result.empty:
# 获取集合竞价数据并筛选符合条件的股票
auction_data = get_call_auction(bar_dict, result['股票代码'].tolist()) # 传入bar_dict
buy_stocks = [stock for stock in auction_data
if auction_data[stock]['condition1'] and auction_data[stock]['condition2']]
# 买入符合条件的股票(满仓)
buy_new_stocks(context, bar_dict, buy_stocks) # 传入bar_dict
except Exception as e:
print(f"集合竞价运行出错: {str(e)[:30]}")
def get_stock_wencai():
y_date = get_previous_trading_date(get_datetime()).strftime('%Y%m%d')
query = (
f"{y_date}的连续涨停天数>=4,"
"非st,非创业板,非科创板,非北证a股"
)
return query
def get_call_auction(bar_dict, stocks_list):
today_dt = get_datetime()
result = {}
start_time = today_dt.replace(hour=9, minute=15, second=0)
end_time = today_dt.replace(hour=9, minute=26, second=0)
for stock in stocks_list:
try:
# 获取今日涨跌停价
high_limit = bar_dict[stock].high_limit
low_limit = bar_dict[stock].low_limit
# 获取集合竞价tick数据 - 使用trade_date作为时间字段
ticks = get_tick(stock, start_time, end_time, ['current', 'trade_date'])
if ticks.empty:
print(f"股票{stock}无集合竞价数据")
continue
# 将trade_date转换为datetime并设置为索引
ticks['datetime'] = pd.to_datetime(ticks['trade_date'])
ticks.set_index('datetime', inplace=True)
# 调试输出数据样例
print(f"{stock} 获取到{len(ticks)}条tick数据,时间范围:{ticks.index.min()} - {ticks.index.max()}")
# 定义时间筛选函数
def time_in_range(t, start, end):
return (t.time() >= start) and (t.time() <= end)
# 条件1:9:15-9:20最低价 <= 跌停价*1.02
mask1 = [time_in_range(t, time(9,15), time(9,20)) for t in ticks.index]
min_price_15_20 = ticks[mask1]['current'].min()
condition1 = min_price_15_20 <= low_limit * 1.02 # 跌停价*1.02
# 条件2:9:18-9:26最高价 >= 跌停价*1.05
mask2 = [time_in_range(t, time(9,18), time(9,26)) for t in ticks.index]
max_price_18_25 = ticks[mask2]['current'].max()
condition2 = max_price_18_25 >= low_limit * 1.05 # 跌停价*1.05
# 存储结果
result[stock] = {
'condition1': condition1,
'condition2': condition2,
'min_price': min_price_15_20,
'max_price': max_price_18_25,
'high_limit': high_limit,
'low_limit': low_limit
}
print(f"{stock} 条件1:{condition1} 条件2:{condition2} "
f"最低价:{min_price_15_20:.2f} 最高价:{max_price_18_25:.2f} "
f"跌停价:{low_limit:.2f} 涨停价:{high_limit:.2f}")
except Exception as e:
print(f"处理股票{stock}出错:{str(e)}")
continue
return result
def buy_new_stocks(context, bar_dict, stocks_list):
if not stocks_list:
print("没有符合条件的股票可买入")
return
available_cash = context.portfolio.total_value
per_stock_amount = available_cash / len(stocks_list) if len(stocks_list) > 0 else 0
for stock in stocks_list:
try:
# 每个股票买入分配金额
order_value(stock, per_stock_amount)
print(f"买入 {stock} 金额: {per_stock_amount:.2f}元")
# 记录买入信息(使用bar_dict获取开盘价)
info = Information()
info.buy_date = get_datetime().strftime('%Y%m%d')
info.buy_price = bar_dict[stock].open
context.information[stock] = info
except Exception as e:
print(f"买入股票{stock}失败: {str(e)}")
def sell_previous_stocks(context, bar_dict):
today = get_datetime()
previous_date = get_previous_trading_date(today).strftime('%Y%m%d')
for stock in list(context.portfolio.positions):
info = context.information.get(stock)
if info and info.buy_date == previous_date: # 如果是两天前买入的股票
try:
order_target(stock, 0) # 全部卖出
print(f"卖出股票 {stock},买入日期: {info.buy_date}")
context.sold_stocks.add(stock)
except Exception as e:
print(f"卖出股票{stock}失败: {str(e)}")
class Information:
def init(self):
self.buy_date = "" # 买入日期
self.buy_price = 0 # 买入价格
from datetime import datetime, time
import pandas as pd
from mindgo_api import *
import numpy as np
import talib
def init(context):
context.holding_period = 2 # 股票持有天数(2天)
context.information = {} # 存储股票额外信息的字典
context.stock_data = {} # 存储股票数据的字典
context.limit_up_stocks = set() # 用集合存储涨停股代码
context.sold_stocks = set() # 已卖出股票集合
set_commission(PerShare(type='stock', cost=0.00022, min_trade_cost=5)) # 设置交易佣金万2.2不免5
set_slippage(PriceSlippage(0.002)) # 设置交易滑点0.2%
def open_auction(context, bar_dict):
try:
sell_previous_stocks(context, bar_dict)
# 筛选并买入符合条件的股票
query_wencai = get_stock_wencai()
result = query_iwencai(query=query_wencai, domain='股票', df=True)
if not result.empty:
# 获取集合竞价数据并筛选符合条件的股票
auction_data = get_call_auction(bar_dict, result['股票代码'].tolist()) # 传入bar_dict
buy_stocks = [stock for stock in auction_data
if auction_data[stock]['condition1'] and auction_data[stock]['condition2']]
# 买入符合条件的股票(满仓)
buy_new_stocks(context, bar_dict, buy_stocks) # 传入bar_dict
except Exception as e:
print(f"集合竞价运行出错: {str(e)[:30]}")
def get_stock_wencai():
y_date = get_previous_trading_date(get_datetime()).strftime('%Y%m%d')
query = (
f"{y_date}的连续涨停天数>=4,"
"非st,非创业板,非科创板,非北证a股"
)
return query
def get_call_auction(bar_dict, stocks_list):
today_dt = get_datetime()
result = {}
start_time = today_dt.replace(hour=9, minute=15, second=0)
end_time = today_dt.replace(hour=9, minute=26, second=0)
for stock in stocks_list:
try:
# 获取今日涨跌停价
high_limit = bar_dict[stock].high_limit
low_limit = bar_dict[stock].low_limit
# 获取集合竞价tick数据 - 使用trade_date作为时间字段
ticks = get_tick(stock, start_time, end_time, ['current', 'trade_date'])
if ticks.empty:
print(f"股票{stock}无集合竞价数据")
continue
# 将trade_date转换为datetime并设置为索引
ticks['datetime'] = pd.to_datetime(ticks['trade_date'])
ticks.set_index('datetime', inplace=True)
# 调试输出数据样例
print(f"{stock} 获取到{len(ticks)}条tick数据,时间范围:{ticks.index.min()} - {ticks.index.max()}")
# 定义时间筛选函数
def time_in_range(t, start, end):
return (t.time() >= start) and (t.time() <= end)
# 条件1:9:15-9:20最低价 <= 跌停价*1.02
mask1 = [time_in_range(t, time(9,15), time(9,20)) for t in ticks.index]
min_price_15_20 = ticks[mask1]['current'].min()
condition1 = min_price_15_20 <= low_limit * 1.02 # 跌停价*1.02
# 条件2:9:18-9:26最高价 >= 跌停价*1.05
mask2 = [time_in_range(t, time(9,18), time(9,26)) for t in ticks.index]
max_price_18_25 = ticks[mask2]['current'].max()
condition2 = max_price_18_25 >= low_limit * 1.05 # 跌停价*1.05
# 存储结果
result[stock] = {
'condition1': condition1,
'condition2': condition2,
'min_price': min_price_15_20,
'max_price': max_price_18_25,
'high_limit': high_limit,
'low_limit': low_limit
}
print(f"{stock} 条件1:{condition1} 条件2:{condition2} "
f"最低价:{min_price_15_20:.2f} 最高价:{max_price_18_25:.2f} "
f"跌停价:{low_limit:.2f} 涨停价:{high_limit:.2f}")
except Exception as e:
print(f"处理股票{stock}出错:{str(e)}")
continue
return result
def buy_new_stocks(context, bar_dict, stocks_list):
if not stocks_list:
print("没有符合条件的股票可买入")
return
available_cash = context.portfolio.total_value
per_stock_amount = available_cash / len(stocks_list) if len(stocks_list) > 0 else 0
for stock in stocks_list:
try:
# 每个股票买入分配金额
order_value(stock, per_stock_amount)
print(f"买入 {stock} 金额: {per_stock_amount:.2f}元")
# 记录买入信息(使用bar_dict获取开盘价)
info = Information()
info.buy_date = get_datetime().strftime('%Y%m%d')
info.buy_price = bar_dict[stock].open
context.information[stock] = info
except Exception as e:
print(f"买入股票{stock}失败: {str(e)}")
def sell_previous_stocks(context, bar_dict):
today = get_datetime()
previous_date = get_previous_trading_date(today).strftime('%Y%m%d')
for stock in list(context.portfolio.positions):
info = context.information.get(stock)
if info and info.buy_date == previous_date: # 如果是两天前买入的股票
try:
order_target(stock, 0) # 全部卖出
print(f"卖出股票 {stock},买入日期: {info.buy_date}")
context.sold_stocks.add(stock)
except Exception as e:
print(f"卖出股票{stock}失败: {str(e)}")
class Information:
def init(self):
self.buy_date = "" # 买入日期
self.buy_price = 0 # 买入价格
from datetime import datetime, time
import pandas as pd
from mindgo_api import *
import numpy as np
import talib
def init(context):
context.holding_period = 2 # 股票持有天数(2天)
context.information = {} # 存储股票额外信息的字典
context.stock_data = {} # 存储股票数据的字典
context.limit_up_stocks = set() # 用集合存储涨停股代码
context.sold_stocks = set() # 已卖出股票集合
set_commission(PerShare(type='stock', cost=0.00022, min_trade_cost=5)) # 设置交易佣金万2.2不免5
set_slippage(PriceSlippage(0.002)) # 设置交易滑点0.2%
def open_auction(context, bar_dict):
try:
sell_previous_stocks(context, bar_dict)
# 筛选并买入符合条件的股票
query_wencai = get_stock_wencai()
result = query_iwencai(query=query_wencai, domain='股票', df=True)
if not result.empty:
# 获取集合竞价数据并筛选符合条件的股票
auction_data = get_call_auction(bar_dict, result['股票代码'].tolist()) # 传入bar_dict
buy_stocks = [stock for stock in auction_data
if auction_data[stock]['condition1'] and auction_data[stock]['condition2']]
# 买入符合条件的股票(满仓)
buy_new_stocks(context, bar_dict, buy_stocks) # 传入bar_dict
except Exception as e:
print(f"集合竞价运行出错: {str(e)[:30]}")
def get_stock_wencai():
y_date = get_previous_trading_date(get_datetime()).strftime('%Y%m%d')
query = (
f"{y_date}的连续涨停天数>=4,"
"非st,非创业板,非科创板,非北证a股"
)
return query
def get_call_auction(bar_dict, stocks_list):
today_dt = get_datetime()
result = {}
start_time = today_dt.replace(hour=9, minute=15, second=0)
end_time = today_dt.replace(hour=9, minute=26, second=0)
for stock in stocks_list:
try:
# 获取今日涨跌停价
high_limit = bar_dict[stock].high_limit
low_limit = bar_dict[stock].low_limit
# 获取集合竞价tick数据 - 使用trade_date作为时间字段
ticks = get_tick(stock, start_time, end_time, ['current', 'trade_date'])
if ticks.empty:
print(f"股票{stock}无集合竞价数据")
continue
# 将trade_date转换为datetime并设置为索引
ticks['datetime'] = pd.to_datetime(ticks['trade_date'])
ticks.set_index('datetime', inplace=True)
# 调试输出数据样例
print(f"{stock} 获取到{len(ticks)}条tick数据,时间范围:{ticks.index.min()} - {ticks.index.max()}")
# 定义时间筛选函数
def time_in_range(t, start, end):
return (t.time() >= start) and (t.time() <= end)
# 条件1:9:15-9:20最低价 <= 跌停价*1.02
mask1 = [time_in_range(t, time(9,15), time(9,20)) for t in ticks.index]
min_price_15_20 = ticks[mask1]['current'].min()
condition1 = min_price_15_20 <= low_limit * 1.02 # 跌停价*1.02
# 条件2:9:18-9:26最高价 >= 跌停价*1.05
mask2 = [time_in_range(t, time(9,18), time(9,26)) for t in ticks.index]
max_price_18_25 = ticks[mask2]['current'].max()
condition2 = max_price_18_25 >= low_limit * 1.05 # 跌停价*1.05
# 存储结果
result[stock] = {
'condition1': condition1,
'condition2': condition2,
'min_price': min_price_15_20,
'max_price': max_price_18_25,
'high_limit': high_limit,
'low_limit': low_limit
}
print(f"{stock} 条件1:{condition1} 条件2:{condition2} "
f"最低价:{min_price_15_20:.2f} 最高价:{max_price_18_25:.2f} "
f"跌停价:{low_limit:.2f} 涨停价:{high_limit:.2f}")
except Exception as e:
print(f"处理股票{stock}出错:{str(e)}")
continue
return result
def buy_new_stocks(context, bar_dict, stocks_list):
if not stocks_list:
print("没有符合条件的股票可买入")
return
available_cash = context.portfolio.total_value
per_stock_amount = available_cash / len(stocks_list) if len(stocks_list) > 0 else 0
for stock in stocks_list:
try:
# 每个股票买入分配金额
order_value(stock, per_stock_amount)
print(f"买入 {stock} 金额: {per_stock_amount:.2f}元")
# 记录买入信息(使用bar_dict获取开盘价)
info = Information()
info.buy_date = get_datetime().strftime('%Y%m%d')
info.buy_price = bar_dict[stock].open
context.information[stock] = info
except Exception as e:
print(f"买入股票{stock}失败: {str(e)}")
def sell_previous_stocks(context, bar_dict):
today = get_datetime()
previous_date = get_previous_trading_date(today).strftime('%Y%m%d')
for stock in list(context.portfolio.positions):
info = context.information.get(stock)
if info and info.buy_date == previous_date: # 如果是两天前买入的股票
try:
order_target(stock, 0) # 全部卖出
print(f"卖出股票 {stock},买入日期: {info.buy_date}")
context.sold_stocks.add(stock)
except Exception as e:
print(f"卖出股票{stock}失败: {str(e)}")
class Information:
def init(self):
self.buy_date = "" # 买入日期
self.buy_price = 0 # 买入价格
from datetime import datetime, time
import pandas as pd
from mindgo_api import *
import numpy as np
import talib
def init(context):
context.holding_period = 2 # 股票持有天数(2天)
context.information = {} # 存储股票额外信息的字典
context.stock_data = {} # 存储股票数据的字典
context.limit_up_stocks = set() # 用集合存储涨停股代码
context.sold_stocks = set() # 已卖出股票集合
set_commission(PerShare(type='stock', cost=0.00022, min_trade_cost=5)) # 设置交易佣金万2.2不免5
set_slippage(PriceSlippage(0.002)) # 设置交易滑点0.2%
def open_auction(context, bar_dict):
try:
sell_previous_stocks(context, bar_dict)
# 筛选并买入符合条件的股票
query_wencai = get_stock_wencai()
result = query_iwencai(query=query_wencai, domain='股票', df=True)
if not result.empty:
# 获取集合竞价数据并筛选符合条件的股票
auction_data = get_call_auction(bar_dict, result['股票代码'].tolist()) # 传入bar_dict
buy_stocks = [stock for stock in auction_data
if auction_data[stock]['condition1'] and auction_data[stock]['condition2']]
# 买入符合条件的股票(满仓)
buy_new_stocks(context, bar_dict, buy_stocks) # 传入bar_dict
except Exception as e:
print(f"集合竞价运行出错: {str(e)[:30]}")
def get_stock_wencai():
y_date = get_previous_trading_date(get_datetime()).strftime('%Y%m%d')
query = (
f"{y_date}的连续涨停天数>=4,"
"非st,非创业板,非科创板,非北证a股"
)
return query
def get_call_auction(bar_dict, stocks_list):
today_dt = get_datetime()
result = {}
start_time = today_dt.replace(hour=9, minute=15, second=0)
end_time = today_dt.replace(hour=9, minute=26, second=0)
for stock in stocks_list:
try:
# 获取今日涨跌停价
high_limit = bar_dict[stock].high_limit
low_limit = bar_dict[stock].low_limit
# 获取集合竞价tick数据 - 使用trade_date作为时间字段
ticks = get_tick(stock, start_time, end_time, ['current', 'trade_date'])
if ticks.empty:
print(f"股票{stock}无集合竞价数据")
continue
# 将trade_date转换为datetime并设置为索引
ticks['datetime'] = pd.to_datetime(ticks['trade_date'])
ticks.set_index('datetime', inplace=True)
# 调试输出数据样例
print(f"{stock} 获取到{len(ticks)}条tick数据,时间范围:{ticks.index.min()} - {ticks.index.max()}")
# 定义时间筛选函数
def time_in_range(t, start, end):
return (t.time() >= start) and (t.time() <= end)
# 条件1:9:15-9:20最低价 <= 跌停价*1.02
mask1 = [time_in_range(t, time(9,15), time(9,20)) for t in ticks.index]
min_price_15_20 = ticks[mask1]['current'].min()
condition1 = min_price_15_20 <= low_limit * 1.02 # 跌停价*1.02
# 条件2:9:18-9:26最高价 >= 跌停价*1.05
mask2 = [time_in_range(t, time(9,18), time(9,26)) for t in ticks.index]
max_price_18_25 = ticks[mask2]['current'].max()
condition2 = max_price_18_25 >= low_limit * 1.05 # 跌停价*1.05
# 存储结果
result[stock] = {
'condition1': condition1,
'condition2': condition2,
'min_price': min_price_15_20,
'max_price': max_price_18_25,
'high_limit': high_limit,
'low_limit': low_limit
}
print(f"{stock} 条件1:{condition1} 条件2:{condition2} "
f"最低价:{min_price_15_20:.2f} 最高价:{max_price_18_25:.2f} "
f"跌停价:{low_limit:.2f} 涨停价:{high_limit:.2f}")
except Exception as e:
print(f"处理股票{stock}出错:{str(e)}")
continue
return result
def buy_new_stocks(context, bar_dict, stocks_list):
if not stocks_list:
print("没有符合条件的股票可买入")
return
available_cash = context.portfolio.total_value
per_stock_amount = available_cash / len(stocks_list) if len(stocks_list) > 0 else 0
for stock in stocks_list:
try:
# 每个股票买入分配金额
order_value(stock, per_stock_amount)
print(f"买入 {stock} 金额: {per_stock_amount:.2f}元")
# 记录买入信息(使用bar_dict获取开盘价)
info = Information()
info.buy_date = get_datetime().strftime('%Y%m%d')
info.buy_price = bar_dict[stock].open
context.information[stock] = info
except Exception as e:
print(f"买入股票{stock}失败: {str(e)}")
def sell_previous_stocks(context, bar_dict):
today = get_datetime()
previous_date = get_previous_trading_date(today).strftime('%Y%m%d')
for stock in list(context.portfolio.positions):
info = context.information.get(stock)
if info and info.buy_date == previous_date: # 如果是两天前买入的股票
try:
order_target(stock, 0) # 全部卖出
print(f"卖出股票 {stock},买入日期: {info.buy_date}")
context.sold_stocks.add(stock)
except Exception as e:
print(f"卖出股票{stock}失败: {str(e)}")
class Information:
def init(self):
self.buy_date = "" # 买入日期
self.buy_price = 0 # 买入价格

