万能的@sup****nd001,请给写一个实盘交易策略Python代码:当日14点50分时skdj的k值小于25且SKDJ金叉且持仓为0时买入该股票;如果买入该股票后,以成本价为基础,股价下跌20%时买入相当于持仓量的该股票,当日14点50分时skdj的k值大于70且SKDJ死叉且持仓大于100时卖出该股票。
万能的@sup****nd001,请给写一个实盘交易策略Python代码:当日14点50分时skdj的k值小于25且SKDJ金叉且持仓为0时买入该股票;如果买入该股票后,以成本价为基础,股价下跌20%时买入相当于持仓量的该股票,当日14点50分时skdj的k值大于70且SKDJ死叉且持仓大于100时卖出该股票。
import pandas as pd
import numpy as np
import talib
import yfinance as yf
import time
import datetime
symbol = "AAPL"
position = 0
cost_price = 0
trade_log = []
def compute_skdj(df, high_col, low_col, close_col, n=9, m1=3, m2=3):
"""计算SKDJ指标"""
low_list = df[low_col].rolling(n, min_periods=1).min()
high_list = df[high_col].rolling(n, min_periods=1).max()
rsv = (df[close_col] - low_list) / (high_list - low_list) * 100
k = rsv.ewm(com=m1).mean()
d = k.ewm(com=m2).mean()
return k, d
def check_skdj_signal(k, d, previous_k, previous_d):
"""检查SKDJ的金叉和死叉信号"""
golden_cross = k > d and previous_k <= previous_d
death_cross = k < d and previous_k >= previous_d
return golden_cross, death_cross
def sync_stock_price(symbol):
"""获取实时股票价格"""
data = yf.download(symbol, period="1d", interval="1m", progress=False)
current_price = data['Close'].iloc[-1]
return current_price
def run_strategy():
global position, cost_price
while True:
# 获取当前时间
current_time = datetime.datetime.now().strftime("%H:%M")
print(f"Current Time: {current_time}")
# 判断是否是交易时间(以美东时间9:30-16:00为例)
if current_time < '09:30' or current_time >= '16:00':
print("非交易时间,程序睡眠...")
time.sleep(60 * 5) # 5分钟检查一次
continue
# 判断是否到达14:50
if current_time == '14:50':
try:
# 获取历史数据(假设获取最近20天的数据)
data = yf.download(symbol, period="20d", progress=False)
# 计算SKDJ指标
k, d = compute_skdj(data, "High", "Low", "Close")
previous_k = k.iloc[-2]
current_k = k.iloc[-1]
previous_d = d.iloc[-2]
current_d = d.iloc[-1]
# 获取最新股价
current_price = sync_stock_price(symbol)
# 检查买入条件(当天14:50 SKDJ的K值小于25且金叉)
if position == 0 and current_k < 25:
golden_cross, _ = check_skdj_signal(current_k, current_d, previous_k, previous_d)
if golden_cross:
# 下单买入
buy_quantity = 100 # 买入数量
cost_price = current_price
position += buy_quantity
trade_log.append({
"Time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"Action": "Buy",
"Price": current_price,
"Quantity": buy_quantity,
"SKDJ_K": current_k,
"SKDJ_D": current_d
})
print(f"买入成功! 价格: {current_price}, 数量: {buy_quantity}")
# 检查补仓条件(股价下跌20%)
if position > 0 and current_price <= cost_price * 0.8:
buy_quantity = position # 买入相当于持仓量的股票
cost_price = (cost_price * position + current_price * buy_quantity) / (position + buy_quantity)
position += buy_quantity
trade_log.append({
"Time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"Action": "Add",
"Price": current_price,
"Quantity": buy_quantity,
"SKDJ_K": current_k,
"SKDJ_D": current_d
})
print(f"补仓成功! 价格: {current_price}, 数量: {buy_quantity}")
# 检查卖出条件(当天14:50 SKDJ的K值大于70且死叉,且持仓大于100股)
if position > 100 and current_k > 70:
_, death_cross = check_skdj_signal(current_k, current_d, previous_k, previous_d)
if death_cross:
# 下单卖出
sell_quantity = position
position = 0
trade_log.append({
"Time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"Action": "Sell",
"Price": current_price,
"Quantity": sell_quantity,
"SKDJ_K": current_k,
"SKDJ_D": current_d
})
print(f"卖出成功! 价格: {current_price}, 数量: {sell_quantity}")
except Exception as e:
print(f"数据或网络错误: {e}")
# 非14:50,不做任何操作
else:
time.sleep(60) # 1分钟检查一次
if name == "main":
run_strategy()
import pandas as pd
import numpy as np
import talib
import yfinance as yf
import time
import datetime
symbol = "AAPL"
position = 0
cost_price = 0
trade_log = []
def compute_skdj(df, high_col, low_col, close_col, n=9, m1=3, m2=3):
"""计算SKDJ指标"""
low_list = df[low_col].rolling(n, min_periods=1).min()
high_list = df[high_col].rolling(n, min_periods=1).max()
rsv = (df[close_col] - low_list) / (high_list - low_list) * 100
k = rsv.ewm(com=m1).mean()
d = k.ewm(com=m2).mean()
return k, d
def check_skdj_signal(k, d, previous_k, previous_d):
"""检查SKDJ的金叉和死叉信号"""
golden_cross = k > d and previous_k <= previous_d
death_cross = k < d and previous_k >= previous_d
return golden_cross, death_cross
def sync_stock_price(symbol):
"""获取实时股票价格"""
data = yf.download(symbol, period="1d", interval="1m", progress=False)
current_price = data['Close'].iloc[-1]
return current_price
def run_strategy():
global position, cost_price
while True:
# 获取当前时间
current_time = datetime.datetime.now().strftime("%H:%M")
print(f"Current Time: {current_time}")
# 判断是否是交易时间(以美东时间9:30-16:00为例)
if current_time < '09:30' or current_time >= '16:00':
print("非交易时间,程序睡眠...")
time.sleep(60 * 5) # 5分钟检查一次
continue
# 判断是否到达14:50
if current_time == '14:50':
try:
# 获取历史数据(假设获取最近20天的数据)
data = yf.download(symbol, period="20d", progress=False)
# 计算SKDJ指标
k, d = compute_skdj(data, "High", "Low", "Close")
previous_k = k.iloc[-2]
current_k = k.iloc[-1]
previous_d = d.iloc[-2]
current_d = d.iloc[-1]
# 获取最新股价
current_price = sync_stock_price(symbol)
# 检查买入条件(当天14:50 SKDJ的K值小于25且金叉)
if position == 0 and current_k < 25:
golden_cross, _ = check_skdj_signal(current_k, current_d, previous_k, previous_d)
if golden_cross:
# 下单买入
buy_quantity = 100 # 买入数量
cost_price = current_price
position += buy_quantity
trade_log.append({
"Time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"Action": "Buy",
"Price": current_price,
"Quantity": buy_quantity,
"SKDJ_K": current_k,
"SKDJ_D": current_d
})
print(f"买入成功! 价格: {current_price}, 数量: {buy_quantity}")
# 检查补仓条件(股价下跌20%)
if position > 0 and current_price <= cost_price * 0.8:
buy_quantity = position # 买入相当于持仓量的股票
cost_price = (cost_price * position + current_price * buy_quantity) / (position + buy_quantity)
position += buy_quantity
trade_log.append({
"Time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"Action": "Add",
"Price": current_price,
"Quantity": buy_quantity,
"SKDJ_K": current_k,
"SKDJ_D": current_d
})
print(f"补仓成功! 价格: {current_price}, 数量: {buy_quantity}")
# 检查卖出条件(当天14:50 SKDJ的K值大于70且死叉,且持仓大于100股)
if position > 100 and current_k > 70:
_, death_cross = check_skdj_signal(current_k, current_d, previous_k, previous_d)
if death_cross:
# 下单卖出
sell_quantity = position
position = 0
trade_log.append({
"Time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"Action": "Sell",
"Price": current_price,
"Quantity": sell_quantity,
"SKDJ_K": current_k,
"SKDJ_D": current_d
})
print(f"卖出成功! 价格: {current_price}, 数量: {sell_quantity}")
except Exception as e:
print(f"数据或网络错误: {e}")
# 非14:50,不做任何操作
else:
time.sleep(60) # 1分钟检查一次
if name == "main":
run_strategy()
import pandas as pd
import numpy as np
import talib
import yfinance as yf
import time
import datetime
symbol = "AAPL"
position = 0
cost_price = 0
trade_log = []
def compute_skdj(df, high_col, low_col, close_col, n=9, m1=3, m2=3):
"""计算SKDJ指标"""
low_list = df[low_col].rolling(n, min_periods=1).min()
high_list = df[high_col].rolling(n, min_periods=1).max()
rsv = (df[close_col] - low_list) / (high_list - low_list) * 100
k = rsv.ewm(com=m1).mean()
d = k.ewm(com=m2).mean()
return k, d
def check_skdj_signal(k, d, previous_k, previous_d):
"""检查SKDJ的金叉和死叉信号"""
golden_cross = k > d and previous_k <= previous_d
death_cross = k < d and previous_k >= previous_d
return golden_cross, death_cross
def sync_stock_price(symbol):
"""获取实时股票价格"""
data = yf.download(symbol, period="1d", interval="1m", progress=False)
current_price = data['Close'].iloc[-1]
return current_price
def run_strategy():
global position, cost_price
while True:
# 获取当前时间
current_time = datetime.datetime.now().strftime("%H:%M")
print(f"Current Time: {current_time}")
# 判断是否是交易时间(以美东时间9:30-16:00为例)
if current_time < '09:30' or current_time >= '16:00':
print("非交易时间,程序睡眠...")
time.sleep(60 * 5) # 5分钟检查一次
continue
# 判断是否到达14:50
if current_time == '14:50':
try:
# 获取历史数据(假设获取最近20天的数据)
data = yf.download(symbol, period="20d", progress=False)
# 计算SKDJ指标
k, d = compute_skdj(data, "High", "Low", "Close")
previous_k = k.iloc[-2]
current_k = k.iloc[-1]
previous_d = d.iloc[-2]
current_d = d.iloc[-1]
# 获取最新股价
current_price = sync_stock_price(symbol)
# 检查买入条件(当天14:50 SKDJ的K值小于25且金叉)
if position == 0 and current_k < 25:
golden_cross, _ = check_skdj_signal(current_k, current_d, previous_k, previous_d)
if golden_cross:
# 下单买入
buy_quantity = 100 # 买入数量
cost_price = current_price
position += buy_quantity
trade_log.append({
"Time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"Action": "Buy",
"Price": current_price,
"Quantity": buy_quantity,
"SKDJ_K": current_k,
"SKDJ_D": current_d
})
print(f"买入成功! 价格: {current_price}, 数量: {buy_quantity}")
# 检查补仓条件(股价下跌20%)
if position > 0 and current_price <= cost_price * 0.8:
buy_quantity = position # 买入相当于持仓量的股票
cost_price = (cost_price * position + current_price * buy_quantity) / (position + buy_quantity)
position += buy_quantity
trade_log.append({
"Time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"Action": "Add",
"Price": current_price,
"Quantity": buy_quantity,
"SKDJ_K": current_k,
"SKDJ_D": current_d
})
print(f"补仓成功! 价格: {current_price}, 数量: {buy_quantity}")
# 检查卖出条件(当天14:50 SKDJ的K值大于70且死叉,且持仓大于100股)
if position > 100 and current_k > 70:
_, death_cross = check_skdj_signal(current_k, current_d, previous_k, previous_d)
if death_cross:
# 下单卖出
sell_quantity = position
position = 0
trade_log.append({
"Time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"Action": "Sell",
"Price": current_price,
"Quantity": sell_quantity,
"SKDJ_K": current_k,
"SKDJ_D": current_d
})
print(f"卖出成功! 价格: {current_price}, 数量: {sell_quantity}")
except Exception as e:
print(f"数据或网络错误: {e}")
# 非14:50,不做任何操作
else:
time.sleep(60) # 1分钟检查一次
if name == "main":
run_strategy()
import pandas as pd
import numpy as np
import talib
import yfinance as yf
import time
import datetime
symbol = "AAPL"
position = 0
cost_price = 0
trade_log = []
def compute_skdj(df, high_col, low_col, close_col, n=9, m1=3, m2=3):
"""计算SKDJ指标"""
low_list = df[low_col].rolling(n, min_periods=1).min()
high_list = df[high_col].rolling(n, min_periods=1).max()
rsv = (df[close_col] - low_list) / (high_list - low_list) * 100
k = rsv.ewm(com=m1).mean()
d = k.ewm(com=m2).mean()
return k, d
def check_skdj_signal(k, d, previous_k, previous_d):
"""检查SKDJ的金叉和死叉信号"""
golden_cross = k > d and previous_k <= previous_d
death_cross = k < d and previous_k >= previous_d
return golden_cross, death_cross
def sync_stock_price(symbol):
"""获取实时股票价格"""
data = yf.download(symbol, period="1d", interval="1m", progress=False)
current_price = data['Close'].iloc[-1]
return current_price
def run_strategy():
global position, cost_price
while True:
# 获取当前时间
current_time = datetime.datetime.now().strftime("%H:%M")
print(f"Current Time: {current_time}")
# 判断是否是交易时间(以美东时间9:30-16:00为例)
if current_time < '09:30' or current_time >= '16:00':
print("非交易时间,程序睡眠...")
time.sleep(60 * 5) # 5分钟检查一次
continue
# 判断是否到达14:50
if current_time == '14:50':
try:
# 获取历史数据(假设获取最近20天的数据)
data = yf.download(symbol, period="20d", progress=False)
# 计算SKDJ指标
k, d = compute_skdj(data, "High", "Low", "Close")
previous_k = k.iloc[-2]
current_k = k.iloc[-1]
previous_d = d.iloc[-2]
current_d = d.iloc[-1]
# 获取最新股价
current_price = sync_stock_price(symbol)
# 检查买入条件(当天14:50 SKDJ的K值小于25且金叉)
if position == 0 and current_k < 25:
golden_cross, _ = check_skdj_signal(current_k, current_d, previous_k, previous_d)
if golden_cross:
# 下单买入
buy_quantity = 100 # 买入数量
cost_price = current_price
position += buy_quantity
trade_log.append({
"Time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"Action": "Buy",
"Price": current_price,
"Quantity": buy_quantity,
"SKDJ_K": current_k,
"SKDJ_D": current_d
})
print(f"买入成功! 价格: {current_price}, 数量: {buy_quantity}")
# 检查补仓条件(股价下跌20%)
if position > 0 and current_price <= cost_price * 0.8:
buy_quantity = position # 买入相当于持仓量的股票
cost_price = (cost_price * position + current_price * buy_quantity) / (position + buy_quantity)
position += buy_quantity
trade_log.append({
"Time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"Action": "Add",
"Price": current_price,
"Quantity": buy_quantity,
"SKDJ_K": current_k,
"SKDJ_D": current_d
})
print(f"补仓成功! 价格: {current_price}, 数量: {buy_quantity}")
# 检查卖出条件(当天14:50 SKDJ的K值大于70且死叉,且持仓大于100股)
if position > 100 and current_k > 70:
_, death_cross = check_skdj_signal(current_k, current_d, previous_k, previous_d)
if death_cross:
# 下单卖出
sell_quantity = position
position = 0
trade_log.append({
"Time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"Action": "Sell",
"Price": current_price,
"Quantity": sell_quantity,
"SKDJ_K": current_k,
"SKDJ_D": current_d
})
print(f"卖出成功! 价格: {current_price}, 数量: {sell_quantity}")
except Exception as e:
print(f"数据或网络错误: {e}")
# 非14:50,不做任何操作
else:
time.sleep(60) # 1分钟检查一次
if name == "main":
run_strategy()