Skip to content

Commit

Permalink
Update Version 3.5.8
Browse files Browse the repository at this point in the history
  • Loading branch information
shinny-pack authored and shinny-mayanqiong committed Apr 30, 2024
1 parent b263bae commit ded2cec
Show file tree
Hide file tree
Showing 11 changed files with 154 additions and 48 deletions.
2 changes: 1 addition & 1 deletion PKG-INFO
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Metadata-Version: 2.1
Name: tqsdk
Version: 3.5.7
Version: 3.5.8
Summary: TianQin SDK
Home-page: https://www.shinnytech.com/tqsdk
Author: TianQin
Expand Down
4 changes: 2 additions & 2 deletions doc/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@
# built documents.
#
# The short X.Y version.
version = u'3.5.7'
version = u'3.5.8'
# The full version, including alpha/beta/rc tags.
release = u'3.5.7'
release = u'3.5.8'

# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
Expand Down
10 changes: 10 additions & 0 deletions doc/version.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,16 @@

版本变更
=============================
3.5.8 (2024/04/29)

* 增加::py:class:`~tqsdk.tools.DataDownloader` 增加 write_mode 参数,作为写入模式参数,并且在模式为 'a' 时,不写入标题行
* 修复:用户在使用 :py:class:`~tqsdk.TargetPosScheduler` 时可能出现重复创建 :py:class:`~tqsdk.TargetPosTask` 实例的问题
* 优化::py:class:`~tqsdk.tools.DataDownloader` 在下载没有任何成交数据的合约时能够及时退出,避免等待超时或报错
* 优化:重构 :py:class:`~tqsdk.TargetPosTask` 退出时释放资源部分,优化异步代码中 :py:class:`~tqsdk.TargetPosTask` 的使用方法
* 优化:回测时,如果订阅没有任何成交数据的合约,构造的空数据给下游,避免程序一直等待
* 优化:对发送给合约服务器的数据进行检查,避免发送不合法的数据,提前报错通知用户


3.5.7 (2024/04/23)

* 修复::py:meth:`~tqsdk.TqApi.get_kline_data_series`、:py:meth:`~tqsdk.TqApi.get_tick_data_series` 接口在指定时间段没有数据时报错
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

