-
Notifications
You must be signed in to change notification settings - Fork 0
/
bot.py
119 lines (94 loc) · 4.57 KB
/
bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# -*- coding: utf-8 -*-
import configparser
from telegram.ext import Updater, CommandHandler, MessageHandler, Filters, InlineQueryHandler
from telegram import InlineQueryResultArticle, ChatAction, InputTextMessageContent
from uuid import uuid4
import subprocess
import time
import logging
# Configure logging once for the whole process: INFO and above, with each
# record showing its timestamp, logger name, level and message.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def error(bot, update, error):
    """Log an error that occurred while an update was being processed.

    Registered with the dispatcher as its error handler.

    Args:
        bot: Bot instance passed in by the dispatcher (unused here).
        update: The update that was being handled when the error occurred.
        error: The error that was raised.
    """
    # Logger.warn is a deprecated alias of Logger.warning. Passing the values
    # as arguments (instead of pre-formatting with %) defers the string
    # formatting until the record is actually emitted.
    logger.warning('Update "%s" caused error "%s"', update, error)
# Load the bot settings from bot.ini (resolved against the current working
# directory). This file reads [KEYS] bot_api and [ADMIN] id from it.
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the
# list of files it parsed. An empty list therefore means bot.ini was not
# loaded; fail here with a clear message rather than with a bare
# KeyError('KEYS') on the lookup below.
if not config.read('bot.ini'):
    raise FileNotFoundError("Could not read configuration file 'bot.ini'")

# Create the updater with the bot token; handlers are registered on its
# dispatcher.
updater = Updater(token=config['KEYS']['bot_api'])
dispatcher = updater.dispatcher
def _say(bot, chat_id, text, delay=0, **send_options):
    """Show the typing indicator, optionally pause, then send one message.

    Args:
        bot: Bot used to send the chat action and the message.
        chat_id: Chat to send to.
        text: Message text.
        delay: Seconds to wait between the typing indicator and the message;
            a falsy value skips the pause.
        **send_options: Extra keyword arguments forwarded to ``sendMessage``
            (for example ``parse_mode``).
    """
    bot.sendChatAction(chat_id=chat_id, action=ChatAction.TYPING)
    if delay:
        time.sleep(delay)
    bot.sendMessage(chat_id=chat_id, text=text, **send_options)


def start(bot, update):
    """Handle the /start command.

    Greets the sender, then sends one of two message sequences depending on
    whether the sender's id equals the ``[ADMIN] id`` value from the config:
    a refusal for everyone else, a short usage introduction for the admin.

    Args:
        bot: Bot used to reply.
        update: Incoming update; ``update.message`` is read for the chat id
            and the sender.
    """
    chat_id = update.message.chat_id
    _say(bot, chat_id, "Hi. I'm sudobot.")

    # Each entry is (pause in seconds, text, extra sendMessage options).
    if update.message.from_user.id != int(config['ADMIN']['id']):
        script = (
            (1, "It seems like you aren't allowed to use me. :(", {}),
            (1.5,
             "But sudobot is open source software, which means you can have your own! See my [GitHub repo](https://github.com/bvanrijn/sudobot) for details.",
             {'parse_mode': "Markdown"}),
        )
    else:
        script = (
            (1, "You can use me to run commands on your computer or server.", {}),
            (1.5,
             "Note that interactive commands or commands that generate a lot of output won't work.",
             {}),
            (1, "Have fun!", {}),
            (1.5,
             "Oh, before I forget: sudobot is open source software. See my [GitHub repo](https://github.com/bvanrijn/sudobot).",
             {'parse_mode': "Markdown"}),
        )

    for delay, text, send_options in script:
        _say(bot, chat_id, text, delay, **send_options)
def execute(bot, update, direct=True):
    """Run the received text as a shell command if it comes from the admin.

    Works for both regular messages (``update.message``) and inline queries
    (``update.inline_query``). Senders whose id does not equal the
    ``[ADMIN] id`` config value are ignored.

    Args:
        bot: Bot used to reply in the chat; only used for regular messages.
        update: Incoming update carrying either a message or an inline query.
        direct: Unused; kept so existing callers keep working.

    Returns:
        ``False`` after replying to a regular message, the command output
        wrapped in backticks for an inline query, or ``None`` when the sender
        is not the admin.
    """
    # An update without a message is treated as an inline query. Checking
    # this explicitly (rather than catching AttributeError around the
    # attribute reads) keeps unrelated attribute errors from being mistaken
    # for "inline mode".
    message = getattr(update, 'message', None)
    inline = message is None
    if inline:
        sender_id = update.inline_query.from_user.id
        command = update.inline_query.query
    else:
        sender_id = message.from_user.id
        command = message.text

    if sender_id != int(config['ADMIN']['id']):
        return None

    if not inline:
        bot.sendChatAction(chat_id=message.chat_id,
                           action=ChatAction.TYPING)

    # SECURITY: the text is passed to the shell verbatim (shell=True). Running
    # arbitrary commands is what this bot is for, so the admin check above is
    # the only safeguard; keep it in front of this call.
    process = subprocess.Popen(command, shell=True,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT)
    # communicate() reads all output, waits for the child to exit and closes
    # the pipe, so no zombie process or open descriptor is left behind.
    raw_output, _ = process.communicate()
    # Commands may print bytes that are not valid UTF-8; replace them rather
    # than failing the whole reply with UnicodeDecodeError.
    output = '`{0}`'.format(raw_output.decode('utf-8', errors='replace'))

    if inline:
        return output

    bot.sendMessage(chat_id=message.chat_id,
                    text=output, parse_mode="Markdown")
    return False
def inlinequery(bot, update):
    """Answer an inline query with the output of running its text as a command.

    The query text is handed to ``execute()``; when that yields output, a
    single article result is offered whose message content is the query in
    bold followed by the output. Empty queries and queries for which
    ``execute()`` returns ``None`` are answered with no results.

    Args:
        bot: Bot used to answer the inline query.
        update: Incoming update; ``update.inline_query`` is read.
    """
    # NOTE(review): Telegram can deliver inline queries while the user is
    # still typing, so partially typed commands may be executed - confirm
    # this is acceptable.
    query = update.inline_query.query
    results = []

    # A blank query has nothing to run and would give an article with an
    # empty title, so it is answered with an empty result list.
    if query.strip():
        # Pass the real bot object; the query string does not belong in the
        # bot parameter.
        output = execute(bot, update, direct=False)
        # execute() returns None when the sender is not the admin; do not
        # turn that into an article whose body is the text "None".
        if output is not None:
            results.append(InlineQueryResultArticle(
                # Telegram result ids are strings.
                id=str(uuid4()),
                title=query,
                description=output,
                input_message_content=InputTextMessageContent(
                    '*{0}*\n\n{1}'.format(query, output),
                    parse_mode="Markdown")))

    # is_personal=True keeps Telegram's server-side cache per user, so the
    # admin's command output is never served to another user who happens to
    # send the same query within the cache window.
    bot.answerInlineQuery(update.inline_query.id, results=results,
                          cache_time=10, is_personal=True)
# Build the handlers: /start is served by start(), text messages by execute().
start_handler = CommandHandler('start', start)
execute_handler = MessageHandler([Filters.text], execute)

# Register them in order: start, execute, then the inline query handler.
for _handler in (start_handler, execute_handler,
                 InlineQueryHandler(inlinequery)):
    dispatcher.add_handler(_handler)
dispatcher.add_error_handler(error)

# Start polling for updates, then hand control to updater.idle().
updater.start_polling()
updater.idle()