• python量化交易策略回测
2022-06-18 19:08:03

# 前言

• 生成1：matplotlib盈亏柱状图
• 生成2：k线图
• 生成3：投影订单进出场时间与价格到k线图

# 一、回测的主方法

if __name__ == '__main__':
ast.run_backtestting(ticks)  # 运行回测数据
print('ast._current_orders:')
print(ast._current_orders)
print("-------------------------------------")
print('ast._history_orders:')
print(ast._history_orders)

# 使用matplotlib绘制盈亏柱状图
profit_orders = 0  # 盈利的交易数
loss_orders = 0  # 亏损的交易数
orders = ast._history_orders
for key in orders.keys():
if orders[key]['pnl'] >= 0:
profit_orders += 1
else:
loss_orders += 1
win_rate = profit_orders / len(orders)
loss_rate = loss_orders / len(orders)
# T = transpose
orders_df = pd.DataFrame(orders).T
orders_df.loc[:, 'pnl'].plot.bar()
plt.show()

# 使用 mplfinance 绘制k线图：订单交易价格与时间
parse_dates=['datetime']
)
bar5.loc[:, 'datetime'] = [date2num(x) for x in bar5.loc[:, 'datetime']]

fig, ax = plt.subplots()
candlestick_ohlc(
ax,
quotes=bar5.values,
width=0.2,
colorup="r",
colordown='g',
alpha=1.0,
)

# put orders in candle sticks
for index, row in orders_df.iterrows():
ax.plot(
[row['open_datetime'], row['close_datetime']],
[row['open_price'], row['close_price']],
color='darkblue',
marker='o',
)
plt.show()


# 二、回测实现

## 1 - 获取回测数据ticks

def get_ticks_for_backtesting(tick_path, bar_path):
"""
:func: get ticks for backtesting, need two params
:param1 tick_path: 生成的回测数据路径
csv file with tick data,
when there is not tick data,
use bat_path to create tick data
:param2 bar_path: 历史数据的tick路径
csv file with bar data,
used in creating tick data
:return: ticks in list with tuples in it, such as
[(datetime, last_price), (datetime, last_price)]
"""
if os.path.exists(tick_path):  # 如果已存在回测数据，直接读取回测数据ticks
tick_path,
parse_dates=['datetime'],
index_col='datetime'
)

tick_list = []
for index, row in ticks.iterrows():
tick_list.append((index, row[0]))

ticks = np.array(tick_list)
else:
ticks = []

for index, row in bar_5m.iterrows():  # 根据不同的开盘价设置步长
if row['open'] < 30:
step = 0.01
elif row['open'] < 60:
step = 0.03
elif row['open'] < 90:
step = 0.05
else:
step = 0.1
# in case of np.arrange(30, 30.11, 0.02), (open, high, step)
# we will not have 30.11 as the highest price,
# we might not catch high when step is more than 0.01
# that is why me need: arr = np.append(arr, row['high']) and
#   arr = np.append(arr, row['low'])
arr = np.arange(row['open'], row['high'], step)  # 按步长生成从open到high的数据
arr = np.append(arr, row['high'])  # 这个是为了弥补步长的不对等会漏掉high
arr = np.append(arr, np.arange(row['open'] - step, row['low'], -step))  # 按步长生成从open到low的数据
arr = np.append(arr, row['low'])  # 这个是为了弥补步长的不对等会漏掉low
arr = np.append(arr, row['close'])

i = 0
dt = parser.parse(row['datetime']) - timedelta(minutes=5)
for item in arr:
ticks.append((dt + timedelta(seconds=0.1 * i), item))  # 将数据时间模拟到0.1秒递进
i += 1
tick_df = pd.DataFrame(ticks, columns=['datetime', 'price'])
tick_df.to_csv(tick_path, index=0)  # 保存到csv回测数据中
return ticks


## 2 - 运行回测

    def run_backtestting(self, ticks):
"""
:method: ticks will be used to generate bars,
when bars is long enough, call strategy()
:param ticks: list with (datetime, price) in the list
:return: none
"""
for tick in ticks:
self.bar_generator_for_backtesting(tick)
if self._init:
self.strategy()
else:
if len(self._Open) >= 100:
self._init = True
self.strategy()


## 3 - 为回测数据添加生成方法

    def bar_generator_for_backtesting(self, tick):
"""
:method: for backtesting only, used to update _Open, _ High, etc, It needs just one param
:param tick: tick info in tuple, (datetime, price)
:return:
"""
if tick[0].minute % 5 == 0 and tick[0].minute != self._last_bar_start_minute:
self._last_bar_start_minute = tick[0].minute
self._Open.insert(0, tick[1])
self._High.insert(0, tick[1])
self._Low.insert(0, tick[1])
self._Close.insert(0, tick[1])
self._Dt.insert(0, tick[0])
self._is_new_bar = True
else:
# update current bar
self._High[0] = max(self._High[0], tick[1])
self._Low[0] = max(self._Low[0], tick[1])
self._Close[0] = tick[1]
self._Dt[0] = tick[0]
self._is_new_bar = False


## 4 - sell中添加订单的pnl收益计算

    def _sell(self, key, price):
"""
:method: close a long order, It needs two params
:param1 key: long order's key
:param2 price: selling price
:return:
"""
self._current_orders[key]['close_price'] = price
self._current_orders[key]['close_datetime'] = self._Dt[0]
self._current_orders[key]['pnl'] = \
(price - self._current_orders[key]['open_price']) \
* self._current_orders[key]['volume'] \
- price * self._current_orders[key]['volume'] * 1 / 1000 \
- (price - self._current_orders[key]['open_price']) \
* self._current_orders[key]['volume'] * 3 / 10000

