-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcomp7940chatbotv2.py
181 lines (155 loc) · 7.73 KB
/
comp7940chatbotv2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
import configparser
import html
import json
import logging
import os
import re

import tabulate
from pymongo import MongoClient
from telegram import Update
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
# The messageHandler is used for all message updates
from telegram.ext import Updater, CommandHandler, MessageHandler, Filters, CallbackContext, ConversationHandler
from telegram.ext import CallbackQueryHandler
# The MongoDB connection string (which embeds the user name and password) is
# read from the MONGODB_URI environment variable so that credentials are not
# stored in the source file. A missing variable raises KeyError at import time.
cluster = MongoClient(os.environ["MONGODB_URI"])
db = cluster["hkparking"]
# Collection queried by the handlers below for car park documents.
collection = db["data"]

# Conversation states used by the ConversationHandler built in main().
LOCATION, SELECT_CAR_PARK, SELECT_INFO = range(3)
def start(update, context):
    """Handle the /start command by asking the user for a parking area.

    Args:
        update: Incoming Telegram update; the prompt is sent to its
            effective chat.
        context: Callback context whose ``bot`` sends the message.

    Returns:
        ``LOCATION``, the conversation state that expects the area text.
    """
    chat_id = update.effective_chat.id
    # The prompt previously began with a mistyped first character; it now
    # starts with the same "please" character the bot's other prompts use.
    context.bot.send_message(chat_id=chat_id, text='請輸入想泊車的地區')
    return LOCATION
# Handle the location text the user sends after /start
def input_location(update, context):
    """Offer the car parks whose address contains the text the user sent.

    When at least one document matches, replies with an inline keyboard
    holding one button per car park. Otherwise asks the user to type the
    location again.

    Args:
        update: Incoming Telegram update carrying the user's text message.
        context: Callback context supplied by the dispatcher (unused here).

    Returns:
        ``SELECT_CAR_PARK`` when matches were offered, ``LOCATION`` when
        nothing matched and the user must retry.
    """
    location = update.message.text
    logging.debug("Location query: %s", location)

    # Escape the user's text so it is matched literally. Unescaped input such
    # as "(" would raise re.error, and characters like "." or "*" would be
    # interpreted as regex syntax instead of plain text.
    address_pattern = re.compile(re.escape(location))
    result = list(collection.find({'carpark_address': address_pattern}))

    if not result:
        # No car parks found: stay in the same state and ask again.
        update.message.reply_text('沒有找到相應的泊車場所,請重新輸入')
        return LOCATION

    # One button per row; the car park name doubles as the callback payload
    # that carpark_name_callback reads back.
    # NOTE(review): Telegram limits callback_data to 64 bytes, so a long
    # car park name may be rejected by the API - confirm against the data.
    buttons = [
        [InlineKeyboardButton(text=doc['carpark_name'], callback_data=doc['carpark_name'])]
        for doc in result
    ]
    keyboard = InlineKeyboardMarkup(buttons)
    update.message.reply_text('請選擇想要停泊的停車場\U0001F17F\U0001F697\U0001F698:', reply_markup=keyboard)
    return SELECT_CAR_PARK
# define a function to handle the button presses
def carpark_name_callback(update, context):
    """Remember the car park the user tapped and ask which detail to show.

    Args:
        update: Incoming Telegram update carrying the callback query; its
            ``data`` is the chosen car park name.
        context: Callback context; the name is stored in ``user_data`` under
            ``'carpark_name'`` for the next conversation step.

    Returns:
        ``SELECT_INFO``, the state in which the detail choice is handled.
    """
    callback = update.callback_query
    chosen_name = callback.data
    callback.answer()

    # Keep the selection so the next handler knows which car park to look up.
    context.user_data['carpark_name'] = chosen_name

    # (button label, callback payload) for each detail the bot can show.
    choices = (
        ('停車場地址', '停車場地址'),
        ('Google 地圖連結\U0001F5FA', 'Google 地圖連結'),
        ('時租收費\U0001F4B5', '時租收費'),
    )
    markup = InlineKeyboardMarkup(
        [[InlineKeyboardButton(text=label, callback_data=payload)] for label, payload in choices]
    )
    callback.message.reply_text('請選擇資訊:', reply_markup=markup)
    return SELECT_INFO
