-
Notifications
You must be signed in to change notification settings - Fork 49
/
Copy pathctaEngine.py
262 lines (232 loc) · 10.3 KB
/
ctaEngine.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
# encoding: UTF-8
'''
本文件中实现了CTA策略引擎,针对CTA类型的策略,抽象简化了部分底层接口的功能。
'''
import os
import time
import copy
import json
import pymongo
import ctaTaskPool
# Module-level side effect: build one ctaTaskPool.ctaTaskPool() instance at
# import time and publish it as ctaTaskPool.taskPool. CtaEngine submits its
# backtest tasks through this object (see the addTask calls below).
ctaTaskPool.taskPool = ctaTaskPool.ctaTaskPool()
import traceback
import multiprocessing
from datetime import datetime,timedelta
from collections import OrderedDict
from eventEngine import *
from ctaBase import *
from ctaFunction import *
from vtConstant import *
from ctaSetting import *
from ctaBacktesting import *
from vtObject import VtLogData
########################################################################
class CtaEngine(object):
    """CTA strategy engine.

    Keeps a registry of strategy instances (``strategyDict``), loads and
    saves their parameters from/to a JSON settings file, and submits
    backtest / optimisation jobs to the ``ctaTaskPool`` module.

    The code is written so that it parses on both Python 2.6+ and Python 3.
    """

    # Default settings file: <current working directory>/json/CTA_setting.json
    settingFileName = os.path.join(os.getcwd(), 'json', 'CTA_setting.json')

    #----------------------------------------------------------------------
    def __init__(self, mainEngine, eventEngine, settingFileName = 'CTA_setting.json'):
        """Store the engines and resolve the settings file path.

        :param mainEngine: main engine object; only stored on the instance.
        :param eventEngine: event engine; must provide ``put(event)``.
        :param settingFileName: file name, looked up inside the ``json``
            directory under the current working directory.
        """
        self.mainEngine = mainEngine
        self.eventEngine = eventEngine

        # os.path.join keeps the historical Windows result while also
        # producing a valid path on other platforms.
        self.settingFileName = os.path.join(os.getcwd(), 'json', settingFileName)

        # Forwarded unchanged to every backtest / optimisation call.
        self.optimism = False

        # Not read anywhere in this class; kept for external users.
        self.q = multiprocessing.Queue()

        # Current date (midnight of today)
        self.today = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)

        # Maps strategy name -> strategy instance; names must be unique.
        self.strategyDict = {}

    #----------------------------------------------------------------------
    @staticmethod
    def _taskMode(mode):
        """Map the two-letter prefix of a run mode to a task-pool mode."""
        prefix = mode[0:2]
        if prefix in ('BC', 'TC'):
            return 'bt-c'
        if prefix in ('BP', 'TP'):
            return 'bt-perf'
        return 'bt-f'

    #----------------------------------------------------------------------
    @staticmethod
    def _parseTime(text):
        """Parse 'YYYYMMDD' (8 chars) or 'YYYYMMDD HH:MM:SS' into a datetime."""
        if len(text) == 8:
            return datetime.strptime(text, '%Y%m%d')
        return datetime.strptime(text, '%Y%m%d %H:%M:%S')

    #----------------------------------------------------------------------
    @staticmethod
    def _parseParamValue(text):
        """Turn a parameter string into int or float when it looks numeric.

        Uses int()/float() rather than eval(): eval() raised SyntaxError on
        digit strings such as '08' and executes arbitrary expressions.
        Returns the text unchanged when it is not numeric.
        """
        if text.isdigit():
            try:
                return int(text)
            except ValueError:
                pass  # exotic digit characters: fall through to float()
        try:
            return float(text)
        except ValueError:
            return text

    #----------------------------------------------------------------------
    @classmethod
    def _splitSegments(cls, startTime, endTime, splitDays):
        """Cut [startTime, endTime] into consecutive windows of splitDays days.

        :returns: list of ``(start, end)`` tuples formatted as 'YYYYMMDD';
            each window starts on the day the previous one ends and the
            last window ends at ``endTime``.
        :raises ValueError: if ``splitDays`` is not positive (the stepping
            loop could never terminate).
        """
        if splitDays <= 0:
            raise ValueError('splitDays must be a positive number of days')

        begin = cls._parseTime(startTime)
        end = cls._parseTime(endTime)
        step = timedelta(days=splitDays)

        # Strict '<' so that a range which is an exact multiple of splitDays
        # does not produce a trailing zero-length (end, end) window.
        boundaries = []
        cursor = begin + step
        while cursor < end:
            boundaries.append(cursor)
            cursor += step
        boundaries.append(end)

        segments = []
        segStart = begin.strftime('%Y%m%d')
        for boundary in boundaries:
            segEnd = boundary.strftime('%Y%m%d')
            segments.append((segStart, segEnd))
            segStart = segEnd
        return segments

    #----------------------------------------------------------------------
    def _loadSettingByName(self, name):
        """Return the settings entry called ``name`` from the settings file.

        When several entries share the name the last one wins; when none
        matches an empty dict is returned.
        """
        with open(self.settingFileName) as f:
            entries = json.load(f)
        found = {}
        for entry in entries:
            if entry[u'name'] == name:
                found = entry
        return found

    #----------------------------------------------------------------------
    def writeCtaLog(self, content):
        """Emit ``content`` as an EVENT_CTA_LOG event on the event engine."""
        log = VtLogData()
        log.logContent = content
        event = Event(type_=EVENT_CTA_LOG)
        event.dict_['data'] = log
        self.eventEngine.put(event)

    #----------------------------------------------------------------------
    def loadStrategy(self, setting):
        """Instantiate and register one strategy described by ``setting``.

        :param setting: dict with at least ``'name'`` and ``'className'``;
            the whole dict is handed to the strategy constructor.

        Problems (missing keys, unknown class) are logged, not raised.
        """
        try:
            name = setting['name']
            className = setting['className']
        except Exception as e:
            self.writeCtaLog(u'载入策略出错:%s' %e)
            return

        # Look up the strategy class by its configured name
        strategyClass = STRATEGY_CLASS.get(className, None)
        if not strategyClass:
            self.writeCtaLog(u'找不到策略类:%s' %className)
            return

        # An already registered name is kept as is (silently, as before).
        if name in self.strategyDict:
            return

        self.strategyDict[name] = strategyClass(self, setting)

    #----------------------------------------------------------------------
    def updateStrategy(self, name):
        """Re-read the settings file and push matching entries to a strategy.

        Every entry whose name equals ``name`` is passed to the strategy's
        ``onUpdate``. Unknown strategy names are ignored.
        """
        if name not in self.strategyDict:
            return
        strategy = self.strategyDict[name]

        with open(self.settingFileName) as f:
            entries = json.load(f)

        # This class does not define callStrategyFunc itself. Use it when a
        # subclass (or a patch) provides it, otherwise call onUpdate
        # directly instead of failing with AttributeError.
        wrapper = getattr(self, 'callStrategyFunc', None)
        for setting in entries:
            if setting[u'name'] != name:
                continue
            if wrapper is not None:
                wrapper(strategy, strategy.onUpdate, setting)
            else:
                strategy.onUpdate(setting)

    #----------------------------------------------------------------------
    def backtestStrategy(self, name, startTime = '20161001', endTime = '20161030', slippage = 0, mode = 'T'):
        """Submit one backtest task for the strategy called ``name``.

        :param startTime: start of the backtest; also stored in the setting
            under ``'StartTime'``.
        :param endTime: end of the backtest; stored under ``'EndTime'``.
        :param slippage: forwarded unchanged to the task.
        :param mode: run mode; its first two letters select the task mode.
        """
        setting_bt = self._loadSettingByName(name)
        setting_bt['StartTime'] = startTime
        setting_bt['EndTime'] = endTime
        q = mode[0:2] == 'BV'
        ctaTaskPool.taskPool.addTask(
            'bt',
            args=(setting_bt, startTime, endTime, slippage, self.optimism, mode, q),
            mode=self._taskMode(mode),
            runmode=mode)

    #----------------------------------------------------------------------
    def backtestRollingStrategy(self, name, optimizationSetting, startTime = '20161001', endTime = '20161030', rollingDays=20, slippage = 0, mode = 'T'):
        """Run a rolling-optimisation backtest, then fire the strategy event.

        All arguments are forwarded to ``ctaTaskPool.backtestingRollingE``.
        """
        setting_bt = self._loadSettingByName(name)
        setting_bt['StartTime'] = startTime
        setting_bt['EndTime'] = endTime
        # NOTE(review): backtestStrategy compares mode[0:2] with 'BV' while
        # this compares the whole string - confirm which one is intended.
        q = mode == 'BV'
        ctaTaskPool.backtestingRollingE(setting_bt, optimizationSetting, startTime, endTime, rollingDays, slippage, self.optimism, mode, q)
        self.putStrategyEvent(name)

    #----------------------------------------------------------------------
    def backtestSplitStrategy(self, name, startTime = '20161001', endTime = '20161030', splitDays=20, slippage = 0, mode = 'T'):
        """Submit one backtest task per ``splitDays``-day window.

        :raises ValueError: if ``splitDays`` is not positive.
        """
        # Validate and build the windows before touching the settings file.
        segments = self._splitSegments(startTime, endTime, splitDays)

        setting_bt = self._loadSettingByName(name)
        # NOTE(review): whole-string comparison, unlike backtestStrategy
        # which looks at mode[0:2] - confirm which one is intended.
        q = mode == 'BV'
        xmode = self._taskMode(mode)

        for dtSt, dtEd in segments:
            setting_bt['StartTime'] = dtSt
            setting_bt['EndTime'] = dtEd
            # Deep copy: each task needs its own snapshot because
            # setting_bt is overwritten on the next iteration.
            ctaTaskPool.taskPool.addTask(
                'bt',
                args=(copy.deepcopy(setting_bt), dtSt, dtEd, slippage, self.optimism, mode, q, False),
                mode=xmode,
                runmode=mode)

    #----------------------------------------------------------------------
    def optimizeStrategy(self, name, optimizationSetting, startTime = '20161001', endTime = '20161030', slippage = 0, mode='T'):
        """Run a parameter scan, then fire the strategy event.

        All arguments are forwarded to ``ctaTaskPool.optimizeE``.
        """
        setting_bt = self._loadSettingByName(name)
        ctaTaskPool.optimizeE(setting_bt,optimizationSetting,startTime,endTime,slippage,self.optimism,mode)
        self.putStrategyEvent(name)

    #----------------------------------------------------------------------
    def reportStrategy(self):
        """Print portfolio weights and plot the portfolio curve.

        Collects every registered strategy's name and ``capital`` and hands
        them to get_daily_rtn / get_best_wei / plotPortfolioCurve, which
        are defined outside this file.
        """
        strategyNames = []
        strategyBases = []
        for name, strategy in self.strategyDict.items():
            strategyNames.append(name)
            strategyBases.append(strategy.capital)
        rtn_table, cap_table = get_daily_rtn(strategyNames, strategyBases)
        weis = get_best_wei(rtn_table, 30)
        # Single-argument call form prints the same on Python 2 and 3.
        print(weis)
        plotPortfolioCurve(cap_table, weis)

    #----------------------------------------------------------------------
    def saveSetting(self):
        """Write the parameters of every registered strategy to the file.

        Each parameter is stringified; everything except ``'name'`` is then
        converted to int/float when the text is numeric.
        """
        settings = []
        for strategy in self.strategyDict.values():
            setting = {}
            for param in strategy.paramList:
                value = str(getattr(strategy, param))
                if param != 'name':
                    value = self._parseParamValue(value)
                setting[param] = value
            settings.append(setting)

        # Serialise first and open the file last: opening with 'w'
        # truncates it, so a failure above must not wipe saved settings.
        jsonL = json.dumps(settings, indent=4)
        with open(self.settingFileName, 'w') as f:
            f.write(jsonL)

    #----------------------------------------------------------------------
    def loadSetting(self):
        """Load every strategy described in the settings file."""
        with open(self.settingFileName) as f:
            entries = json.load(f)
        for setting in entries:
            self.loadStrategy(setting)

    #----------------------------------------------------------------------
    def getStrategyVar(self, name):
        """Return an OrderedDict of the strategy's ``varList`` values.

        Logs a message and returns None when ``name`` is not registered.
        """
        if name not in self.strategyDict:
            self.writeCtaLog(u'策略实例不存在:' + name)
            return None
        strategy = self.strategyDict[name]
        return OrderedDict((key, getattr(strategy, key)) for key in strategy.varList)

    #----------------------------------------------------------------------
    def getStrategyParam(self, name):
        """Return an OrderedDict of the strategy's ``paramList`` values.

        Logs a message and returns None when ``name`` is not registered.
        """
        if name not in self.strategyDict:
            self.writeCtaLog(u'策略实例不存在:' + name)
            return None
        strategy = self.strategyDict[name]
        return OrderedDict((key, getattr(strategy, key)) for key in strategy.paramList)

    #----------------------------------------------------------------------
    def setStrategyParam(self, name, paramDict):
        """Assign values from ``paramDict`` to the strategy's parameters.

        Only names listed in the strategy's ``paramList`` are written, and
        parameters missing from ``paramDict`` keep their current value.
        Logs a message when ``name`` is not registered. Returns None.
        """
        if name not in self.strategyDict:
            self.writeCtaLog(u'策略实例不存在:' + name)
            return None
        strategy = self.strategyDict[name]
        for key in strategy.paramList:
            if key in paramDict:
                setattr(strategy, key, paramDict[key])
        return None

    #----------------------------------------------------------------------
    def putStrategyEvent(self, name):
        """Fire the per-strategy status event (EVENT_CTA_STRATEGY + name)."""
        event = Event(EVENT_CTA_STRATEGY+name)
        self.eventEngine.put(event)

    #----------------------------------------------------------------------
    def output(self, content):
        """Send ``content`` to the CTA log."""
        self.writeCtaLog(content)