# move order from current orders to history orders
self._history_orders[key] = self._current_orders.pop(key)


## 5 - 策略执行中调整买卖ma20的比例

• 买入比例调整为ma20 * 0.97
• 卖出比例调整为ma20 * 1.03
    def strategy(self):
# last < 0.95 *ma20 ,long position(仓位), last > ma20 *1.05, sell
if self._is_new_bar:
sum_ = 0
for item in self._Close[1:21]:
sum_ = sum_ + item
self._ma20 = sum_ / 20

if 0 == len(self._current_orders):
if self._Close[0] < 0.97 * self._ma20:
# 100000/44.28 = 2258   44.28是当前价格，10万指的你拥有的钱
# 2258 -> 2200 shares
volume = int(100000 / self._Close[0] / 100) * 100
self._buy(self._Close[0] + 0.01, volume)  # 这里的0.01是为了防止挂单，我们需要即可买入
elif 1 == len(self._current_orders):
if self._Close[0] > self._ma20 * 1.03:
key = list(self._current_orders.keys())[0]
self._sell(key, self._Close[0] - 0.01)
else:  # len() = 2
raise ValueError("we have more then 1 current orders")
# Close[0] in between 0.95*ma20 and 1.05*ma20,do nothing


# 三、回测订单分析

## 1 - 订单打印数据

ast._current_orders:
{'order5': {'open_datetime': Timestamp('2020-11-30 13:25:00.800000'), 'open_price': 45.23, 'volume': 2200}}
-------------------------------------
ast._history_orders:
{
'order1': {'open_datetime': Timestamp('2020-02-03 09:30:00'), 'open_price': 32.79, 'volume': 3000, 'close_price': 29.56000000000001, 'close_datetime': Timestamp('2020-03-20 09:50:01'), 'pnl': -9775.77299999997},

'order2': {'open_datetime': Timestamp('2020-06-18 09:30:02.500000'), 'open_price': 32.129999999999995, 'volume': 3100, 'close_price': 35.25, 'close_datetime': Timestamp('2020-07-03 09:35:01'), 'pnl': 9559.823400000016},

'order3': {'open_datetime': Timestamp('2020-10-26 09:50:01.300000'), 'open_price': 40.12999999999999, 'volume': 2400, 'close_price': 41.41000000000003, 'close_datetime': Timestamp('2020-11-03 09:30:02.400000'), 'pnl': 2971.694400000105},

'order4': {'open_datetime': Timestamp('2020-11-13 09:40:00.700000'), 'open_price': 42.14, 'volume': 2300, 'close_price': 46.80000000000004, 'close_datetime': Timestamp('2020-11-30 09:30:03.200000'), 'pnl': 10607.144600000092}}


# 五、完整源码

import requests
from time import sleep
from datetime import datetime, time, timedelta
from dateutil import parser
import pandas as pd
import numpy as np
import os
import matplotlib.pyplot as plt
# import mplfinance as mpf
from mplfinance.original_flavor import candlestick_ohlc
from matplotlib.dates import date2num

def get_ticks_for_backtesting(tick_path, bar_path):
"""
:func: get ticks for backtesting, need two params
:param1 tick_path: 生成的回测数据路径
csv file with tick data,
when there is not tick data,
use bat_path to create tick data
:param2 bar_path: 历史数据的tick路径
csv file with bar data,
used in creating tick data
:return: ticks in list with tuples in it, such as
[(datetime, last_price), (datetime, last_price)]
"""
if os.path.exists(tick_path):  # 如果已存在回测数据，直接读取回测数据ticks
tick_path,
parse_dates=['datetime'],
index_col='datetime'
)

tick_list = []
for index, row in ticks.iterrows():
tick_list.append((index, row[0]))

ticks = np.array(tick_list)
else:
ticks = []

for index, row in bar_5m.iterrows():  # 根据不同的开盘价设置步长
if row['open'] < 30:
step = 0.01
elif row['open'] < 60:
step = 0.03
elif row['open'] < 90:
step = 0.05
else:
step = 0.1
# in case of np.arrange(30, 30.11, 0.02), (open, high, step)
# we will not have 30.11 as the highest price,
# we might not catch high when step is more than 0.01
# that is why me need: arr = np.append(arr, row['high']) and
#   arr = np.append(arr, row['low'])
arr = np.arange(row['open'], row['high'], step)  # 按步长生成从open到high的数据
arr = np.append(arr, row['high'])  # 这个是为了弥补步长的不对等会漏掉high
arr = np.append(arr, np.arange(row['open'] - step, row['low'], -step))  # 按步长生成从open到low的数据
arr = np.append(arr, row['low'])  # 这个是为了弥补步长的不对等会漏掉low
arr = np.append(arr, row['close'])

i = 0
dt = parser.parse(row['datetime']) - timedelta(minutes=5)
for item in arr:
ticks.append((dt + timedelta(seconds=0.1 * i), item))  # 将数据时间模拟到0.1秒递进
i += 1
tick_df = pd.DataFrame(ticks, columns=['datetime', 'price'])
tick_df.to_csv(tick_path, index=0)  # 保存到csv回测数据中
return ticks

# __init__，构造，初始化，实例化
"""
:class: A stock trading platform, needs one param,
:param1: strategy_name: strategy_name
"""

def __init__(self, strategy_name):
self._strategy_name = strategy_name
self._Dt = []  # 交易时间
self._Open = []  # 开盘价
self._High = []  # 最高价
self._Low = []  # 最低价
self._Close = []  # 最新价
self._Volume = []
self._tick = []  # 数据
self._last_bar_start_minute = None  # 最后一次更新bar的时间
self._is_new_bar = False  # 是否有新bar
self._ma20 = None

