-
-
Notifications
You must be signed in to change notification settings - Fork 2
/
tradingview_alerts.py
406 lines (327 loc) · 15.6 KB
/
tradingview_alerts.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
__author__ = "Patrick Kantorski"
__version__ = "1.0.5"
__maintainer__ = "Patrick Kantorski"
__status__ = "Development Build"
import os
import sys
import time
import threading
import sqlite3
import base64
import importlib
import urllib.parse
import re
from queue import Queue
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from google.oauth2.credentials import Credentials
from datetime import datetime, timedelta
from pprint import pprint
import json
# Change working directory to location of script, so that the relative paths
# used further down in this file (such as 'config.json' and
# 'tradingview_alerts.db') resolve next to this file no matter where the
# script is launched from.
os.chdir(os.path.dirname(os.path.realpath(__file__)))
class TradingViewAlertsHandler:
    """Store TradingView alert e-mails in SQLite and relay new ones to Telegram.

    Three daemon threads cooperate once :meth:`run` sets ``self.start``:

    * :meth:`store_alerts_daemon` polls Gmail and inserts alert e-mails into
      the ``alerts`` table.
    * :meth:`process_new_alerts_daemon` polls that table and queues a text
      message for every new row.
    * :meth:`message_daemon` drains the queue through the Telegram notifier.

    Any daemon that hits an unexpected exception sets ``self.kill_daemon``,
    which makes every loop (including :meth:`run`) exit.
    """

    # SQLite database file; relative paths resolve against the working directory.
    DATABASE_FILE = 'tradingview_alerts.db'

    def __init__(self):
        """Set defaults, load the Telegram notifier module and read config.json."""
        self.STRATEGY_COLUMNS = []
        self.CREDENTIALS_FILE = 'credentials.json'
        self.TOKEN_FILE = 'token.json'
        self.SQL_COLUMNS = ['message_id', 'msg_timestamp', 'alert']
        self.SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']
        # NOTE(review): the address in this literal had been replaced by an
        # "[email protected]" obfuscation placeholder, which can never match a
        # Gmail `from:` filter. Restored to TradingView's no-reply address --
        # confirm against a real alert e-mail.
        self.EMAIL_SENDER = "TradingView <noreply@tradingview.com>"
        self.N_DAYS = 20

        self.start = False
        self.kill_daemon = False
        self.initial_run = True
        self.alert_found = False
        self.message_queue = Queue()

        self.telegram_path = os.path.dirname(os.path.abspath(__file__))
        # The handler is re-created on every restart; do not grow sys.path each time.
        if self.telegram_path not in sys.path:
            sys.path.append(self.telegram_path)
        tradingview_telegram_lib = importlib.import_module('tradingview_telegram')
        self.TradingViewAlertsTelegram = tradingview_telegram_lib.TradingViewAlertsTelegram
        self.tradingview_telegram = self.TradingViewAlertsTelegram()

        self.load_config()  # Load alert configurations file.

    def load_config(self):
        """Read STRATEGY_COLUMNS, CREDENTIALS_FILE and TOKEN_FILE from config.json.

        Raises:
            OSError: if config.json cannot be opened.
            KeyError: if one of the three keys is missing.
        """
        with open('config.json', 'r') as config_file:
            config = json.load(config_file)

        self.STRATEGY_COLUMNS = config["STRATEGY_COLUMNS"]
        self.CREDENTIALS_FILE = config["CREDENTIALS_FILE"]
        self.TOKEN_FILE = config["TOKEN_FILE"]

    def notify_command(self, message):
        """Send ``message`` through the Telegram notifier."""
        self.tradingview_telegram.notify(message=message)

    def message_daemon(self):
        """Thread body: forward queued messages to Telegram, one every 2 seconds."""
        while not self.start:
            time.sleep(1)

        while not self.kill_daemon:
            try:
                if not self.message_queue.empty():
                    print("Sending message...")
                    message = self.message_queue.get()
                    self.notify_command(message)
                time.sleep(2)  # Adjust the delay as needed
            except Exception as e:
                print(f"An error occurred: {e}")
                self.kill_daemon = True

    def extract_info_from_message(self, message_text):
        """Parse ``Column: value`` lines out of an alert body.

        For every name in ``self.STRATEGY_COLUMNS`` the first line containing
        that name supplies the value (text after the name, minus a leading
        colon and surrounding whitespace).

        Returns:
            dict mapping each lower-cased column name to its value, or to
            ``None`` when no line mentions the column.
        """
        msg_lines = [
            raw_line.strip('\r')
            for raw_line in message_text.split('\n')
            if raw_line != '\r'
        ]

        info = {}
        for column in self.STRATEGY_COLUMNS:
            key = column.lower()
            # Default for columns that never show up in the message.
            info.setdefault(key, None)
            for line in msg_lines:
                if column in line:
                    info[key] = line.split(column)[-1].lstrip(':').strip()
                    break
        return info

    def _load_credentials(self):
        """Return Gmail credentials, running the OAuth flow first if no token file exists."""
        if not os.path.exists(self.TOKEN_FILE):
            flow = InstalledAppFlow.from_client_secrets_file(self.CREDENTIALS_FILE, self.SCOPES)
            creds = flow.run_local_server(port=0)
            with open(self.TOKEN_FILE, 'w') as token:
                token.write(creds.to_json())
        return Credentials.from_authorized_user_file(self.TOKEN_FILE, self.SCOPES)

    def _alerts_column_definitions(self):
        """Return the ``name TYPE`` definitions of every column of the alerts table."""
        definitions = [
            f"{self.SQL_COLUMNS[0]} TEXT PRIMARY KEY",
            f"{self.SQL_COLUMNS[1]} DATETIME",
            f"{self.SQL_COLUMNS[2]} TEXT",
        ]
        for column in self.STRATEGY_COLUMNS:
            name = column.lower()
            is_time = ('timestamp' in name) or ('time_close' in name)
            definitions.append(f"{name} {'DATETIME' if is_time else 'TEXT'}")
        return definitions

    def _ensure_alerts_table(self, cursor):
        """Create the alerts table if it does not exist yet."""
        # Joining the definitions keeps the statement valid even when there
        # are no strategy columns (no dangling comma).
        columns_sql = ',\n                '.join(self._alerts_column_definitions())
        cursor.execute(f'''
            CREATE TABLE IF NOT EXISTS alerts (
                {columns_sql}
            )
        ''')

    def _last_stored_timestamp(self, cursor, default):
        """Return the newest stored ``timestamp`` value as a datetime.

        Falls back to ``default`` when there is no ``timestamp`` strategy
        column, when the table is empty, or when the stored value is not in
        ``%Y-%m-%d %H:%M:%S`` format.
        """
        if 'timestamp' not in [column.lower() for column in self.STRATEGY_COLUMNS]:
            return default

        cursor.execute('SELECT MAX(timestamp) FROM alerts')
        last_timestamp_str = cursor.fetchone()[0]
        if last_timestamp_str is None:
            return default
        try:
            return datetime.strptime(last_timestamp_str, '%Y-%m-%d %H:%M:%S')
        except (TypeError, ValueError):
            # Re-scanning the full window is safe: rows are de-duplicated by message_id.
            self.print_log(f"Unparseable stored timestamp {last_timestamp_str!r}; using default window.")
            return default

    def _list_messages_since(self, service, last_timestamp):
        """List Gmail messages from EMAIL_SENDER received after ``last_timestamp``.

        Honours a ``Retry after <time>`` hint found in the error text;
        otherwise backs off exponentially for up to five attempts.

        Returns:
            list of message stubs (dicts with an ``id``), empty when every
            attempt failed.
        """
        max_retries = 5
        base_delay = 3  # Initial delay in seconds
        retries = 0

        # int(timestamp()) is the portable form of the glibc-only strftime('%s').
        query = 'after:' + str(int(last_timestamp.timestamp())) + ' from:' + self.EMAIL_SENDER

        while retries < max_retries:
            try:
                results = service.users().messages().list(userId='me', q=query).execute()
                return results.get('messages', [])
            except Exception as e:
                print(f"Error: {str(e)}")

                # Check if the error message contains a retry time
                retry_match = re.search(r"Retry after (\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z)", str(e))
                if retry_match:
                    retry_time = datetime.strptime(retry_match.group(1), '%Y-%m-%dT%H:%M:%S.%fZ')
                    wait_duration = (retry_time - datetime.utcnow()).total_seconds()
                    wait_duration += 10  # Adding 10 seconds buffer
                    if wait_duration > 0:
                        print(f"Retrying after {wait_duration} seconds...")
                        time.sleep(wait_duration)
                    else:
                        print("Invalid retry time. Retrying after the base delay...")
                        time.sleep(base_delay)
                else:
                    retries += 1
                    if retries < max_retries:
                        delay = base_delay * (2 ** retries)
                        print(f"Retrying in {delay} seconds...")
                        time.sleep(delay)
                    else:
                        print("Max retries reached. Exiting.")
        return []

    def _extract_alert_body(self, html):
        """Cut the alert text out of the e-mail HTML.

        The text starts at ``">{first strategy column}: `` and runs up to the
        next ``</p>``. Returns an empty string when the start marker is
        missing or no strategy columns are configured.
        """
        if not self.STRATEGY_COLUMNS:
            return ''

        opening = '">'
        start_index = html.find(f'{opening}{self.STRATEGY_COLUMNS[0]}: ')
        if start_index == -1:
            return ''

        # Slice by position: str.strip() removes character *sets*, which
        # would eat legitimate trailing 'p', '/', '<' or '>' characters.
        body = html[start_index + len(opening):]
        end_index = body.find("</p>")
        if end_index != -1:
            body = body[:end_index]
        return body

    def authenticate_and_fetch_alerts(self):
        """Fetch new TradingView alert e-mails from Gmail and store them.

        Sets ``self.alert_found`` to True when at least one alert e-mail was
        processed in this pass, otherwise logs 'No new alerts found.'.
        """
        creds = self._load_credentials()

        # Create a Gmail API service
        service = build('gmail', 'v1', credentials=creds)

        # Oldest point in time to look back to when nothing is stored yet
        n_days_ago = datetime.now() - timedelta(days=self.N_DAYS)

        conn = sqlite3.connect(self.DATABASE_FILE)
        try:
            cursor = conn.cursor()
            self._ensure_alerts_table(cursor)
            conn.commit()

            last_timestamp = self._last_stored_timestamp(cursor, n_days_ago)
            messages = self._list_messages_since(service, last_timestamp)

            column_names = self.SQL_COLUMNS + [column.lower() for column in self.STRATEGY_COLUMNS]
            columns_line = ', '.join(column_names)
            insertion_line = ', '.join(['?'] * len(column_names))

            self.alert_found = False
            for message in messages:
                msg_id = message['id']

                # Skip e-mails that are already in the database
                cursor.execute('SELECT 1 FROM alerts WHERE message_id = ?', (msg_id,))
                if cursor.fetchone():
                    continue

                msg = service.users().messages().get(userId='me', id=msg_id).execute()
                headers = msg['payload']['headers']
                subject = next((header['value'] for header in headers if header['name'] == 'Subject'), 'No Subject')
                date = next((header['value'] for header in headers if header['name'] == 'Date'), 'No Date')

                # Only e-mails whose subject starts with "Alert" are alerts
                if not subject.startswith("Alert"):
                    continue

                encoded_body = msg['payload'].get('body', {}).get('data')
                if encoded_body is None:
                    # Container (multipart) payloads carry no inline body data.
                    self.print_log(f"Skipping message {msg_id}: no inline body data.")
                    continue
                msg_data = base64.urlsafe_b64decode(encoded_body.encode('ASCII')).decode('utf-8')

                self.alert_found = True
                print(f'Subject: {subject}, Date: {date}\n')

                msg_data = self._extract_alert_body(msg_data)
                print(msg_data+'\n')

                # Extract information from the message
                extracted_info = self.extract_info_from_message(msg_data)
                alert = subject.replace('Alert: ', '')

                # Insert the extracted data into the 'alerts' table
                values = [msg_id, date, alert] + [extracted_info[column.lower()] for column in self.STRATEGY_COLUMNS]
                cursor.execute(
                    f'INSERT INTO alerts ({columns_line}) VALUES ({insertion_line})',
                    tuple(values)
                )
                conn.commit()

            if not self.alert_found:
                self.print_log('No new alerts found.')
        finally:
            # Release the connection on every exit path.
            conn.close()

    def check_database_update(self, last_checked_id):
        """Queue a notification for every stored alert newer than ``last_checked_id``.

        Rows found while ``self.initial_run`` is still True are not queued;
        the flag is cleared after the first batch of rows has been seen.

        Returns:
            the highest message_id seen, or ``last_checked_id`` unchanged when
            there is nothing new or the query fails (e.g. the table does not
            exist yet).
        """
        conn = sqlite3.connect(self.DATABASE_FILE)
        try:
            cursor = conn.cursor()
            # ORDER BY guarantees the last row really carries the largest id.
            cursor.execute(
                'SELECT * FROM alerts WHERE message_id > ? ORDER BY message_id',
                (last_checked_id,)
            )
            new_records = cursor.fetchall()
        except sqlite3.Error:
            return last_checked_id
        finally:
            conn.close()

        if new_records:
            for record in new_records:
                message_id, msg_timestamp, alert, *values = record

                # zip stops at the shorter sequence, so a table with fewer
                # strategy columns than the config does not raise.
                detail_lines = [
                    f"{column_name.capitalize()}: {value}"
                    for column_name, value in zip(self.STRATEGY_COLUMNS, values)
                ]
                line = f"New Alert: {alert}\n" + "\n".join(detail_lines)

                if not self.initial_run:
                    print(line)
                    self.message_queue.put(line)

            self.initial_run = False

            # Update the last_checked_id with the latest message_id
            last_checked_id = new_records[-1][0]

        return last_checked_id

    @staticmethod
    def _seconds_until_next_poll(offset):
        """Return the poll delay derived from the current wall-clock second."""
        seconds_until_next_minute = 60 - datetime.now().second + offset
        return seconds_until_next_minute % 11

    def store_alerts_daemon(self):
        """Thread body: repeatedly fetch and store alerts until killed."""
        while not self.start:
            time.sleep(1)

        OFFSET = 5  # seconds
        no_alerts_counter = 0

        while not self.kill_daemon:
            # Erase the previous "no alerts" / "checking again" pair so
            # repeated idle polls do not spam the terminal.
            if not self.alert_found and no_alerts_counter > 0:
                delete_last_line(2)

            try:
                self.authenticate_and_fetch_alerts()

                if not self.alert_found:
                    no_alerts_counter += 1
                else:
                    no_alerts_counter = 0

                wait_time = self._seconds_until_next_poll(OFFSET)
                if wait_time < 3:
                    # Too short: skip past this slot, then recompute from the
                    # *current* time (a stale timestamp would give the same value).
                    time.sleep(wait_time + 1)
                    wait_time = self._seconds_until_next_poll(OFFSET)

                self.print_log(f"Checking again in {wait_time}s.")
                time.sleep(wait_time)
            except Exception as e:
                print(f"An error occurred: {e}")
                self.kill_daemon = True

    def process_new_alerts_daemon(self):
        """Thread body: poll the database once per second for new alerts."""
        while not self.start:
            time.sleep(1)

        TIMEOUT = 1
        # Compared against message_id; 0 sorts before every stored id.
        last_checked_id = 0

        while not self.kill_daemon:
            try:
                last_checked_id = self.check_database_update(last_checked_id)
                time.sleep(TIMEOUT)  # Sleep for 1s
            except Exception as e:
                print(e)
                self.kill_daemon = True

    def print_log(self, text):
        """Print ``text`` prefixed with the current local time."""
        current_time = datetime.now()
        timelog = current_time.strftime("%Y-%m-%d %H:%M:%S")
        print(f"[{timelog}] {text}")

    def run(self):
        """Start the three daemon threads and block until ``kill_daemon`` is set."""
        os.system('clear')

        self.print_log("Launching TradingView Alerts data storing daemon...")
        background_thread(self.store_alerts_daemon, [])
        time.sleep(0.5)

        self.print_log("Launching TradingView Alerts alert processing daemon...")
        background_thread(self.process_new_alerts_daemon, [])
        time.sleep(0.5)

        self.print_log("Launching TradingView Alerts telegram messages daemon...")
        background_thread(self.message_daemon, [])
        time.sleep(0.5)

        self.print_log("TradingView Alerts handler is now live!")
        self.start = True

        # Loop until killed; the worker loops watch the same flag and stop
        # on their own once it is set.
        while not self.kill_daemon:
            time.sleep(5)
