-
Notifications
You must be signed in to change notification settings - Fork 1
/
spotifynow.py
283 lines (265 loc) · 14.3 KB
/
spotifynow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
#!env/scripts/python
import sql, processor
from uuid import uuid4
import os, sys, json, sqlite3, logging, requests
from urllib.parse import quote_plus as linkparse
from telegram.ext import Updater, CommandHandler, ConversationHandler, MessageHandler, InlineQueryHandler, Filters, run_async
from telegram import Bot, ParseMode, InlineKeyboardMarkup, InlineKeyboardButton, ChatAction, InlineQueryResultCachedPhoto
# Fallback avatar, fetched once when the module is loaded. getpic() hands this
# response to the image processor whenever the user's own profile photo cannot
# be retrieved.
default_pic = requests.get('https://files.catbox.moe/upm7uz.jpg')
def link(update, context):
    """Entry point of the account-linking conversation.

    In a private chat the user is asked which name to use and the
    conversation moves on to the USERNAME state. Anywhere else the bot only
    replies with a button pointing to its private chat and ends the
    conversation.
    """
    chat = update.effective_chat
    if chat.type == chat.PRIVATE:
        prompt = "I'm gonna need some information for linking your Spotify account. Tell me, what should I call you?"
        update.effective_message.reply_text(prompt, parse_mode=ParseMode.MARKDOWN)
        return USERNAME

    keyboard = InlineKeyboardMarkup(
        [[InlineKeyboardButton(text="Link", url="t.me/spotifynowbot")]]
    )
    update.effective_message.reply_text(
        "Contact me in private chat to link your Spotify account.",
        reply_markup=keyboard,
    )
    return ConversationHandler.END
def getusername(update, context):
    """Store the name the user just sent and offer the authorization button.

    The stripped message text is saved together with the sender's Telegram id
    through sql.add_user(); the reply carries a button that opens `authlink`.
    Always ends the conversation.
    """
    chosen_name = update.effective_message.text.strip()
    sql.add_user(update.effective_user.id, chosen_name)

    authorize_button = InlineKeyboardButton(text="Authorize", url=authlink)
    update.effective_message.reply_text(
        "Next up is Authorization. I need permissions to see what you're listening to.",
        reply_markup=InlineKeyboardMarkup([[authorize_button]]),
    )
    return ConversationHandler.END
def relink(update, context):
    """Send a fresh authorization button so the stored token can be replaced.

    Outside a private chat the user is pointed to the bot's private chat;
    users unknown to sql.get_user() are told to /link first.
    """
    in_private_chat = update.effective_chat.type == update.effective_chat.PRIVATE
    if not in_private_chat:
        keyboard = InlineKeyboardMarkup(
            [[InlineKeyboardButton(text="Link", url="t.me/spotifynowbot")]]
        )
        update.effective_message.reply_text(
            "Contact me in private chat to link your Spotify account.",
            reply_markup=keyboard,
        )
    elif not sql.get_user(update.effective_user.id):
        update.effective_message.reply_text("You need to /link before using this function.")
    else:
        keyboard = InlineKeyboardMarkup(
            [[InlineKeyboardButton(text="Authorize", url=authlink)]]
        )
        update.effective_message.reply_text(
            "Tap the button below to authorize.",
            reply_markup=keyboard,
        )
def unlink(update, context):
    """Remove the user from the database and explain how to revoke access.

    The removal is logged to stdout with the sender's Telegram username.
    """
    sql.del_user(update.effective_user.id)

    who = update.message.from_user.username
    print(f'{who} just unlinked their account.')

    update.message.reply_text(
        "You've been unlinked from my database. You can disable the authorization from your [Account's Apps Overview](https://www.spotify.com/us/account/apps/) section.",
        parse_mode=ParseMode.MARKDOWN,
    )
def code(text):
    """Return the authorization code that a /start payload refers to.

    The JSON document stored at jsonblob.com under `jkey` maps payload keys
    to authorization codes. `text` is the full /start message; its first
    seven characters (the length of "/start ") are skipped and the remainder
    is used as the key.

    When the document holds more than ten entries, its first five keys are
    removed and the trimmed document is written back.

    Raises:
        KeyError: the payload is not a key of the stored document.
    """
    blob_url = 'https://jsonblob.com/api/' + jkey
    pending = requests.get(blob_url).json()
    auth_code = pending[text[7:]]

    if len(pending) > 10:
        # Snapshot the keys BEFORE deleting anything. Re-listing the keys
        # after each pop (and indexing the new list with 0, 1, 2, ...) skips
        # every other entry instead of dropping the first five.
        # NOTE(review): presumably the first keys are the oldest entries -
        # confirm against how the web server appends to the document.
        for stale_key in list(pending)[:5]:
            del pending[stale_key]
        requests.put(blob_url, json=pending)

    return auth_code