# 当前订单，dict, 字典
self._current_orders = {}
# 历史订单
self._history_orders = {}
self._order_number = 0
self._init = False  # for backtesting

def get_tick(self):
"""
It goes to sina to get last tick info,
sh600519 can be changed
need to set headers Referer to: https://finance.sina.com.cn
A股的开盘时间是9:15，9:15-9:25是集合竞价 -> 开盘价，9:25
9:25-9:30不交易，时间>9:30，交易开始
start this method after 9:25
tick info is organized in tuple,
tick info is save in self._tick.
:param: no param
:return: None
"""
stock_info = page.text
mt_info = stock_info.replace("\"", "").split("=")[1].split(",")
# 最新价
last = float(mt_info[1])
trade_datetime = mt_info[30] + ' ' + mt_info[31]

def get_history_data_from_local_machine(self):
"""
:not done yet
:return:
"""
# tushare 数据来源
# self.Open = [1, 2, 3]
# self.High = [2, 3, 4]
self._Open = []
self._High = []
self._Low = []
self._Close = []
self._Dt = []

def bar_generator(self):
"""
:not done yet
:how save and import history data?
:return:
"""
# assume we have history data already
# 1、update bars，calculate 5 minutes ma20 , not daily data
# 2、compare last and ma20  -> buy or sell or pass
# assume we have history data,Open,High,Low,Close,Dt
# 这里可以是5minutes、10minutes、15minutes、20minutes、30minutes
if self._tick[0].minute % 5 == 0 and self._tick[0].minute != self._last_bar_start_minute:
self._last_bar_start_minute = self._tick[0].minute
self._Open.insert(0, self._tick[1])
self._High.insert(0, self._tick[1])
self._Low.insert(0, self._tick[1])
self._Close.insert(0, self._tick[1])
self._Dt.insert(0, self._tick[0])
self._is_new_bar = True
else:
# update current bar
self._High[0] = max(self._High[0], self._tick[1])
self._Low[0] = max(self._Low[0], self._tick[1])
self._Close[0] = self._tick[1]
self._Dt[0] = self._tick[0]
self._is_new_bar = False

"""
:method: create am order
:return: none
"""
self._order_number += 1
key = "order" + str(self._order_number)
self._current_orders[key] = {
"open_datetime": self._Dt[0],
"open_price": price,
"volume": volume  # 股数
}
pass

def _sell(self, key, price):
"""
:method: close a long order, It needs two params
:param1 key: long order's key
:param2 price: selling price
:return:
"""
self._current_orders[key]['close_price'] = price
self._current_orders[key]['close_datetime'] = self._Dt[0]
self._current_orders[key]['pnl'] = \
(price - self._current_orders[key]['open_price']) \
* self._current_orders[key]['volume'] \
- price * self._current_orders[key]['volume'] * 1 / 1000 \
- (price - self._current_orders[key]['open_price']) \
* self._current_orders[key]['volume'] * 3 / 10000

# move order from current orders to history orders
self._history_orders[key] = self._current_orders.pop(key)

def strategy(self):
# last < 0.95 *ma20 ,long position(仓位), last > ma20 *1.05, sell
if self._is_new_bar:
sum_ = 0
for item in self._Close[1:21]:
sum_ = sum_ + item
self._ma20 = sum_ / 20

if 0 == len(self._current_orders):
if self._Close[0] < 0.97 * self._ma20:
# 100000/44.28 = 2258   44.28是当前价格，10万指的你拥有的钱
# 2258 -> 2200 shares
volume = int(100000 / self._Close[0] / 100) * 100
self._buy(self._Close[0] + 0.01, volume)  # 这里的0.01是为了防止挂单，我们需要即可买入
elif 1 == len(self._current_orders):
if self._Close[0] > self._ma20 * 1.03:
key = list(self._current_orders.keys())[0]
self._sell(key, self._Close[0] - 0.01)
else:  # len() = 2
raise ValueError("we have more then 1 current orders")
# Close[0] in between 0.95*ma20 and 1.05*ma20,do nothing

def bar_generator_for_backtesting(self, tick):
"""
:method: for backtesting only, used to update _Open, _ High, etc, It needs just one param
:param tick: tick info in tuple, (datetime, price)
:return:
"""
if tick[0].minute % 5 == 0 and tick[0].minute != self._last_bar_start_minute:
self._last_bar_start_minute = tick[0].minute
self._Open.insert(0, tick[1])
self._High.insert(0, tick[1])
self._Low.insert(0, tick[1])
self._Close.insert(0, tick[1])
self._Dt.insert(0, tick[0])
self._is_new_bar = True
else:
# update current bar
self._High[0] = max(self._High[0], tick[1])
self._Low[0] = max(self._Low[0], tick[1])
self._Close[0] = tick[1]
self._Dt[0] = tick[0]
self._is_new_bar = False

def run_backtestting(self, ticks):
"""
:method: ticks will be used to generate bars,
when bars is long enough, call strategy()
:param ticks: list with (datetime, price) in the list
:return: none
"""
for tick in ticks:
self.bar_generator_for_backtesting(tick)
if self._init:
self.strategy()
else:
if len(self._Open) >= 100:
self._init = True
self.strategy()

# ma = AstockTrading('600036')  # 类实例化
# ma.get_history_data_from_local_machine()
#
# # 交易时间是9：30-11:30,13：00-15：00
# while time(9, 26) < datetime.now().time() < time(11, 32) \
#         or time(13) < datetime.now().time() < time(15, 2):
#     ma.get_tick()
#     ma.bar_generator()
#     ma.strategy()
#     # sleep(3)