def background_thread(target, args_list):
    """Run ``target(*args_list)`` on a freshly started daemon thread.

    The thread is marked as a daemon before it starts, so it does not keep
    the interpreter alive. Nothing is returned.
    """
    worker = threading.Thread(target=target, args=tuple(args_list))
    worker.daemon = True
    worker.start()
# For deleting lines in stdout
def delete_last_line(num_lines=1):
    """Erase the last ``num_lines`` lines printed to stdout.

    Emits, once per line, the ANSI sequence that moves the cursor up one
    line followed by the sequence that clears that line.
    """
    cursor_up_one = '\033[F'
    erase_line = '\033[K'
    move_and_clear = cursor_up_one + erase_line
    for _ in range(num_lines):
        print(move_and_clear, end='')
if __name__ == "__main__":
    # Supervisor loop: build a fresh handler and run it until it stops, then
    # start over. Ctrl-C (KeyboardInterrupt) is not an Exception subclass and
    # therefore still terminates the script.
    while True:
        try:
            handler = TradingViewAlertsHandler()
            handler.run()
        except Exception as e:
            print(f"An error occurred: {e}")

        # A crashed background thread does not raise here: it sets
        # kill_daemon, which makes run() return normally. Announce the
        # restart and back off on both paths.
        print("Restarting the script...")
        time.sleep(10)  # Wait for 10 seconds before restarting