def start(update, context):
    """Handle /start, with or without a deep-link payload.

    * A message of at most seven characters (nothing after "/start") gets the
      greeting.
    * A message ending in one of the known keywords (relink, link, notsure,
      ads, notlistening) gets the matching canned reply.
    * Anything else is treated as a key for code(): the authorization code it
      returns is exchanged at Spotify's token endpoint for a refresh token,
      which is stored for the user with sql.add_token().

    'relink' is tested before 'link' because every string ending in 'relink'
    also ends in 'link'; with the opposite order the relink reply can never
    be selected.
    """
    text = update.effective_message.text

    # Order matters: the longer keyword must precede the keyword it ends with.
    keyword_replies = (
        ('relink', "Spotify isn't letting me see what you're listening to! Try to /relink your Spotify account."),
        ('link', "Hi! I'm SpotifyNow and I you flex what you're listening to on Spotify. Tap /link to connect your account."),
        ('notsure', "I'm not sure what you're listening to."),
        ('ads', "Ads. You're listening to those annoying ads!"),
        ('notlistening', "You're not listening to anything on Spotify at the moment."),
    )
    canned = next(
        (reply for keyword, reply in keyword_replies if text.endswith(keyword)),
        None,
    )

    if len(text) <= 7:
        update.message.reply_text("Hi! I'm SpotifyNow and I you flex what you're listening to on Spotify. Tap /now to get started.")
    elif canned is not None:
        update.message.reply_text(canned)
    else:
        try:
            data = {
                'grant_type': 'authorization_code',
                'code': code(text),
                'redirect_uri': redirect_uri,
                'client_id': client_id,
                'client_secret': client_secret,
            }
            response = requests.post('https://accounts.spotify.com/api/token', data=data)
            authtoken = response.json()['refresh_token']
        except Exception:
            # Unknown key, network failure, or a token response without a
            # refresh token: all end in the same advice to the user.
            update.message.reply_text('Something went wrong. Try to /relink your account.')
        else:
            sql.add_token(authtoken, update.effective_user.id)
            print(f'{update.message.from_user.username} just linked their account.')
            message = "Yay! Your Spotify account is now linked. Tap /now anytime to flex what you're listening to. You can also use the inline mode by typing @SpotifyNowBot in any chat."
            update.message.reply_text(message)

    # NOTE(review): the source this was recovered from had lost its
    # indentation; the /start message is assumed to be deleted on every path
    # - confirm this was not meant to run only after a successful link.
    update.effective_message.delete()
def getpic(r, uid, context, podcast=False):
    """Collect the inputs for the image processor and return what it produces.

    Args:
        r: parsed "currently playing" response from Spotify.
        uid: Telegram user id, used for the stored display name and for the
            profile photo lookup.
        context: handler context; its bot fetches the profile photo.
        podcast: when true, title/cover/publisher are read from the episode's
            show instead of the track's album and artists.

    Returns:
        Whatever processor.process() returns for the gathered values.
    """
    display_name = list(sql.get_user(uid)[0])[1]

    item = r['item']
    title = item['name']
    if podcast:
        collection = item['show']['name']
    else:
        collection = item['album']['name']
    duration_ms = item['duration_ms']
    progress_ms = r['progress_ms']

    # Index 1 picks the second entry of the images list.
    if podcast:
        cover_url = item['images'][1]['url']
    else:
        cover_url = item['album']['images'][1]['url']
    cover = requests.get(cover_url)

    if podcast:
        credits = item['show']['publisher']
    else:
        credits = ', '.join(artist['name'] for artist in item['artists'])

    try:
        photo_id = context.bot.getUserProfilePhotos(uid, limit=1)['photos'][0][0]['file_id']
        avatar = requests.get(context.bot.getFile(photo_id).file_path)
    except:
        # No accessible profile photo (or the download failed): use the
        # module-level fallback picture.
        avatar = default_pic

    return processor.process(
        display_name, title, collection, credits,
        progress_ms, duration_ms, avatar, cover,
    )