def _fee_table(fee):
    """Split the stored hourly-fee entries into ``(header, rows)`` for tabulate.

    The header is the distinct keys of the entries in first-seen order. The
    first value of every entry is collected into one flat list, which is cut
    into ``len(header)`` equal consecutive chunks (one per column) and zipped
    into rows.

    NOTE(review): this presumes ``fee`` is a list of single-key dicts stored
    column by column - confirm against the database schema.

    Args:
        fee: Value of the document's fee field.

    Returns:
        A ``(header, rows)`` tuple; both are empty when ``fee`` is not a
        non-empty list or carries no keys.
    """
    if not isinstance(fee, list) or not fee:
        return [], []

    header = []
    for entry in fee:
        for key in entry:
            if key not in header:
                header.append(key)
    if not header:
        return [], []

    # Only the first value of each entry is shown; an empty entry becomes "".
    values = [next(iter(entry.values()), "") for entry in fee]

    # Integer chunk boundaries: 0, n/k, 2n/k, ..., n for k columns.
    column_count = len(header)
    bounds = [i * len(values) // column_count for i in range(column_count + 1)]

    # NOTE(review): each slice stops one element before the next boundary, so
    # the last entry of every column is not displayed. This mirrors the
    # previous behaviour; whether that entry should be shown depends on the
    # stored data layout - confirm. max() keeps an empty chunk from turning
    # into a negative end index.
    columns = [
        values[start:max(start, stop - 1)]
        for start, stop in zip(bounds, bounds[1:])
    ]
    return header, list(zip(*columns))


def content_callback(update, context):
    """Reply with the detail the user picked for the previously chosen car park.

    Depending on the callback payload this sends the address, a button that
    opens the Google Maps link, or the hourly-fee table. The handler returns
    ``None``, so the conversation state is left unchanged.

    Args:
        update: Incoming Telegram update carrying the callback query; its
            ``data`` names the requested detail.
        context: Callback context; ``user_data['carpark_name']`` identifies
            the car park to look up.
    """
    query = update.callback_query
    query.answer()
    option = query.data

    carpark_name = context.user_data.get('carpark_name')
    document = None
    if carpark_name is not None:
        document = collection.find_one({'carpark_name': carpark_name})
    logging.debug("Car park document: %s", document)

    if document is None:
        # Nothing stored for this conversation, or the record is gone.
        query.message.reply_text(text='找不到停車場資料,請輸入 /start 重新查詢')
        return

    if option == '停車場地址':
        carpark_address = document['carpark_address']
        query.message.reply_text(text=f"您選擇的停車場是: {carpark_name} \n停車場地址: {carpark_address}")
    elif option == 'Google 地圖連結':
        google_map_url = document.get('google_map_url')
        if google_map_url and google_map_url != "N/A":
            # Build the URL button only when there is a link to open.
            button = InlineKeyboardButton(text="Google 地圖連結", url=google_map_url)
            keyboard = InlineKeyboardMarkup([[button]])
            query.message.reply_text(text=f"您選擇的停車場是: {carpark_name} \n點擊下面的按鈕前往 Google 地圖 \U0001F5FA", reply_markup=keyboard)
        else:
            query.message.reply_text(text=f"您選擇的停車場是: {carpark_name} \n抱歉未能提供Google 地圖連結 \U0001F615")
    elif option == '時租收費':
        header, rows = _fee_table(document.get('時租'))
        if not header:
            query.message.reply_text(text=f"您選擇的停車場是: {carpark_name} \n抱歉未能提供時租收費資料 \U0001F615")
            return
        table_str = tabulate.tabulate(rows, headers=header)
        # The message is sent with parse_mode='HTML', so "<", ">" and "&" in
        # the name or the table must be escaped or Telegram rejects the text.
        safe_name = html.escape(str(carpark_name), quote=False)
        safe_table = html.escape(table_str, quote=False)
        query.message.reply_text(text=f"您選擇的停車場是: {safe_name} \n {safe_table}", parse_mode='HTML')
# def error(update, context):
# """Log Errors caused by Updates."""
# logging.warning('Update "%s" caused error "%s"', update, context.error)
def main():
    """Wire up the conversation handler and start polling for updates.

    The bot token is read from the ``TELEGRAM_BOT_TOKEN`` environment
    variable so that it is not stored in the source file.

    Raises:
        KeyError: If ``TELEGRAM_BOT_TOKEN`` is not set.
    """
    updater = Updater(token=os.environ["TELEGRAM_BOT_TOKEN"], use_context=True)
    dispatcher = updater.dispatcher

    # /start opens the conversation; each state has exactly one handler.
    conv_handler = ConversationHandler(
        entry_points=[CommandHandler('start', start)],
        states={
            LOCATION: [MessageHandler(Filters.text & ~Filters.command, input_location)],
            SELECT_CAR_PARK: [CallbackQueryHandler(carpark_name_callback)],
            SELECT_INFO: [CallbackQueryHandler(content_callback)],
        },
        # /start is also a fallback, so sending it mid-conversation calls
        # start() again.
        fallbacks=[CommandHandler('start', start)],
    )
    dispatcher.add_handler(conv_handler)

    # Start the bot
    updater.start_polling()
    updater.idle()


if __name__ == '__main__':
    main()