if __name__ == '__main__':
ast.run_backtestting(ticks)  # 运行回测数据
print('ast._current_orders:')
print(ast._current_orders)
print("-------------------------------------")
print('ast._history_orders:')
print(ast._history_orders)

# 使用matplotlib绘制盈亏柱状图
profit_orders = 0  # 盈利的交易数
loss_orders = 0  # 亏损的交易数
orders = ast._history_orders
for key in orders.keys():
if orders[key]['pnl'] >= 0:
profit_orders += 1
else:
loss_orders += 1
win_rate = profit_orders / len(orders)
loss_rate = loss_orders / len(orders)
# T = transpose
orders_df = pd.DataFrame(orders).T
orders_df.loc[:, 'pnl'].plot.bar()
plt.show()

# 使用 mplfinance 绘制k线图：订单交易价格与时间
parse_dates=['datetime']
)
bar5.loc[:, 'datetime'] = [date2num(x) for x in bar5.loc[:, 'datetime']]

fig, ax = plt.subplots()
candlestick_ohlc(
ax,
quotes=bar5.values,
width=0.2,
colorup="r",
colordown='g',
alpha=1.0,
)

# put orders in candle sticks
for index, row in orders_df.iterrows():
ax.plot(
[row['open_datetime'], row['close_datetime']],
[row['open_price'], row['close_price']],
color='darkblue',
marker='o',
)
plt.show()

更多相关内容
• 博文《Python量化交易策略及回测系统》详细的介绍了Python量化交易策略编写的全过程，包括： 1、数据的获取 2、量化交易策略回测系统的编写 3、量化交易策略的设计 4、使用量化交易策略及回测系统对多个股票进行回测...
• ## Python量化交易策略及回测系统

千次阅读 多人点赞 2022-03-11 20:30:37
目录前言：行文思路1、模块导入2、数据获取3、股票数据类型转换4、策略编写5、回测系统编写6、实例化策略非面向对象的编程 前言：行文思路 由于本文篇幅较长，而且文中关于python数据分析的知识点、python金融量化的...

# 前言：行文思路

由于本文篇幅较长，而且文中关于python数据分析的知识点、python金融量化的知识点较多，因此在文章的开头先给出本文整体思路，以便读者能够拥有较为清晰的脉络通读全文。
第一部分：模块导入，主要是将后面需要用到的模块进行导入（简单，非重点）
第二部分：数据获取，鉴于在网络上股票数据不易找到，Wind金融终端等数据库数据收费，通过多方查找，终于让我找到了能够免费下载股票数据的模块，建议大家收藏（简单，非重点）
第三部分：将股票数据转化为新的数据类型，通过上面的方法下载下来的数据类型是我们常见的DataFrame，虽然pandas的功能已经很强大了，但是为了加入新的数据指标以及方便下一步操作，最好还是将DataFrame数据转化为一种新的数据类型（较难，非重点）
第四部分：策略编写，也就是利用代码将我们在股票市场中的交易原则表达出来（较难，重点）
第五部分：回测系统编写，股票回测即是基于历史已经发生过的真实行情数据，在历史上某一段时间内，模拟真实金融市场中股票的买入、卖出，得出这个时间段内的盈利率等数据（较难，重点）
第六部分：实例化策略并回测得到收益，继承一个策略类，得到一个实际的例子，利用股票数据回测得出结果（简单，重点）

# 1、模块导入

import akshare as ak
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from collections import namedtuple
from collections import OrderedDict
from collections.abc import Iterable
from functools import reduce
from abc import ABC, abstractmethod


akshare：用于下载股票的交易数据(前复权)
collections：对基本数据类型做进一步处理,实现特定的数据类型
abc：主要定义了基本类和最基本的抽象方法，可以为子类定义共有的接口API，不需要具体实现

# 2、数据获取

# 获取某一只股票一段时间的数据
stock_sz300750_df = ak.stock_zh_a_daily(symbol="sz300750", start_date="20200103", end_date="20211231", adjust="qfq")
# list_date = list(stock_sz300750_df['date'])
# stock_sz300750_df.index = list_date


函数ak.stock_zh_a_daily()用于获取A股股票数据

# 3、股票数据类型转换

由于后面写了两个量化交易策略，而且策略中的有部分指标不相同，所以在这一部分以及下面回测系统两部分面向对象编程，有部分函数只用于策略一，有部分只用于策略二。

# 将股票数据转化为新的数据类型
def __init__(self, price_array, date_array=None):
self.price_array = price_array
self.date_array = self._init_days(date_array)
self.change_array = self._init_change()
self.s_short = self._init_sma_short(price_array)
self.s_long = self._init_sma_long(price_array)
self.stock_dict = self._init_stock_dict()

def _init_change(self):
# 收益率

change_array = self.price_array.pct_change(periods=1)
return change_array

def _init_days(self, date_array):
# 日期序列

date_array = [str(date) for date in date_array]
return date_array

def _init_sma_short(self, price_array):
# 5日移动平均线

s_short = price_array.rolling(window=5, min_periods=5).mean()
return s_short

def _init_sma_long(self, price_array):
# 30日移动平均线

s_long = price_array.rolling(window=30, min_periods=30).mean()
return s_long

def _init_stock_dict(self):
# 将股票的日期、收盘价、涨跌幅转化为一个新的数据类型

stock_namedtuple = namedtuple('stock', ('date', 'price', 'change', 's_short', 's_long'))