@run_async
def nowplaying(update, context):
    """Reply to /now with an image of what the user is playing on Spotify.

    Steps:
      1. Read the user's stored refresh token (third column of the row
         returned by sql.get_user()); without one the user is told to /link.
      2. Exchange the refresh token for an access token. A failed request OR
         a response without 'access_token' tells the user to /relink.
      3. Query Spotify's currently-playing endpoint and answer according to
         'currently_playing_type': a text reply for ads and unknown types, a
         rendered picture (see getpic()) with a "Play on Spotify" button for
         tracks and episodes.

    Any exception raised in step 3 (for example a response body that is not
    JSON) is printed and reported to the user as "not listening".
    """
    context.bot.sendChatAction(update.message.chat_id, ChatAction.TYPING)

    try:
        uid = update.message.from_user.id
        authtoken = list(sql.get_user(uid)[0])[2]
    except Exception:
        update.message.reply_text('You need to /link your Spotify account with me first.')
        return

    try:
        data = {
            'grant_type': 'refresh_token',
            'refresh_token': authtoken,
            'redirect_uri': redirect_uri,
            'client_id': client_id,
            'client_secret': client_secret,
        }
        token = requests.post('https://accounts.spotify.com/api/token', data=data).json()
        # Read the access token here so that a token response lacking it
        # (a KeyError) leads to the /relink advice instead of being mistaken
        # for "nothing is playing" further down.
        access_token = token['access_token']
    except Exception:
        update.message.reply_text('Something went wrong. Try to /relink your account.')
        return

    try:
        endpoint = 'https://api.spotify.com/v1/me/player/currently-playing'
        headers = {
            'Accept': 'application/json',
            'Content-Type': 'application/json',
            'Authorization': 'Bearer ' + access_token,
        }
        r = requests.get(endpoint, headers=headers).json()

        if r.get('error'):
            update.message.reply_text("Spotify API: " + r['error'].get('message'))
            print(r['error'].get('message'))
            return

        playing_type = r['currently_playing_type']
        if playing_type == 'ad':
            update.message.reply_text("Ads. You're listening to those annoying ads.")
        elif playing_type in ('track', 'episode'):
            podcast = playing_type == 'episode'
            if podcast:
                # Ask again with additional_types=episode; the episode data
                # used by getpic() is read from this second response.
                r = requests.get(endpoint + '?additional_types=episode', headers=headers).json()
            button = InlineKeyboardButton(text="Play on Spotify", url=r['item']['external_urls']['spotify'])
            context.bot.send_photo(
                update.message.chat_id,
                getpic(r, uid, context, podcast=podcast),
                reply_markup=InlineKeyboardMarkup([[button]]),
            )
        else:
            update.message.reply_text("I'm not sure what you're listening to.")
    except Exception as e:
        print(e)
        update.message.reply_text("You're not listening to anything on Spotify at the moment.")
def sstats(update, context):
    """Reply with the number of registered users; restricted to ids in sudoList."""
    if update.message.from_user.id not in sudoList:
        update.effective_message.reply_text("This is a developer restricted command.\nYou don't have permissions to access this.")
        return

    registered = sql.list_users()
    update.message.reply_text(f'{len(registered)} Users')
def cancel(update, context):
    """Conversation fallback: acknowledge the cancellation and end the conversation."""
    incoming = update.message
    incoming.reply_text('Canceled.')
    return ConversationHandler.END
def sendhelp(update, context):
    """Reply to the incoming message with the module-level help text."""
    incoming = update.message
    incoming.reply_text(helptext)