setuptools.setup(
name='tqsdk',
version="3.5.7",
version="3.5.8",
description='TianQin SDK',
author='TianQin',
author_email='[email protected]',
Expand Down
2 changes: 1 addition & 1 deletion tqsdk/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '3.5.7'
__version__ = '3.5.8'
7 changes: 6 additions & 1 deletion tqsdk/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2823,7 +2823,12 @@ async def show_symbols_info(symbols):
"""
if self._stock is False:
raise Exception("期货行情系统(_stock = False)不支持当前接口调用")
symbol_list = [symbol] if isinstance(symbol, str) else symbol
if isinstance(symbol, str):
symbol_list = [symbol]
else:
if len(symbol) == 0:
raise Exception("symbol 参数不能为空列表。")
symbol_list = symbol
if any([s == "" for s in symbol_list]):
raise Exception(f"symbol 参数 {symbol} 中不能有空字符串。")
backtest_timestamp = _datetime_to_timestamp_nano(self._get_current_datetime()) if isinstance(self._backtest,
Expand Down
35 changes: 25 additions & 10 deletions tqsdk/backtest/backtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ async def _generator_diffs(self, keep_current):
# klines 请求,需要记录已经发送 api 的数据
for symbol in diff.get("klines", {}):
for dur in diff["klines"][symbol]:
for kid in diff["klines"][symbol][dur]["data"]:
for kid in diff["klines"][symbol][dur].get('data', {}):
rs = self._sended_to_api.setdefault((symbol, int(dur)), [])
kid = int(kid)
self._sended_to_api[(symbol, int(dur))] = _rangeset_range_union(rs, (kid, kid + 1))
Expand Down Expand Up @@ -390,22 +390,24 @@ def _generator_quotes_diffs(self, quotes_helper) -> dict:
# 如果先订阅 A 合约(有夜盘),时间停留在夜盘开始时间, 再订阅 B 合约(没有夜盘),那么 B 合约的行情(前一天收盘时间)应该发下去,
# 否则 get_quote(B) 等到收到行情才返回,会直接把时间推进到第二天白盘。
continue
item = quotes_helper[key]["kline_or_tick"]
if item is None:
# item 如果是 None 的话,没有可以生成行情的信息的数据,那么不生成 quote_diff
continue
diffs = None
if self._quotes[symbol]['min_duration'] == 0 and dur == 0:
# tick 生成行情
tick = quotes_helper[key]["kline_or_tick"]
diffs = TqBacktest._get_quote_diffs_from_tick(symbol, tick)
diffs = TqBacktest._get_quote_diffs_from_tick(symbol, item)
if self._quotes[symbol]['min_duration'] != 0:
# kline 生成行情
when = quotes_helper[key]["when"]
timestamp = quotes_helper[key]["timestamp"] # quote 行情时间
kline = quotes_helper[key]["kline_or_tick"]
quote_info = self._data["quotes"][symbol]
froms = ["open"] if when == "OPEN" else ["close"]
if when == "CLOSE" and self._quotes[symbol]['min_duration'] == dur:
# kline 生成 quote 数据,只有该合约订阅的最小周期会生成 high low 对应的行情
froms = ["high", "low", "close"]
diffs = TqBacktest._get_quote_diffs_from_kline(symbol, quote_info['price_tick'], timestamp, kline, froms)
diffs = TqBacktest._get_quote_diffs_from_kline(symbol, quote_info['price_tick'], timestamp, item, froms)
if diffs:
self._quotes[symbol]["sended_init_quote"] = True
self._diffs.extend(diffs)
Expand Down Expand Up @@ -505,18 +507,31 @@ async def _gen_serial(self, ins, dur):
if not (chart_info.items() <= _get_obj(chart, ["state"]).items()):
# 当前请求还没收齐回应, 不应继续处理
continue
left_id = chart.get("left_id", -1)
right_id = chart.get("right_id", -1)
if (left_id == -1 and right_id == -1) or chart.get("more_data", True):
continue # 定位信息还没收到, 数据没有完全收到
if not chart.get("ready", False):
continue # chart 数据还没准备好
last_id = serials[0].get("last_id", -1)
if last_id == -1:
continue # 数据序列还没收到
# 所有合约的 tick 数据一定有,开盘一定会收到一笔 tick
# kline 是由有价格的 tick 生成的,所以 kline 可能没有数据的
assert dur > 0
diff = {
"klines": {
symbol_list[0]: {
str(dur): {
"last_id": -1
}
}
}
}
yield self._current_dt, diff, None, "OPEN"
return
if self._data.get("mdhis_more_data", True):
self._data["_listener"].add(update_chan)
continue
else:
self._data["_listener"].discard(update_chan)
left_id = chart.get("left_id", -1)
right_id = chart.get("right_id", -1)
if current_id is None:
current_id = max(left_id, 0)
# 发送下一段 chart 8964 根 kline
Expand Down
16 changes: 16 additions & 0 deletions tqsdk/ins_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,22 @@
import sgqlc.types
from sgqlc.operation import Fragment


########################################################################
# Monkey patching 检查请求的参数是否合法,与 api.query_graphql 函数的校验规则保持一致
########################################################################
_origin__to_graphql_input__ = sgqlc.types.Arg.__to_graphql_input__


def _tqsdk__to_graphql_input__(self, value, *args, **kwargs):
if value == "" or isinstance(value, list) and (any([s == "" for s in value]) or len(value) == 0):
raise Exception(f"variables 中变量值不支持空字符串、空列表或者列表中包括空字符串。")
return _origin__to_graphql_input__(self, value, *args, **kwargs)


sgqlc.types.Arg.__to_graphql_input__ = _tqsdk__to_graphql_input__


ins_schema = sgqlc.types.Schema()


Expand Down
8 changes: 4 additions & 4 deletions tqsdk/lib/target_pos_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ async def _run(self):
async for _ in self._api.register_update_notify(quote):
if _get_trade_timestamp(quote.datetime, float('nan')) > row['deadline']:
if target_pos_task:
target_pos_task._task.cancel()
await asyncio.gather(target_pos_task._task, return_exceptions=True)
target_pos_task.cancel()
await asyncio.gather(target_pos_task, return_exceptions=True)
break
elif target_pos_task: # 最后一项,如果有 target_pos_task 等待持仓调整完成,否则直接退出
position = self._account.get_position(self._symbol)
Expand All @@ -147,8 +147,8 @@ async def _run(self):
_index = _index + 1
finally:
if target_pos_task:
target_pos_task._task.cancel()
await asyncio.gather(target_pos_task._task, return_exceptions=True)
target_pos_task.cancel()
await asyncio.gather(target_pos_task, return_exceptions=True)
await self._trade_objs_chan.close()
await self._trade_recv_task

Expand Down
79 changes: 72 additions & 7 deletions tqsdk/lib/target_pos_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,33 @@ def get_price(direction):
self._time_update_task = self._api.create_task(self._update_time_from_md()) # 监听行情更新并记录当时本地时间的task
self._local_time_record = time.time() - 0.005 # 更新最新行情时间时的本地时间
self._local_time_record_update_chan = TqChan(self._api, last_only=True) # 监听 self._local_time_record 更新
self._wait_task_finished = self._api._loop.create_future()
self._task.add_done_callback(lambda _: self._api.create_task(self._exit_task()))

async def _exit_task(self):
"""
执行 task.cancel() 时, 删除掉该 symbol 对应的 TargetPosTask 实例,以释放占有的资源。
当用户代码为:
t = TargetPosTask(api, 'SHFE.rb2106', min_volume=2, max_volume=10)
t.cancel()
await asyncio.gather(t._task, return_exceptions=True)
以上代码执行后,t._task 中的 finally 部分没有被执行过,因为 t._task 本身从来没有被执行过。
所以这里用 add_done_callback 的方式,处理 __init__ 方法中创建的资源。
self._task、self._pos_chan、self._time_update_task 都是在 __init__ 方法里创建的资源,所以在这里释放资源,
self._task 中的 finally 部分只处理在 self._task 函数里创建的资源。
"""
# self._account 类型为 TqSim/TqKq/TqAccount,都包括 _account_key 变量
TargetPosTaskSingleton._instances.pop(self._account._account_key + "#" + self._symbol, None)
await self._pos_chan.close()
self._time_update_task.cancel()
await asyncio.gather(self._time_update_task, return_exceptions=True)
self._wait_task_finished.set_result(True)

def __await__(self):
return self._wait_task_finished.__await__()

def set_target_volume(self, volume: int) -> None:
"""
Expand Down Expand Up @@ -379,12 +406,7 @@ async def _target_pos_task(self):
all_tasks.append(order_task)
delta_volume -= order_volume if order_dir == "BUY" else -order_volume
finally:
# 执行 task.cancel() 时, 删除掉该 symbol 对应的 TargetPosTask 实例
# self._account 类型为 TqSim/TqKq/TqAccount,都包括 _account_key 变量
TargetPosTaskSingleton._instances.pop(self._account._account_key + "#" + self._symbol, None)
await self._pos_chan.close()
self._time_update_task.cancel()
await asyncio.gather(*([t._task for t in all_tasks] + [self._time_update_task]), return_exceptions=True)
await asyncio.gather(*[t._task for t in all_tasks], return_exceptions=True)

def cancel(self):
"""
Expand Down Expand Up @@ -423,6 +445,46 @@ def cancel(self):
api.close()
Example2::
# 在异步代码中使用
from datetime import datetime, time
from tqsdk import TqApi, TargetPosTask
api = TqApi(auth=TqAuth("快期账户", "账户密码"))
quote = api.get_quote("SHFE.rb2110")
async def demo(SYMBOL):
quote = await api.get_quote(SYMBOL)
target_pos_passive = TargetPosTask(api, SYMBOL, price="PASSIVE")
async with api.register_update_notify() as update_chan:
async for _ in update_chan:
if datetime.strptime(quote.datetime, "%Y-%m-%d %H:%M:%S.%f").time() < time(14, 50):
# ... 策略代码 ...
else:
target_pos_passive.cancel() # 取消 TargetPosTask 实例
await target_pos_passive # 等待 target_pos_passive 处理 cancel 结束
break
target_pos_active = TargetPosTask(api, "SHFE.rb2110", price="ACTIVE")
target_pos_active.set_target_volume(0) # 平所有仓位
pos = await api.get_position(SYMBOL)
async with api.register_update_notify() as update_chan:
async for _ in update_chan:
if pos.pos == 0:
target_pos_active.cancel() # 取消 TargetPosTask 实例
await target_pos_active # 等待 target_pos_active 处理 cancel 结束
break
symbol_list = ["SHFE.rb2107", "DCE.m2109"] # 设置合约代码
for symbol in symbol_list:
api.create_task(demo("SHFE.rb2107")) # 为每个合约创建异步任务
while True:
api.wait_update()
"""
self._task.cancel()

Expand All @@ -433,7 +495,10 @@ def is_finished(self) -> bool:
Returns:
bool: 当前 TargetPosTask 实例是否已经结束
"""
return self._task.done()
if self._wait_task_finished.done():
assert self._task.done() is True
return self._wait_task_finished.result()
return False


class InsertOrderUntilAllTradedTask(object):
Expand Down
Loading

0 comments on commit ded2cec

Please sign in to comment.