# 使用以被赋值的date_array等进行OrderedDict的组装
stock_dict = OrderedDict((date, stock_namedtuple(date, price, change, s_short, s_long)) for date, price, change, s_short, s_long
in zip(self.date_array, self.price_array, self.change_array, self.s_short, self.s_long))
return stock_dict

def filter_stock(self, want_up=True, want_calc_sum=False):
# 判断交易日股票是上涨还是下跌
# Python中的三目表达式的写法
filter_func = (lambda p_day: p_day.change > 0) if want_up else (lambda p_day: p_day.change < 0)
# 使用filter_func做筛选函数
want_days = list(filter(filter_func, self.stock_dict.values()))

if not want_calc_sum:
return want_days

# 需要计算涨跌幅和
change_sum = 0.0
for day in want_days:
change_sum += day.change
return change_sum


相同指标：date、price、change
策略一：s_short、s_long分别为5日移动平均线和30日移动平均线；可以根据自己的需求更改参数数据
策略二：函数filter_stock()，用于判断交易日股票是上涨还是下跌

最后将DataFrame数据转换为自定义数据类型OrderedDict

trade_days = StockTradeDays(stock_sz300750_df['close'], stock_sz300750_df['date'])
print(day)


# 4、回测系统编写

class TradeStrategyBase(ABC, object):   # 只能被继承，不能实例化
# 交易策略抽象基类

@abstractmethod
# 买入策略基类
pass

@abstractmethod
def sell_strategy(self, *args, **kwargs):
# 卖出策略基类
pass

# 交易回测系统

"""
"""
# 交易盈亏结果序列
self.profit_array = []

# 执行交易回测

# 以时间驱动，完成交易回测

# 如果有持有股票，加入交易盈亏结果序列
self.profit_array.append(day.change)
#                 print("日期：{}，持有中".format(day.date))

# hasattr: 用来查询对象有没有实现某个方法
# 买入策略执行
#                     print("日期：{}，买入策略执行".format(day.date))

# 卖出策略执行
#                     print("日期：{}，卖出策略执行".format(day.date))


# 5、策略编写

class TradeStrategy1(TradeStrategyBase):
"""
交易策略1: 利用5日移动平均线与30日移动平均线交叉点进行股票买卖
当5日移动平均线从下往上穿过30日移动平均线时，买入股票并持有
当5日移动平均线从上往下穿过30日移动平均线时，卖出股票
"""
def __init__(self, stock_df):
self.keep_stock_day = -1

# 只有当长期移动平均线的数据有了，才能进行下一步操作

# 今日移动平均线值

# 昨日移动平均线值

if today_short > today_long and yesterday_short < yesterday_long:
# 买入条件成立：
self.keep_stock_day = 1

# 只有当长期移动平均线的数据有了，才能进行下一步操作

# 今日移动平均线值

# 昨日移动平均线值

if today_short < today_long and yesterday_short > yesterday_long:
# 卖出条件成立：
self.keep_stock_day = 0


移动平均线是将一定时期内的股票价格加以平均，把不同时间的平均值连接起来形成一根MA，利用长短期的移动平均线交叉点观察股票价格变动趋势的一种技术指标。因此，只有到了第30天才可以获得30日移动平均值，才可能进行买卖。
判断买入条件：当短期移动平均线从下往上穿过长期移动平均线时，可以认为短期内股价的趋势向上，股价可能会上涨
判断卖出条件：当短期移动平均线从上往下穿过长期移动平均线时，可以认为短期内股价的趋势向下，股价可能会下跌

class TradeStrategy2(TradeStrategyBase):
"""
交易策略2: 追涨杀跌策略，当股价连续两个交易日上涨
当股价连续两个交易日下跌
且下跌幅度超过阀值默认s_sell_change_threshold()，卖出股票
"""
def __init__(self):
self.keep_stock_day = 0
self.s_sell_change_threshold = -0.05   #下跌卖出阀值

if self.keep_stock_day == 0 and trade_ind >= 1:
"""
当没有持有股票的时候self.keep_stock_day == 0 并且
"""
# 昨天是否股价上涨
# 两天总涨幅
if today_down and yesterday_down and down_rate > self.s_buy_change_threshold:
self.keep_stock_day += 1

# 昨天是否股价下跌
# 两天总跌幅
if today_down and yesterday_down and down_rate < self.s_sell_change_threshold:
# 卖出条件成立：连跌两天，跌幅超过s_sell_change_threshold
self.keep_stock_day = 0

@property
# getter获取属性函数

# setter属性赋值
"""
上涨阀值需要为float类型
"""
# 上涨阀值只取小数点后两位

@property
def s_sell_change_threshold(self):
# getter获取属性函数
return self.__s_sell_change_threshold

@s_sell_change_threshold.setter
def s_sell_change_threshold(self, s_sell_change_threshold):
# setter属性赋值
if not isinstance(s_sell_change_threshold, float):
"""
上涨阀值需要为float类型
"""
# 上涨阀值只取小数点后两位
self.__s_sell_change_threshold = round(s_sell_change_threshold, 2)


策略二可以认为是非理性人在股票市场中交易时，遇到多日上涨且上涨幅度较大时，会认为股票有继续上涨的趋势，为了获利所以买入股票；但当某一股票连续下跌且下跌幅度超过心理预期时，会认为股票又继续下跌的趋势，为了止损卖出股票。
策略二中买入股票条件为：当股价连续两个交易日上涨且上涨幅度超过0.05，买入股票并持有
卖出条件为：当股价连续两个交易日下跌且下跌幅度超过-0.05，卖出股票
相关参数可以根据需求修改