def inlinenow(update, context):
    """Inline-mode counterpart of nowplaying().

    Every path answers the inline query exactly once:
      * no stored token            -> prompt with parameter 'link'
      * token refresh failed       -> prompt with parameter 'relink'
      * Spotify returned an error  -> prompt with parameter 'notsure'
      * ad / unknown type          -> prompt ('ads' / 'notsure')
      * track or episode           -> one cached-photo result with a
                                      "Play on Spotify" button
      * any exception while querying or rendering -> it is printed and the
        query is answered with the 'notlistening' prompt
    """
    query = update.inline_query

    def prompt(text, parameter):
        # Answer with no results and a switch-to-PM prompt; `parameter` is
        # one of the payload keywords that start() recognises.
        query.answer([], switch_pm_text=text, switch_pm_parameter=parameter, cache_time=0)

    try:
        uid = query.from_user.id
        authtoken = list(sql.get_user(uid)[0])[2]
    except Exception:
        prompt('Connect your Spotify account.', 'link')
        return

    try:
        data = {
            'grant_type': 'refresh_token',
            'refresh_token': authtoken,
            'redirect_uri': redirect_uri,
            'client_id': client_id,
            'client_secret': client_secret,
        }
        token = requests.post('https://accounts.spotify.com/api/token', data=data).json()
        # A token response without 'access_token' must lead to the relink
        # prompt, not to the "not listening" fallback below.
        access_token = token['access_token']
    except Exception:
        prompt("Something's wrong. Lets fix it.", 'relink')
        return

    try:
        endpoint = 'https://api.spotify.com/v1/me/player/currently-playing'
        headers = {
            'Accept': 'application/json',
            'Content-Type': 'application/json',
            'Authorization': 'Bearer ' + access_token,
        }
        r = requests.get(endpoint, headers=headers).json()

        if r.get('error'):
            prompt("Spotify API: " + r['error'].get('message'), 'notsure')
            # Stop here, as nowplaying() does: an error body has no
            # 'currently_playing_type', and continuing would raise and try
            # to answer the same query a second time.
            return

        playing_type = r['currently_playing_type']
        if playing_type == 'ad':
            prompt("You're listening to annoying ads.", 'ads')
        elif playing_type in ('track', 'episode'):
            podcast = playing_type == 'episode'
            if podcast:
                r = requests.get(endpoint + '?additional_types=episode', headers=headers).json()
            button = InlineKeyboardButton(text="Play on Spotify", url=r['item']['external_urls']['spotify'])
            image = getpic(r, uid, context, podcast=podcast)
            # Post the rendered image to the dump channel to obtain a
            # file_id, then delete that message; the inline result refers to
            # the picture by file_id.
            dump = context.bot.send_photo(dumpchannel, photo=image)
            photo = dump['photo'][1]['file_id']
            dump.delete()
            query.answer(
                [
                    InlineQueryResultCachedPhoto(
                        id=uuid4(),
                        photo_file_id=photo,
                        reply_markup=InlineKeyboardMarkup([[button]]),
                    )
                ],
                cache_time=0,
            )
        else:
            prompt("Not sure what you're listening to.", 'notsure')
    except Exception as e:
        print(e)
        prompt("You're not listening to anything.", 'notlistening')
# Text sent by sendhelp(). The literal starts with a line break, and each
# paragraph ends with an explicit "\n" escape followed by a real line break,
# so paragraphs are separated by an empty line in the delivered message.
helptext = '''
Tap /now to share what you're listening to on Spotify. You can also use the inline mode by typing @SpotifyNowBot in any chat.\n
If you're new, you need to /link your account to get started. You can always /unlink it whenever you feel like.\n
If you're facing errors, try restarting Spotify. No good? Send /cancel followed by /relink and if the issue persists, report it to @notdedsec.\n'''
if __name__ == "__main__":
    # Create the table when the database file does not exist yet.
    if not os.path.exists('spotifynow.db'):
        sql.create_table()

    with open('config.json', 'r') as conf:
        config = json.load(conf)

    # Positional unpacking: config.json must list its keys in exactly this order.
    (
        dumpchannel,
        jkey,
        client_id,
        client_secret,
        redirect_uri,
        bot_token,
        sudoList,
    ) = config.values()

    authlink = f"https://accounts.spotify.com/authorize?client_id={client_id}&response_type=code&redirect_uri={linkparse(redirect_uri)}&scope=user-read-currently-playing"

    updater = Updater(bot_token, use_context=True)
    # NOTE(review): presumably sets the console window title to the bot's
    # name (`title` is a Windows shell command) - confirm the target platform.
    os.system("title " + Bot(bot_token).first_name)
    logging.basicConfig(
        format='\n\n%(levelname)s\n%(asctime)s\n%(name)s\n%(message)s',
        level=logging.ERROR,
    )

    # Conversation state ids; only USERNAME is used by the /link conversation.
    USERNAME, AUTHTOKEN = range(2)
    link_handler = ConversationHandler(
        entry_points=[CommandHandler('link', link)],
        states={USERNAME: [MessageHandler(Filters.text, getusername)]},
        fallbacks=[CommandHandler('cancel', cancel)],
    )

    # Handlers are registered in the same order as before: the conversation,
    # the inline handler, then the plain commands.
    dispatcher = updater.dispatcher
    dispatcher.add_handler(link_handler)
    dispatcher.add_handler(InlineQueryHandler(inlinenow))
    for _command, _callback in (
        ('now', nowplaying),
        ('help', sendhelp),
        ('start', start),
        ('unlink', unlink),
        ('relink', relink),
        ('sstats', sstats),
    ):
        dispatcher.add_handler(CommandHandler(_command, _callback))

    updater.start_polling()
    updater.idle()