forked from UFund-Me/Qbot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
auto_monitor.py
153 lines (125 loc) · 4.35 KB
/
auto_monitor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
"""
Author: Charmve [email protected]
Date: 2023-03-08 10:58:49
LastEditors: Charmve [email protected]
LastEditTime: 2023-03-10 00:12:32
FilePath: /Qbot/auto_monitor.py
Version: 1.0.1
Blogs: charmve.blog.csdn.net
GitHub: https://github.com/Charmve
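Description: Polls real-time stock quotes and raises Lark, desktop and sound alerts when a price threshold is hit.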
Copyright (c) 2023 by Charmve, All Rights Reserved.
Licensed under the MIT License.
"""
# -*-coding=utf-8-*-
import os
import subprocess
import sys
import time
import urllib.request # noqa F401
import pandas as pd
import pync
import tushare as ts
from utils.larkbot import LarkBot
"""
description:
param {*} title
param {*} text
return {*}
use:
"""
def show_notification(title, text):
    """Display a macOS notification by running ``osascript``.

    The AppleScript is handed to ``osascript`` as an argument list instead of
    being spliced into a shell command line, so quotes or shell
    metacharacters in ``title``/``text`` can neither break the command nor
    inject additional shell commands.

    Args:
        title: Title shown on the notification.
        text: Body text shown on the notification.

    Returns:
        None. When ``osascript`` is not installed the call is skipped and a
        message is written to stderr.
    """

    def _quote(value):
        # AppleScript string literals need backslash and double quote escaped.
        return str(value).replace("\\", "\\\\").replace('"', '\\"')

    script = 'display notification "{}" with title "{}"'.format(
        _quote(text), _quote(title)
    )
    try:
        subprocess.call(["osascript", "-e", script])
    except FileNotFoundError:
        # Stay best-effort when osascript is unavailable: the previous
        # shell-based call did not raise in that case either.
        print("osascript not found; notification skipped", file=sys.stderr)
def show_notification_2(title, text):
    """Display a macOS notification by invoking ``osascript`` directly.

    ``title`` and ``text`` are escaped before being embedded in the
    AppleScript source, so a double quote or backslash in either value no
    longer terminates the string literal and breaks the script.

    Args:
        title: Title shown on the notification.
        text: Body text shown on the notification.

    Raises:
        FileNotFoundError: If the ``osascript`` executable cannot be found.
    """

    def _quote(value):
        # AppleScript string literals need backslash and double quote escaped.
        return str(value).replace("\\", "\\\\").replace('"', '\\"')

    cmd = (
        'display notification "' + _quote(text)
        + '" with title "' + _quote(title) + '"'
    )
    # Argument-list form: no shell is involved, only AppleScript quoting matters.
    subprocess.call(["osascript", "-e", cmd])
"""
使用mac系统定时任务crontab设置告警通知的执行时间。
crontab设置过程
1. 输入 crontab -e进入设置文本。
2. 填写 */3 9-12,13-15 * * 1-5 /usr/local/anaconda3/bin/python /Users/charmve/Qbot/auto_monitor.py ,
即周一到周五,上午9点到12点,下午1点到3点,每三分钟执行阀值告警。
"""
# Watch list of securities. Every entry carries a quote "code" and a display
# "name"; "min_threshold"/"max_threshold" are presumably the alert bounds for
# that security (stored as strings) -- TODO confirm against the consumer.
stocks_pool = [
    {"code": "sz000063", "name": "中兴通讯", "min_threshold": "26", "max_threshold": "38"},
    {"code": "sh000016", "name": "上证50"},
    # The key below was misspelled "coce", so stock["code"] raised KeyError.
    {"code": "601318", "name": "中国平安"},
]
def get_data(ts_code):
    """Download daily fund bars from tushare and shape them for backtrader.

    Args:
        ts_code: Security code forwarded to ``pro.fund_daily``.

    Returns:
        A DataFrame indexed by ``datetime`` in ascending order with the
        columns ``open``, ``high``, ``low``, ``close``, ``volume`` and
        ``openinterest``, or ``None`` when the download fails. (Previously
        the failure path fell through to ``return df`` with ``df`` unbound
        and raised ``UnboundLocalError``.)
    """
    # "your token" is a placeholder; a real tushare token must be supplied.
    pro = ts.pro_api("your token")
    try:
        df = pro.fund_daily(ts_code=ts_code)
    except Exception as exc:  # the client's failure modes are not visible here
        time.sleep(0.5)
        print("获取数据失败", exc)
        return None

    print("获取数据成功")
    # Keep only the columns backtrader consumes; copy so that the column
    # assignments below do not write into a view of the downloaded frame.
    columns = ["trade_date", "open", "high", "low", "close", "vol"]
    df = df[columns].copy()
    # Convert the trade dates to timestamps.
    df["trade_date"] = pd.to_datetime(df["trade_date"].astype(str))
    df = df.rename(columns={"vol": "volume", "trade_date": "datetime"})
    df = df.set_index("datetime")
    # openinterest defaults to 0.
    df["openinterest"] = 0
    # The first downloaded row is the newest one; sort ascending, otherwise
    # the moving-average values for the latest dates come out empty.
    return df.sort_index()
def check_strategy() -> bool:
    """Strategy gate stub: takes no input and always answers ``True``."""
    approved = True
    return approved
def check(code, low, high):
    """Tell whether the latest quote for ``code`` lies strictly inside a band.

    Args:
        code: Security code passed to ``ts.get_realtime_quotes``.
        low: Exclusive lower price bound.
        high: Exclusive upper price bound.

    Returns:
        ``True`` when ``low < price < high`` for the first quote row,
        ``False`` otherwise.
    """
    # Earlier drafts read quotes from hq.sinajs.cn or from get_data(); the
    # live source is tushare's real-time quote call.
    quotes = ts.get_realtime_quotes(code)
    # Echo a short summary of the quote for whoever is watching the console.
    print(quotes[["code", "name", "price", "date", "time"]])
    latest = float(quotes["price"][0])
    return low < latest < high
# Resolve the alert sound relative to the directory the script was started from.
top_path = os.path.dirname(os.path.abspath(sys.argv[0]))
sounds_file = os.path.join(top_path, "./qbot/sounds/bell.wav")

# NOTE(review): a webhook secret is committed in source. It is kept as the
# fallback for backward compatibility, but should be rotated and supplied
# through the QBOT_LARK_WEBHOOK_SECRET environment variable instead.
WEBHOOK_SECRET = os.environ.get("QBOT_LARK_WEBHOOK_SECRET", "wNMVU3ewSm2F0G2TwTX4Fd")

# NOTE(review): this loop never exits, while the crontab note in this file
# starts the script every three minutes -- confirm which scheduling model is
# intended, otherwise processes pile up.
while True:
    bot = LarkBot(secret=WEBHOOK_SECRET)
    # NOTE(review): the alert text is about 中国平安 only, yet the "sh" index
    # check can trigger it on its own through the `or` -- confirm the intent.
    if check("sh", 3300, 10000) or check("601318", 0, 49):
        bot.send(content="[Signal💡] 中国平安 低于 ¥49")
        # NOTE(review): hard-coded value, not the live price that check() saw.
        priceNow = 48
        pync.notify(
            f'{"中国平安"}当前价格为{priceNow}',
            title=f'Qbot - {"中国平安"}股票已低于设定值{49}',
            open="https://ufund-me.github.io/",
            appIcon="./gui/imgs/logo.ico",
        )
        # Play the alert sound with the macOS player. The path is passed as a
        # single argument so directories containing spaces or shell
        # metacharacters work. (On Linux the earlier draft used
        # `play ./qbot/sounds/alert-bells.wav`.)
        try:
            subprocess.call(["afplay", sounds_file])
        except FileNotFoundError:
            # Best-effort, like the previous shell call on hosts without afplay.
            print("afplay not found; alert sound skipped", file=sys.stderr)
    # Poll interval between two checks.
    time.sleep(2)