# 6、实例化策略

# 实例化策略1
print('回测策略1总盈亏为：{}%'.format(reduce(lambda a, b: a + b, trade_loop_back.profit_array) * 100))


经过前面的所有步骤之后，就可以实例化一个交易策略，利用交易数据进行回测，可得到相应的结果：

# 实例化策略2
print('回测策略2总盈亏为：{}%'.format(reduce(lambda a, b: a + b, trade_loop_back.profit_array) * 100))


结果：

# 非面向对象的编程

由于对面向对象编程不太擅长，所以我对两个策略又分别写了新的程序，以判断上文面向对象程序是否正确

changes_list_1 = []
flag = -1
short2 = day.s_short
long2 = day.s_long
if pd.isna(long1):
continue
if flag == 1:
changes_list_1.append(day.change)
print("日期：{}，持有中".format(day.date))
if short2 > long2 and short1 < long1:
flag = 1
print("日期：{}，买入策略执行".format(day.date))
if short2 < long2 and short1 > long1:
flag = 0
print("日期：{}，卖出策略执行".format(day.date))

print('回测策略1总盈亏为：{}%'.format(reduce(lambda a, b: a + b, changes_list_1) * 100))
plt.plot(np.array(changes_list_1).cumsum())


结果：

# 策略2
changes_list_2 = []
flag = 0
today_down = day.change
if flag > 0:
changes_list_2.append(day.change)
print("日期：{}，持有中".format(day.date))
if today_down > 0 and yesterday_down > 0 and today_down + yesterday_down > 0.01:
flag += 1
print("日期：{}，买入策略执行".format(day.date))
if today_down < 0 and yesterday_down < 0 and today_down + yesterday_down < -0.01:
flag = 0
print("日期：{}，卖出策略执行".format(day.date))

print('回测策略2总盈亏为：{}%'.format(reduce(lambda a, b: a + b, changes_list_2) * 100))
plt.plot(np.array(changes_list_2).cumsum())


结果：

# 分析总结

以上策略只用于量化分析，并不适合用于实际交易，之所以有较高的盈利，得益于宁王领衔的新能源板块的强势，大家也可以试试其他的股票，比如药明康德（代码：SH603259)

可以看出策略对该股票进行回测交易时，获得的盈利并不客观，甚至出现较大的亏损，因此，需要对相关策略进行参数调整修改，或者发掘其他更为有效的策略……

最后，大家如果觉得文章写的不错的话，可以点赞、收藏、关注三连哦~文中出现的所有代码已经打包上传至我的资源了，可以下载下来研究分析和运行查看

展开全文
• 这篇文章主要介绍如何使用Python对一些简单的交易策略进行回测，对这块比较感兴趣的朋友可以看一看。1.获取证券数据本文以A股市场为例，先获取A股近10年的数据并保存到数据库。1.1.安装数据库（MongoDB）为了提升...

这篇文章主要介绍如何使用Python对一些简单的交易策略进行回测，对这块比较感兴趣的朋友可以看一看。

1.获取证券数据

本文以A股市场为例，先获取A股近10年的数据并保存到数据库。

1.1.安装数据库（MongoDB）

为了提升运行效率，需要将证券数据保存到本地数据库，这里我们选择的数据库是MongoDB，安装过程在此不再赘述，参照http://www.runoob.com/mongodb/mongodb-window-install.html即可，比较简单。

1.2.编写数据库操作类

安装完数据库，我们先编写一个工具类来管理数据库的增删改查等操作：

class DBManager:

def __init__(self, table_name):

self.client = MongoClient("127.0.0.1", 27017)

self.db = self.client["my_database"]

self.table = self.db[table_name]

def clsoe_db(self):

self.client.close()

# 获取股票代码列表(sz格式)

def get_code_list(self):

return self.table.find({}, {"ticker": 1}, no_cursor_timeout=True)

# 查询多条数据

def find_by_key(self, request=None):

if request is None:

request = {}

return self.table.find(request)

# 查询单条数据

def find_one_by_key(self, request=None):

if request is None:

request = {}

return self.table.find_one(request)

# 添加单条数据

# 添加一条数据

post['created_time'] = created_time

return self.table.insert_one(post)

1.3.获取数据

获取证券数据的途径主要有两种，第一种是去网上找现成的数据接口，通过调用接口获取数据，这种方式简单便捷，数据的准确性有保障；第二种是自己编写数据爬虫获取数据，这种方式会相对麻烦一点。本文采用的是第一种方式。使用的数据接口是http://www.baostock.com/。

调用数据接口：

code_list = dm.get_code_list() # 获取股票代码列表

for item in code_list:

max_try = 8 # 失败重连的最大次数

ticker = item["ticker"]

for tries in range(max_try):

if rs.error_code == '0':

parse_pager(rs, ticker) # 解析数据

break

elif tries < (max_try - 1):

sleep(2)

continue

else:

log.logger.error("加载数据失败：" + str(ticker))

log.logger.info("加载数据完成")

bs.logout()

解析数据并保存到数据库：

# 解析数据并保存到数据库

def parse_pager(content, ticker):

while content.next():

item_row = content.get_row_data()

__dict = {

"date": item_row[0],

"code": item_row[1],

"open": item_row[2],

"high": item_row[3],

"low": item_row[4],

"close": item_row[5],

"volume": item_row[6],

"amount": item_row[7],

"turn": item_row[9],

"pctChg": item_row[10]

}

2.编写交易逻辑

