发布时间:2024-05-22 12:01
if __name__ == \'__main__\':
# 省略。。。
# T = transpose
orders_df = pd.DataFrame(orders).T
orders_df.loc[:, \'pnl\'].plot.bar()
plt.show()
# print sum of pnl
print(\'sum of pnl is: \' + str(orders_df.loc[:, \'pnl\'].sum()))
# 使用 mplfinance 绘制k线图:订单交易价格与时间
bar5 = pd.read_csv(bar_path, parse_dates=[\'datetime\'])
bar5.loc[:, \'datetime\'] = [date2num(x) for x in bar5.loc[:, \'datetime\']]
# 省略。。。
def strategy(self):
# last < 0.95 *ma20 ,long position(仓位), last > ma20 *1.05, sell
if self._is_new_bar:
sum_ = 0
for item in self._Close[1:21]:
sum_ = sum_ + item
self._ma20 = sum_ / 20
if 0 == len(self._current_orders):
if self._Close[0] < 0.98 * self._ma20:
# 100000/44.28 = 2258 44.28是当前价格,10万指的你拥有的钱
# 2258 -> 2200 shares
volume = int(100000 / self._Close[0] / 100) * 100
self._buy(self._Close[0] + 0.01, volume) # 这里的0.01是为了防止挂单,我们需要即可买入
elif 1 == len(self._current_orders):
if self._Close[0] > self._ma20 * 1.02:
key = list(self._current_orders.keys())[0]
self._sell(key, self._Close[0] - 0.01)
# 省略。。。
def strategy(self):
# last < 0.95 *ma20 ,long position(仓位), last > ma20 *1.05, sell
if self._is_new_bar:
sum_ = 0
for item in self._Close[1:21]:
sum_ = sum_ + item
self._ma20 = sum_ / 20
if 0 == len(self._current_orders):
if self._Close[0] < 0.98 * self._ma20:
# 100000/44.28 = 2258 44.28是当前价格,10万指的你拥有的钱
# 2258 -> 2200 shares
volume = int(100000 / self._Close[0] / 100) * 100
self._buy(self._Close[0] + 0.01, volume) # 这里的0.01是为了防止挂单,我们需要即可买入
elif 1 == len(self._current_orders):
if self._Close[0] > self._ma20 * 1.02:
key = list(self._current_orders.keys())[0]
if self._Dt[0].date() != self._current_orders[key][\'open_datetime\'].date():
self._sell(key, self._Close[0] - 0.01)
print(\'open date is %s, close date is: %s.\'
% (self._history_orders[key][\'open_datetime\'].date(), self._Dt[0].date()))
else:
# if sam dates, sell order aborted due to T+0 limit
print(\'sell order aborted due to T+0 limit\')
else: # len() = 2
raise ValueError(\"we have more then 1 current orders\")
# Close[0] in between 0.95*ma20 and 1.05*ma20,do nothing
import requests
from time import sleep
from datetime import datetime, time, timedelta
from dateutil import parser
import pandas as pd
import numpy as np
import os
import matplotlib.pyplot as plt
# import mplfinance as mpf
from mplfinance.original_flavor import candlestick_ohlc
from matplotlib.dates import date2num
def get_ticks_for_backtesting(tick_path, bar_path):
\"\"\"
:func: get ticks for backtesting, need two params
:param1 tick_path: 生成的回测数据路径
csv file with tick data,
when there is not tick data,
use bat_path to create tick data
example: \"E:\\\\Downloads\\\\600036_data\\\\600036_ticks.csv\"
:param2 bar_path: 历史数据的tick路径
csv file with bar data,
used in creating tick data
example: \"E:\\\\Downloads\\\\600036_data\\\\600036_5m.csv\"
:return: ticks in list with tuples in it, such as
[(datetime, last_price), (datetime, last_price)]
\"\"\"
if os.path.exists(tick_path): # 如果已存在回测数据,直接读取回测数据ticks
ticks = pd.read_csv(
tick_path,
parse_dates=[\'datetime\'],
index_col=\'datetime\'
)
tick_list = []
for index, row in ticks.iterrows():
tick_list.append((index, row[0]))
# ticks = np.array(tick_list)
ticks = tick_list
else:
bar_5m = pd.read_csv(bar_path) # 使用pandas读取csv数据
ticks = []
for index, row in bar_5m.iterrows(): # 根据不同的开盘价设置步长
if row[\'open\'] < 30:
step = 0.01
elif row[\'open\'] < 60:
step = 0.03
elif row[\'open\'] < 90:
step = 0.05
else:
step = 0.1
# in case of np.arrange(30, 30.11, 0.02), (open, high, step)
# we will not have 30.11 as the highest price,
# we might not catch high when step is more than 0.01
# that is why me need: arr = np.append(arr, row[\'high\']) and
# arr = np.append(arr, row[\'low\'])
arr = np.arange(row[\'open\'], row[\'high\'], step) # 按步长生成从open到high的数据
arr = np.append(arr, row[\'high\']) # 这个是为了弥补步长的不对等会漏掉high
arr = np.append(arr, np.arange(row[\'open\'] - step, row[\'low\'], -step)) # 按步长生成从open到low的数据
arr = np.append(arr, row[\'low\']) # 这个是为了弥补步长的不对等会漏掉low
arr = np.append(arr, row[\'close\'])
i = 0
dt = parser.parse(row[\'datetime\']) - timedelta(minutes=5)
for item in arr:
ticks.append((dt + timedelta(seconds=0.1 * i), item)) # 将数据时间模拟到0.1秒递进
i += 1
tick_df = pd.DataFrame(ticks, columns=[\'datetime\', \'price\'])
tick_df.to_csv(tick_path, index=0) # 保存到csv回测数据中
return ticks
# __init__,构造,初始化,实例化
class AstockTrading(object):
\"\"\"
:class: A stock trading platform, needs one param,
It has backtesting, paper trading, and real trading.
:param1: strategy_name: strategy_name
\"\"\"
def __init__(self, strategy_name):
self._strategy_name = strategy_name
self._Dt = [] # 交易时间
self._Open = [] # 开盘价
self._High = [] # 最高价
self._Low = [] # 最低价
self._Close = [] # 最新价
self._Volume = []
self._tick = [] # 数据
self._last_bar_start_minute = None # 最后一次更新bar的时间
self._is_new_bar = False # 是否有新bar
self._ma20 = None
# 当前订单,dict, 字典
self._current_orders = {}
# 历史订单
self._history_orders = {}
self._order_number = 0
self._init = False # for backtesting
def get_tick(self):
\"\"\"
:func: for paper trading or real trading, not for backtesting
It goes to sina to get last tick info,
address is: https://hq.sinajs.cn/list=sh600519,
sh600519 can be changed
need to set headers Referer to: https://finance.sina.com.cn
A股的开盘时间是9:15,9:15-9:25是集合竞价 -> 开盘价,9:25
9:25-9:30不交易,时间>9:30,交易开始
start this method after 9:25
tick info is organized in tuple,
such as (trade_datetime, last_price),
tick info is save in self._tick.
:param: no param
:return: None
\"\"\"
headers = {\'Referer\': \"https://finance.sina.com.cn\"}
page = requests.get(\"https://hq.sinajs.cn/list=sh600519\", headers=headers)
stock_info = page.text
mt_info = stock_info.replace(\"\\\"\", \"\").split(\"=\")[1].split(\",\")
# 最新价
last = float(mt_info[1])
trade_datetime = mt_info[30] + \' \' + mt_info[31]
self._tick = (trade_datetime, last)
def get_history_data_from_local_machine(self):
\"\"\"
:not done yet
:return:
\"\"\"
# tushare 数据来源
# self.Open = [1, 2, 3]
# self.High = [2, 3, 4]
self._Open = []
self._High = []
self._Low = []
self._Close = []
self._Dt = []
def bar_generator(self):
\"\"\"
:not done yet
:how save and import history data?
:return:
\"\"\"
# assume we have history data already
# 1、update bars,calculate 5 minutes ma20 , not daily data
# 2、compare last and ma20 -> buy or sell or pass
# assume we have history data,Open,High,Low,Close,Dt
# 这里可以是5minutes、10minutes、15minutes、20minutes、30minutes
if self._tick[0].minute % 5 == 0 and self._tick[0].minute != self._last_bar_start_minute:
self._last_bar_start_minute = self._tick[0].minute
self._Open.insert(0, self._tick[1])
self._High.insert(0, self._tick[1])
self._Low.insert(0, self._tick[1])
self._Close.insert(0, self._tick[1])
self._Dt.insert(0, self._tick[0])
self._is_new_bar = True
else:
# update current bar
self._High[0] = max(self._High[0], self._tick[1])
self._Low[0] = max(self._Low[0], self._tick[1])
self._Close[0] = self._tick[1]
self._Dt[0] = self._tick[0]
self._is_new_bar = False
def _buy(self, price, volume):
\"\"\"
:method: create am order
:param1 price: buying price
:param2 volume: buying volume
:return: none
\"\"\"
self._order_number += 1
key = \"order\" + str(self._order_number)
self._current_orders[key] = {
\"open_datetime\": self._Dt[0],
\"open_price\": price,
\"volume\": volume # 股数
}
pass
def _sell(self, key, price):
\"\"\"
:method: close a long order, It needs two params
:param1 key: long order\'s key
:param2 price: selling price
:return:
\"\"\"
self._current_orders[key][\'close_price\'] = price
self._current_orders[key][\'close_datetime\'] = self._Dt[0]
self._current_orders[key][\'pnl\'] = \\
(price - self._current_orders[key][\'open_price\']) \\
* self._current_orders[key][\'volume\'] \\
- price * self._current_orders[key][\'volume\'] * 1 / 1000 \\
- (price - self._current_orders[key][\'open_price\']) \\
* self._current_orders[key][\'volume\'] * 3 / 10000
# move order from current orders to history orders
self._history_orders[key] = self._current_orders.pop(key)
def strategy(self):
# last < 0.95 *ma20 ,long position(仓位), last > ma20 *1.05, sell
if self._is_new_bar:
sum_ = 0
for item in self._Close[1:21]:
sum_ = sum_ + item
self._ma20 = sum_ / 20
if 0 == len(self._current_orders):
if self._Close[0] < 0.98 * self._ma20:
# 100000/44.28 = 2258 44.28是当前价格,10万指的你拥有的钱
# 2258 -> 2200 shares
volume = int(100000 / self._Close[0] / 100) * 100
self._buy(self._Close[0] + 0.01, volume) # 这里的0.01是为了防止挂单,我们需要即可买入
elif 1 == len(self._current_orders):
if self._Close[0] > self._ma20 * 1.02:
key = list(self._current_orders.keys())[0]
if self._Dt[0].date() != self._current_orders[key][\'open_datetime\'].date():
self._sell(key, self._Close[0] - 0.01)
print(\'open date is %s, close date is: %s.\'
% (self._history_orders[key][\'open_datetime\'].date(), self._Dt[0].date()))
else:
# if sam dates, sell order aborted due to T+0 limit
print(\'sell order aborted due to T+0 limit\')
else: # len() = 2
raise ValueError(\"we have more then 1 current orders\")
# Close[0] in between 0.95*ma20 and 1.05*ma20,do nothing
def bar_generator_for_backtesting(self, tick):
\"\"\"
:method: for backtesting only, used to update _Open, _ High, etc, It needs just one param
:param tick: tick info in tuple, (datetime, price)
:return:
\"\"\"
if tick[0].minute % 5 == 0 and tick[0].minute != self._last_bar_start_minute:
self._last_bar_start_minute = tick[0].minute
self._Open.insert(0, tick[1])
self._High.insert(0, tick[1])
self._Low.insert(0, tick[1])
self._Close.insert(0, tick[1])
self._Dt.insert(0, tick[0])
self._is_new_bar = True
else:
# update current bar
self._High[0] = max(self._High[0], tick[1])
self._Low[0] = max(self._Low[0], tick[1])
self._Close[0] = tick[1]
self._Dt[0] = tick[0]
self._is_new_bar = False
def run_backtestting(self, ticks):
\"\"\"
:method: ticks will be used to generate bars,
when bars is long enough, call strategy()
:param ticks: list with (datetime, price) in the list
:return: none
\"\"\"
for tick in ticks:
self.bar_generator_for_backtesting(tick)
if self._init:
self.strategy()
else:
if len(self._Open) >= 100:
self._init = True
self.strategy()
# ma = AstockTrading(\'600036\') # 类实例化
# ma.get_history_data_from_local_machine()
#
# # 交易时间是9:30-11:30,13:00-15:00
# while time(9, 26) < datetime.now().time() < time(11, 32) \\
# or time(13) < datetime.now().time() < time(15, 2):
# ma.get_tick()
# ma.bar_generator()
# ma.strategy()
# # trade_time = parser.parse(ma._tick[0]).time()
# # sleep(3)
if __name__ == \'__main__\':
tick_path = \"E:\\\\Downloads\\\\600036_data\\\\600036_ticks.csv\"
bar_path = \"E:\\\\Downloads\\\\600036_data\\\\600036_5m.csv\"
ticks = get_ticks_for_backtesting(tick_path, bar_path) # 获取回测数据
ast = AstockTrading(\'ma\')
ast.run_backtestting(ticks) # 运行回测数据
print(\'ast._current_orders:\')
print(ast._current_orders)
print(\"-------------------------------------\")
print(\'ast._history_orders:\')
print(ast._history_orders)
# 使用matplotlib绘制盈亏柱状图
profit_orders = 0 # 盈利的交易数
loss_orders = 0 # 亏损的交易数
orders = ast._history_orders
for key in orders.keys():
if orders[key][\'pnl\'] >= 0:
profit_orders += 1
else:
loss_orders += 1
win_rate = profit_orders / len(orders)
loss_rate = loss_orders / len(orders)
# T = transpose
orders_df = pd.DataFrame(orders).T
orders_df.loc[:, \'pnl\'].plot.bar()
plt.show()
# print sum of pnl
print(\'sum of pnl is: \' + str(orders_df.loc[:, \'pnl\'].sum()))
# 使用 mplfinance 绘制k线图:订单交易价格与时间
bar5 = pd.read_csv(bar_path, parse_dates=[\'datetime\'])
bar5.loc[:, \'datetime\'] = [date2num(x) for x in bar5.loc[:, \'datetime\']]
fig, ax = plt.subplots()
candlestick_ohlc(
ax,
quotes=bar5.values,
width=0.2,
colorup=\"r\",
colordown=\'g\',
alpha=1.0,
)
# put orders in candle sticks
for index, row in orders_df.iterrows():
ax.plot(
[row[\'open_datetime\'], row[\'close_datetime\']],
[row[\'open_price\'], row[\'close_price\']],
color=\'darkblue\',
marker=\'o\',
)
plt.show()