-
Notifications
You must be signed in to change notification settings - Fork 0
/
music.py
executable file
·297 lines (235 loc) · 11.6 KB
/
music.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
import asyncio
import youtube_dl
import pafy
import discord
import nacl
from discord.ext import commands
from dislash import *
import os, re
path = "/proc/self/cgroup"
def is_docker():
if not os.path.isfile(path): return False
with open(path) as f:
for line in f:
if re.match("\d+:[\w=]+:/docker(-[ce]e)?/\w+", line):
return True
return False
if is_docker():
ffmpegLoc = '/usr/bin/ffmpeg'
else:
ffmpegLoc = 'ffmpeg'
class Player(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.loopingSong = False
self.loopingQueue = False
self.currentlyPlayingNumber = 0
self.song_queue = {}
global playingSong
playingSong = False
self.setup()
def setup(self):
for guild in self.bot.guilds:
self.song_queue[guild.id] = []
async def check_queue(self, ctx):
global playingSong
if len(self.song_queue[ctx.guild.id]) >= (self.currentlyPlayingNumber + 1):
ctx.guild.voice_client.stop()
if not self.loopingSong:
self.currentlyPlayingNumber = self.currentlyPlayingNumber + 1
await self.play_song(ctx, self.song_queue[ctx.guild.id][self.currentlyPlayingNumber - 1])
playingSong = True
elif self.loopingQueue:
# if ctx.guild.voice_client exists, stop it
ctx.guild.voice_client.stop()
self.currentlyPlayingNumber = 0
await self.play_song(ctx, self.song_queue[ctx.guild.id][self.currentlyPlayingNumber - 1])
playingSong = True
else:
playingSong = False
async def search_song(self, amount, song, get_url=False):
info = await self.bot.loop.run_in_executor(None, lambda: youtube_dl.YoutubeDL(
{"format": "bestaudio", "quiet": True}).extract_info(f"ytsearch{amount}:{song}", download=False,
ie_key="YoutubeSearch"))
if len(info["entries"]) == 0: return None
return [entry["webpage_url"] for entry in info["entries"]] if get_url else info
async def play_song(self, ctx, song):
global playingSong
url = pafy.new(song).getbestaudio().url
ctx.guild.voice_client.play(discord.PCMVolumeTransformer(discord.FFmpegPCMAudio(executable=ffmpegLoc, source=url, before_options= "-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5")),
after=lambda error: self.bot.loop.create_task(self.check_queue(ctx)))
playingSong = True
ctx.guild.voice_client.source.volume = 0.5
@slash_commands.command(
description="Makes the bot join your voice channel"
)
async def join(self, ctx):
if ctx.author.voice is None:
return await ctx.reply(
"You are not connected to a voice channel, please connect to the channel you want the bot to join.")
if ctx.guild.voice_client is not None:
await ctx.guild.voice_client.disconnect()
await ctx.reply('Connected.')
await ctx.author.voice.channel.connect()
@slash_commands.command(
description="Make the bot leave your voice channel.")
async def leave(self, ctx):
if ctx.guild.voice_client is not None:
if len(self.song_queue[ctx.guild.id]) > 0:
self.song_queue[ctx.guild.id] = []
await ctx.guild.voice_client.disconnect()
await ctx.reply('Disconnected succesfully.')
else:
await ctx.reply("I am not connected to a voice channel.")
@slash_commands.command(
description="Play a song",
options=[
Option("song", "Enter a name or url", Type.STRING, required=True)
# By default, Option is optional
# Pass required=True to make it a required arg
]
)
async def play(self, ctx, *, song=None):
await ctx.reply('Song loading...')
global playingSong
if song is None:
return await ctx.reply("You must include a song to play.")
if ctx.guild.voice_client is None:
await self.join(self, ctx)
# handle song where song isn't url
if not ("youtube.com/watch?" in song or "https://youtu.be/" in song):
result = await self.search_song(1, song, get_url=True)
if result is None:
return await ctx.send("The given song could not be found. Try using /search.")
song = result[0]
queue_len = len(self.song_queue[ctx.guild.id])
self.song_queue[ctx.guild.id].append(song)
if not playingSong:
await self.check_queue(ctx)
return await ctx.send(
f"{song} has been added to the queue at position: {queue_len + 1}.")
@slash_commands.command(
description="Search for a song",
options=[
Option("song", "Enter what to search for", Type.STRING, required=True)
# By default, Option is optional
# Pass required=True to make it a required arg
]
)
async def search(self, ctx, *, song=None):
if song is None: return await ctx.reply("You forgot to include a song to search for.")
await ctx.reply("Searching for song, this may take a few seconds.")
info = await self.search_song(5, song)
embed = discord.Embed(title=f"Results for '{song}':",
description="*You can use these URL's to play an exact song if the one you want isn't the first result.*\n",
colour=discord.Colour.red())
amount = 0
for entry in info["entries"]:
embed.description += f"[{entry['title']}]({entry['webpage_url']})\n"
amount += 1
embed.set_footer(text=f"Displaying the first {amount} results.")
await ctx.send(embed=embed)
@slash_commands.command(
description="View the queue",
)
async def queue(self, ctx): # display the current guilds queue
if len(self.song_queue[ctx.guild.id]) == 0:
return await ctx.send("There are currently no songs in the queue.")
if self.loopingQueue:
loopingtext = ' Currently looping the queue.'
elif self.loopingSong:
loopingtext = ' Currently looping the current song.'
else:
loopingtext = ''
embed = discord.Embed(title="Song Queue", description="", colour=discord.Colour.dark_gold())
i = 1
for url in self.song_queue[ctx.guild.id]:
embed.description += f"{i}) {url}\n"
i += 1
embed.description += f' \n Currently playing #{self.currentlyPlayingNumber}: {self.song_queue[ctx.guild.id][self.currentlyPlayingNumber - 1]}'
embed.set_footer(text=f"{len(self.song_queue[ctx.guild.id])} songs in the queue." + loopingtext)
await ctx.reply(embed=embed)
@slash_commands.command(description="Skip the currently playing song")
async def skip(self, ctx):
if ctx.guild.voice_client is None:
return await ctx.reply("I am not playing any song.")
if ctx.author.voice is None:
return await ctx.reply("You are not connected to any voice channel.")
if ctx.author.voice.channel.id != ctx.guild.voice_client.channel.id:
return await ctx.reply("I am not currently playing any songs for you.")
if len(ctx.guild.voice_client.channel.members) <= 2:
ctx.guild.voice_client.stop()
await ctx.reply("Song skipped.")
return await self.check_queue(ctx)
poll = discord.Embed(title=f"Vote to Skip Song by - {ctx.author.name}#{ctx.author.discriminator}",
description="**80% of the voice channel must vote to skip for it to pass.**",
colour=discord.Colour.blue())
poll.add_field(name="Skip", value=":white_check_mark:")
poll.add_field(name="Stay", value=":no_entry_sign:")
poll.set_footer(text="Voting ends in 15 seconds.")
poll_msg = await ctx.reply(
embed=poll) # only returns temporary message, we need to get the cached message to get the reactions
poll_id = poll_msg.id
await poll_msg.add_reaction(u"\u2705") # yes
await poll_msg.add_reaction(u"\U0001F6AB") # no
await asyncio.sleep(15) # 15 seconds to vote
poll_msg = await ctx.channel.fetch_message(poll_id)
votes = {u"\u2705": 0, u"\U0001F6AB": 0}
reacted = []
for reaction in poll_msg.reactions:
if reaction.emoji in [u"\u2705", u"\U0001F6AB"]:
async for user in reaction.users():
if user.voice.channel.id == ctx.guild.voice_client.channel.id and user.id not in reacted and not user.bot:
votes[reaction.emoji] += 1
reacted.append(user.id)
skip = False
if votes[u"\u2705"] > 0:
if votes[u"\U0001F6AB"] == 0 or votes[u"\u2705"] / (
votes[u"\u2705"] + votes[u"\U0001F6AB"]) > 0.79: # 80% or higher
skip = True
embed = discord.Embed(title="Skip Successful",
description="***Voting to skip the current song was succesful, skipping now.***",
colour=discord.Colour.green())
if not skip:
embed = discord.Embed(title="Skip Failed",
description="*Voting to skip the current song has failed.*\n\n**Voting failed, the vote requires at least 80% of the members to skip.**",
colour=discord.Colour.red())
embed.set_footer(text="Voting has ended.")
await poll_msg.clear_reactions()
await poll_msg.edit(embed=embed)
if skip:
ctx.guild.voice_client.stop()
await self.check_queue(ctx)
@slash_commands.command(description="Force skip (only available to staff)")
@slash_commands.has_permissions(manage_messages=True)
async def fskip(self, ctx):
if ctx.guild.voice_client is None:
return await ctx.reply("No songs are playing.")
if ctx.author.voice is None:
return await ctx.reply("You are not connected to any voice channel.")
if ctx.author.voice.channel.id != ctx.guild.voice_client.channel.id:
return await ctx.send("No songs are playing in your voice channel.")
ctx.guild.voice_client.stop()
await self.check_queue(ctx)
await ctx.reply("Song skipped!")
@slash_commands.command(description='Loop')
async def loop(self, inter):
pass
@loop.sub_command(name = 'queue', description='Loop the queue')
async def loop_queue(self, inter):
if self.loopingQueue == True:
self.loopingQueue = False
await inter.reply('Stopped looping the queue.')
else:
self.loopingQueue = True
self.loopingSong = False
await inter.reply('Started looping the queue.')
@loop.sub_command(name = 'song', description='Loop the current song')
async def loop_song(self, inter):
if self.loopingSong == True:
self.loopingSong = False
await inter.reply('Stopped looping this song.')
else:
self.loopingSong = True
self.loopingQueue = False
await inter.reply('Started looping this song.')