为了便于描述，本文选择了一个较为简单的交易逻辑。以周为交易周期，每周一开盘前分析各股的周macd数据，满足交易条件则以开盘价买入并持有一周，再以当周五的收盘价卖出。这个交易逻辑比较简单且实操性强，回测的结果也有可圈可点之处（回测结果见文末）。交易逻辑的核心代码如下：

if wmacd_list[-1] > 0 >= wmacd_list[-2]: # 判断某支股票是否符合当前交易逻辑

if np.mean(volume_list[-5:-1]) < volume_list[-1]:

if 0.1 >= diff_list[-1] >= 0:

data = [x for x in dm_tk.find_one_by_key({"ticker": item["ticker"]})["data_list"] if x["date"] == cur_date][0]

result_list.append(data)

def get_w_macd(price_list): # 生成每支股票的周macd数据

ema_12_list = list()

for index in range(len(price_list)):

if index == 0:

ema_12_list.append(price_list[0])

else:

ema_12_list.append(round(ema_12_list[index - 1] * 11 / 13 + price_list[index] * 2 / 13, 4))

ema_26_list = list()

for index in range(len(price_list)):

if index == 0:

ema_26_list.append(price_list[0])

else:

ema_26_list.append(round(ema_26_list[index - 1] * 25 / 27 + price_list[index] * 2 / 27, 4))

diff_list = list()

for index in range(len(ema_12_list)):

diff = ema_12_list[index] - ema_26_list[index]

diff_list.append(diff)

dea_list = list()

for index in range(len(diff_list)):

if index == 0:

dea_list.append(diff_list[0])

else:

dea_list.append(round(dea_list[index - 1] * 0.8 + diff_list[index] * 0.2, 4))

wmacd_list = list()

for index in range(len(dea_list)):

bar = (diff_list[index] - dea_list[index]) * 3

wmacd_list.append(bar)

return wmacd_list, diff_list, dea_list

以上只是该交易策略的部分代码，读者不需要看懂其中的逻辑，实际操作过程中应该使用自己的交易策略。

3.模拟交易操作

编写好交易策略后，我们开始对交易策略进行回测。首先我们设定一些初始数据，这些数据是我们日常交易中常见到的，比如初始资金总额、当前可用资金、最大仓位等等，由于该交易策略比较简单，所以我们只需要设定起始资金就可以了：

capital_base = 1000000 # 起始资金设定为100万

history_capital = list() # 用于记录交易结果

接着我们创建一条时间轴，所有的交易操作都将跟随时间轴进行：

# 生成时间轴

def date_range(start, end, step=1, format="%Y-%m-%d"):

strptime, strftime = datetime.datetime.strptime, datetime.datetime.strftime

days = (strptime(end, format) - strptime(start, format)).days + 1

return [strftime(strptime(start, format) + datetime.timedelta(i), format) for i in range(0, days, step)]

date_list = date_range("2016-01-01", "2016-12-31") # 生成2016-01-01至2016-12-31的所有时间点

生成好时间轴后，使用for循环遍历时间轴（模拟时间推进，并且过滤掉周末和节假日），按照之前设定的交易策略，我们在每周一和周五进行买入卖出操作即可。由于该策略不涉及加减仓，故我们对交易过程进行了简化，通过直接计算得出每周的收益。

对于更为复杂的交易策略，建议开发者分别实现开仓、平仓和加减仓等各种操作：

for cur_date in date_list:

if datetime.datetime.strptime(cur_date, "%Y-%m-%d").weekday() == 4: # 判断当前日期是否需要操作

result_list = list() # 用于记录当前时间符合交易条件的股票代码

for item in code_list: # 遍历各支股票，筛选出符合交易条件的股票

-执行交易逻辑-

if 符合交易条件:

result_list.append(data)

# 计算本次操作的收益

if result_list:

capital = capital_base / len(result_list) # 对当前资金进行均分

temp_capital = 0

for item in result_list:

close_price = float(item["close"])

open_price = float(item["open"])

max_price = float(item["high"])

profit = (close_price - open_price) / open_price

temp_capital += (capital * (1 + profit))

capital_base = temp_capital

history_capital.append(capital_base) # 记录本次操作后剩余的资金

4.统计结果和绘图

模拟交易完成后我们来对结果进行统计，由于我们已经将交易的过程记录在history_capital中，此时我们可以轻松的计算出收益率：

net_rate = (history_capital[-1] - history_capital[0]) / history_capital[0] # 计算回测结果

log.logger.info("total_profit：" + str(round(net_rate * 100, 2)) + "%")

为了让交易的结果更加直观，我们还可以将其绘制成折线图，这里使用matplotlib进行绘图：

plt.subplot(111)

lable_x = np.arange(len(history_capital))

plt.plot(lable_x, history_capital, color="r", linewidth=1.0, linestyle="-")

plt.xlim(lable_x.min(), lable_x.max() * 1.1)

plt.ylim(min(history_capital) * 0.9, max(history_capital) * 1.1)

plt.grid(True)

plt.show()

回测结果展示（该收益曲线是在限制最大仓位的条件下得出的，如果取消该限制，收益率将更高）：

2014年全年收益

2015年全年收益

2016年全年收益

2017年全年收益

到此为止，我们就完成了对某个交易策略进行回测的全部流程，从回测结果中可以看出，该交易策略在2014-2017这4年中有着不错的表现。按照策略的规则，交易者只需要在每周一开盘前运行策略，开盘后买进策略推荐的股票，最后在周五收盘前卖掉即可，是易于实盘操作的。

读者也可以自由变换交易逻辑来获取不同的结果，通过对回测结果进行分析，可以对我们日常的交易带来一些帮助。

与我交流：1003882179@qq.com

展开全文
• python大作业股票量化回测源代码股票量化回测Python解决方案。 类的划分 数据读取类：ReadFile 所在文件：fileRW.py 读入pickle类型的原始数据。 单只股票信息管理类：StockInfo 所在文件：stockInfo.py 给定股票...
• 我们可以尝试做这样的改进：在股票a，b，c……的历史数据上分别进行策略回测，找到一个能够稳定收益策略B，来避免时间成本浪费的问题。但是这样仍然存在问题，在等待股票a出现买点的时候，股票b，c……的买点可能也...
• Python量化交易入门 量化交易的历史 Python量化交易项目怎么做 文章目录学习目标：一、基础回测框架二、云端的框架三、不去实现一个回测框架的原因四、RiceQuant回测平台介绍4.1 注册账号 学习目标： 了解常见的...
**同学们前面两期量化交易内容：**

了解常见的回测框架

# 一、基础回测框架

Zipline本身只支持美国的证券，无法更好的使用数据，本地运行速度慢

# 二、云端的框架

提供部分满足需求的数据（但是平台数据质量不行，指标不完整）
策略运行在远端服务器

这些线上平台提供了本地专业版，但是需要收费

# 三、不去实现一个回测框架的原因

1、没有完整的股票行情和基本面数据
2、回测平台是载体，重点在于快速验证策略
3、证券投资机构各自使用回测框架不同，没有通用的框架

# 四、RiceQuant回测平台介绍

网址：https://www.ricequant.com/

## 4.1 注册账号

展开全文
• Python量化交易回测框架介绍 Python量化交易策略创建运行流程 6. Python量化交易：数据获取接口 文章目录学习目标：1、用于股票的交易函数1.1 交易函数API1.1.1 order_shares - 指定股数交易（股票专用）返回...
• 策略回测 实盘交易 历史数据均免费来自于网络 Wind免费个人接口 TuShare 实盘微信提醒及交互 一键挂机 全自动交易 模拟交易，支持9个模拟账号 实盘和回测共用同一策略代码 实盘策略编写模板 选股策略编写模板 自动...
• python 量化策略回测Pairs trading is one of the many mean-reversion strategies. It is considered non-directional and relative as it aims to trade on both related stocks with similar statistical and ...
• 唐奇安通道策略-python量化 这里简单的介绍关于唐奇安通道策略的相关理论以及python代码，抛砖引玉。 前言 唐奇安通道是海龟交易策略中需要应用到的一个指标。 简单而言唐奇安通道是由一条上轨线、中线和下线组成，...
• 使用qteasy创建并优化一个大小盘轮动选股投资策略，通过2011年到2020年十年间的回测，实现比沪深300高20倍的回报，年化收益26%
• 这里写自定义目录标题欢迎使用Markdown编辑器新的改变功能快捷键合理的创建标题，有助于目录的生成如何改变...Python双均线策略回测 欢迎使用Markdown编辑器 你好！ 这是你第一次使用 Markdown编辑器 所展示的欢迎页
• 本模块为量化策略回测框架中的指标系统，该模块用于衡量策略表现，现可支持如下指标： 表 1：回测系统策略表现衡量指标及说明 指标名称 代码指标 指标含义说明 年化收益率 Return 策略的年化收益率 年化...
• 点击上方“Python爬虫与数据挖掘”，进行关注回复“书籍”即可获赠Python从入门到进阶共10本电子书今日鸡汤桃李春风一杯酒，江湖夜雨十年灯。一、简介大家好，我是Snowball。今天...
• ./example量化策略模板示例 ./collect市场行情采集服务 ./datamatrix包含一些datamatrix示例。 ./backtest包含一些策略回测示例。 ./notebook存放策略研究相关文件，如.ipynb文件 ./web量化交易接口API（纯...
• 量化交易策略之海龟交易python版，用户可修改参数进行自定义，可借助米匡、聚宽等网站平台实现量化交易
• 期货CTA策略
• ## python实现量化交易策略

千次阅读 多人点赞 2021-11-17 17:43:27
python实现量化交易策略 1 前言 相信大家都听说过股票，很羡慕那些炒股大佬，觉得量化投资非常高深，本文教大家用python实现简单的量化交易策略。 2 构建策略 炒股是一个概率游戏，强如巴菲特也没办法保证这只股票...
• 利用Python编写好策略，选择选好的股票池。 2。设置开始和结束的时间点，然后设定资金池 3。通过股票池和日期获得股票数据，然后按照设定的间隔，比如每天/每 分钟调用回测函数。 4。下单后，交易软件处理交易。 5。...
• Python量化交易】——1、封装交易所API 在刚刚过去的一个星期里，博主一直在捣鼓 Python量化交易 的内容。在写这篇文章的时候已经用python实现...【Python量化交易】——2、利用python实现网格法交易策略以及回测 【P
• RQAlpha的逻辑也将会在Ricequant的一些回测部分使用，Ricequant - 是一个开放的量化算法交易社区，有免费的服务器资源给大家测试、实盘模拟您的交易算法，并且可以将交易信号通过微信和邮件实时推送给大家。...
• 好友提出要验证连续下跌买入止盈止损卖出策略，本文对该策略回测和实现做分析记录。 买入条件中，连续下跌定义为收盘价连续4日低于前1日的收盘价。卖出条件中，止盈率设置为10%，止损率设置为5%。回测初始资金100000...
• 以笔记（14）中介绍的均线交叉策略为例，实现不同长期、短期均线参数组合的优化测试，回测股票为000001平安银行，回测周期为2018年1月1日至2020年4月15日。 方案1——使用多个list 在向cerebro中添加策略时，使用...

...