Skip to content

Quote plugin (updated) #186

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 13 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
347 changes: 347 additions & 0 deletions src/csbot/plugins/quote.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,347 @@
import re
import random
import functools
import collections

import attr
import pymongo
import requests

from csbot.plugin import Plugin
from csbot.util import nick


@attr.s
class QuoteRecord:
quote_id = attr.ib()
channel = attr.ib()
nick = attr.ib()
message = attr.ib()

def format(self, show_channel=False, show_id=True):
""" Formats a quote into a prettified string.
>>> self.format()
"[3] <Alan> some silly quote..."
>>> self.format(show_channel=True, show_id=False)
"#test - <Alan> silly quote"
"""
if show_channel and show_id:
fmt = '[{quoteId}] - {channel} - <{nick}> {message}'
elif show_channel and not show_id:
fmt = '{channel} - <{nick}> {message}'
elif not show_channel and show_id:
fmt = '[{quoteId}] <{nick}> {message}'
else:
fmt = '<{nick}> {message}'

return fmt.format(quoteId=self.quote_id, channel=self.channel, nick=self.nick, message=self.message)

def __bool__(self):
return True

def to_udict(self):
return {'quoteId': self.quote_id, 'nick': self.nick, 'channel': self.channel, 'message': self.message}

@classmethod
def from_udict(cls, udict):
return cls(quote_id=udict['quoteId'],
channel=udict['channel'],
nick=udict['nick'],
message=udict['message'],
)


class QuoteDB:
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

def quote_from_id(self, quote_id):
"""gets a quote with some `quoteId` from the database
returns None if no such quote exists
"""
return QuoteRecord.from_udict(self.quotedb.find_one({'quoteId': quote_id}))

def set_current_quote_id(self, id):
""" Sets the last quote id
We keep track of the latest quote ID (they're sequential) in the database
To update it we remove the old one and insert a new record.
"""
self.quotedb.replace_one({'header': 'currentQuoteId'},
{'header': 'currentQuoteId', 'maxQuoteId': id},
upsert=True)

def get_current_quote_id(self):
""" Gets the current maximum quote ID
"""
id_dict = self.quotedb.find_one({'header': 'currentQuoteId'})
if id_dict is not None:
current_id = id_dict['maxQuoteId']
else:
current_id = -1

return current_id

def insert_quote(self, quote):
""" Remember a quote by storing it in the database
Inserts a {'user': user, 'channel': channel, 'message': msg}
or {'account': accnt, 'channel': channel, 'message': msg}
quote into the persistent storage.
"""

id = self.get_current_quote_id()
sId = id + 1
quote.quote_id = sId
self.quotedb.insert_one(quote.to_udict())
self.set_current_quote_id(sId)
return sId

def remove_quote(self, quote_id):
""" Remove a given quote from the database
Returns False if the quoteId is invalid or does not exist.
"""

try:
id = int(quote_id)
except ValueError:
return False
else:
q = self.quote_from_id(id)
if not q:
return False

self.quotedb.delete_one({'quoteId': q.quote_id})

return True

def find_quotes(self, nick=None, channel=None, pattern=None, direction=pymongo.ASCENDING):
""" Finds and yields all quotes for a particular nick on a given channel
"""
if nick is None or nick == '*':
user = {'channel': channel}
elif channel is not None:
user = {'channel': channel, 'nick': nick}
else:
user = {'nick': nick}

for quote in self.quotedb.find(user, sort=[('quoteId', direction)]):
if message_matches(quote['message'], pattern=pattern):
yield QuoteRecord.from_udict(quote)


class Quote(Plugin, QuoteDB):
"""Attach channel specific quotes to a user
"""
quotedb = Plugin.use('mongodb', collection='quotedb')

PLUGIN_DEPENDS = ['usertrack', 'auth']

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.channel_logs = collections.defaultdict(functools.partial(collections.deque, maxlen=100))

def quote_set(self, nick, channel, pattern=None):
""" Insert the last matching quote from a user on a particular channel into the quotes database.
"""
for q in self.channel_logs[channel]:
if nick == q.nick and channel == q.channel and message_matches(q.message, pattern=pattern):
self.insert_quote(q)
return q
return None

@Plugin.command('remember',
help="remember <nick> [<pattern>]: adds last quote that matches <pattern> to the database")
def remember(self, e):
""" Remembers the last matching quote from a user
"""
data = e['data'].strip()
channel = e['channel']
user_nick = nick(e['user'])

m = re.fullmatch(r'(?P<nick>\S+)', data)
if m:
return self.remember_quote(e, user_nick, m.group('nick'), channel, None)

m = re.fullmatch(r'(?P<nick>\S+)\s+(?P<pattern>.+)', data)
if m:
return self.remember_quote(e, user_nick, m.group('nick'), channel, m.group('pattern').strip())

e.reply('Error: invalid command')

def remember_quote(self, e, user, nick, channel, pattern):
quote = self.quote_set(nick, channel, pattern)
if quote is None:
if pattern is not None:
e.reply(f'No data for {nick} found matching "{pattern}"')
else:
e.reply(f'No data for {nick}')
else:
self.bot.reply(user, 'remembered "{}"'.format(quote.format(show_id=False)))

@Plugin.command('quote', help=("quote [<nick> [<pattern>]]: looks up quotes from <nick>"
" (optionally only those matching <pattern>)"))
def quote(self, e):
""" Lookup quotes for the given channel/nick and outputs one
"""
data = e['data']
channel = e['channel']

if data.strip() == '':
return e.reply(self.find_a_quote(None, channel, None))

m = re.fullmatch(r'(?P<nick>\S+)', data)
if m:
return e.reply(self.find_a_quote(m.group('nick'), channel, None))

m = re.fullmatch(r'(?P<nick>\S+)\s+(?P<pattern>.+)', data)
if m:
return e.reply(self.find_a_quote(m.group('nick'), channel, m.group('pattern')))

def find_a_quote(self, nick, channel, pattern):
""" Finds a random matching quote from a user on a specific channel
Returns the formatted quote string
"""
res = list(self.find_quotes(nick, channel, pattern))
if not res:
if nick is None:
return 'No data'
else:
return 'No data for {}'.format(nick)
else:
out = random.choice(res)
return out.format(show_channel=False)

@Plugin.command('quote.list', help=("quote.list [<pattern>]: looks up all quotes on the channel"))
def quote_list(self, e):
""" Look for all quotes that match a given pattern in a channel
This action pastes multiple lines and so needs authorization.
"""
channel = e['channel']
nick_ = nick(e['user'])

if not self.bot.plugins['auth'].check_or_error(e, 'quote', channel):
return

if channel == self.bot.nick:
# first argument must be a channel
data = e['data'].split(maxsplit=1)

# TODO: use assignment expressions here when they come out
# https://www.python.org/dev/peps/pep-0572/
just_channel = re.fullmatch(r'(?P<channel>\S+)', data)
channel_and_pat = re.fullmatch(r'(?P<channel>\S+)\s+(?P<pattern>.+)', data)
if just_channel:
return self.reply_with_summary(nick_, just_channel.group('channel'), None)
elif channel_and_pat:
return self.reply_with_summary(nick_,
channel_and_pat.group('channel'),
channel_and_pat.group('pattern'))

return e.reply('Invalid command. Syntax in privmsg is !quote.list <channel> [<pattern>]')
else:
pattern = e['data']
return self.reply_with_summary(nick_, channel, pattern)

def reply_with_summary(self, to, channel, pattern):
""" Helper to list all quotes for a summary paste.
"""
for line in self.quote_summary(channel, pattern=pattern):
self.bot.reply(to, line)

def quote_summary(self, channel, pattern=None, dpaste=True):
""" Search through all quotes for a channel and optionally paste a list of them
Returns the last 5 matching quotes only, the remainder are added to a pastebin.
"""
quotes = list(self.find_quotes(nick=None, channel=channel, pattern=pattern, direction=pymongo.DESCENDING))
if not quotes:
if pattern:
yield 'No quotes for channel {} that match "{}"'.format(channel, pattern)
else:
yield 'No quotes for channel {}'.format(channel)

return

for q in quotes[:5]:
yield q.format(show_channel=True)

if dpaste and len(quotes) > 5:
paste_link = self.paste_quotes(quotes)
if paste_link:
yield 'Full summary at: {}'.format(paste_link)
else:
self.log.warn(f'Failed to upload full summary: {paste_link}')

def paste_quotes(self, quotes):
""" Pastebins a the last 100 quotes and returns the url
"""
paste_content = '\n'.join(q.format(show_channel=True) for q in quotes[:100])
if len(quotes) > 100:
paste_content = 'Latest 100 quotes:\n' + paste_content

req = requests.post('http://dpaste.com/api/v2/', {'content': paste_content})
if req:
return req.content.decode('utf-8').strip()

return req # return the failed request to handle error later

@Plugin.command('quote.remove', help=("quote.remove <id> [, <id>]*: removes quotes from the database"))
def quotes_remove(self, e):
"""Lookup the given quotes and remove them from the database transcationally
"""
data = e['data'].split(',')
channel = e['channel']

if not self.bot.plugins['auth'].check_or_error(e, 'quote', e['channel']):
return

if len(data) < 1:
return e.reply('No quoteID supplied')

ids = [qId.strip() for qId in data]
invalid_ids = []
for id in ids:
if id == '-1':
# special case -1, to be the last
try:
q = next(self.find_quotes(nick=None, channel=channel, pattern=None, direction=pymongo.DESCENDING))
except StopIteration:
invalid_ids.append(id)
continue

id = q.quote_id

if not self.remove_quote(id):
invalid_ids.append(id)

if invalid_ids:
str_invalid_ids = ', '.join(str(id) for id in invalid_ids)
return e.reply('Error: could not remove quote(s) with ID: {ids}'.format(ids=str_invalid_ids))

@Plugin.hook('core.message.privmsg')
def log_privmsgs(self, e):
"""Register privmsgs for a channel and stick them into the log for that channel
this is merely an in-memory deque, so won't survive across restarts/crashes
"""
msg = e['message']

channel = e['channel']
user = nick(e['user'])
quote = QuoteRecord(None, channel, user, msg)
self.channel_logs[channel].appendleft(quote)


def message_matches(msg, pattern=None):
""" Check whether the given message matches the given pattern
If there is no pattern, it is treated as a wildcard and all messages match.
"""
if pattern is None:
return True

return re.search(pattern, msg) is not None
12 changes: 12 additions & 0 deletions src/csbot/util.py
Original file line number Diff line number Diff line change
@@ -189,6 +189,18 @@ def ordinal(value):
return ordval


def subdict(d1, d2):
"""returns True if d1 is a "subset" of d2
i.e. if forall keys k in d1, k in d2 and d1[k] == d2[k]
"""
for k1 in d1:
if k1 not in d2:
return False
if d1[k1] != d2[k1]:
return False
return True


def pluralize(n, singular, plural):
return '{0} {1}'.format(n, singular if n == 1 else plural)

230 changes: 230 additions & 0 deletions tests/test_plugin_quote.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
import asyncio
from unittest import mock

import mongomock
import pytest

from csbot.plugins.quote import QuoteRecord
from csbot.util import subdict


class TestQuoteRecord:
def test_quote_formatter(self):
quote = QuoteRecord(quote_id=0, channel='#First', nick='Nick', message='test')
assert quote.format() == '[0] <Nick> test'
assert quote.format(show_id=False) == '<Nick> test'
assert quote.format(show_channel=True) == '[0] - #First - <Nick> test'
assert quote.format(show_channel=True, show_id=False) == '#First - <Nick> test'

def test_quote_deserialise(self):
udict = {'quoteId': 0, 'channel': '#First', 'message': 'test', 'nick': 'Nick'}
qr = QuoteRecord(quote_id=0, channel='#First', nick='Nick', message='test')
assert QuoteRecord.from_udict(udict) == qr

def test_quote_serialise(self):
udict = {'quoteId': 0, 'channel': '#First', 'message': 'test', 'nick': 'Nick'}
qr = QuoteRecord(quote_id=0, channel='#First', nick='Nick', message='test')
assert qr.to_udict() == udict


class TestQuotePlugin:
pytestmark = [
pytest.mark.bot(config="""
["@bot"]
plugins = ["mongodb", "usertrack", "auth", "quote"]
[auth]
nickaccount = "#First:quote"
otheraccount = "#Second:quote"
[mongodb]
mode = "mock"
"""),
pytest.mark.usefixtures("run_client"),
]

@pytest.fixture(autouse=True)
def quote_plugin(self, bot_helper):
self.bot_helper = bot_helper
self.quote = self.bot_helper['quote']

# Force the test to fail if not using a mock database. This prevents the tests from accidentally
# polluting a real database in the evnet of failure.
assert isinstance(self.quote.quotedb, mongomock.Collection), \
'Not mocking MongoDB -- may be writing to actual database (!) (aborted test)'

self.mock_paste_quotes = mock.MagicMock(wraps=self.quote.paste_quotes, return_value='N/A')
with mock.patch.object(self.quote, 'paste_quotes', self.mock_paste_quotes):
yield

async def _recv_line(self, line):
return await asyncio.wait(self.bot_helper.receive(line))

async def _recv_privmsg(self, name, channel, msg):
return await self._recv_line(f':{name} PRIVMSG {channel} :{msg}')

def assert_sent_quote(self, channel, quote_id, quoted_user, quoted_channel, quoted_text, show_channel=False):
quote = QuoteRecord(quote_id=quote_id,
channel=quoted_channel,
nick=quoted_user,
message=quoted_text)
self.bot_helper.assert_sent('NOTICE {} :{}'.format(channel, quote.format()))

def test_quote_empty(self):
assert list(self.quote.find_quotes('noQuotesForMe', '#anyChannel')) == []

async def test_client_quote_add(self):
await self._recv_privmsg('Nick!~user@host', '#First', 'test data')
await self._recv_privmsg('Other!~user@host', '#First', '!remember Nick')
await self._recv_privmsg('Nick!~user@host', '#First', '!quote Nick')
self.assert_sent_quote('#First', 0, 'Nick', '#First', 'test data')

async def test_client_quote_remember_send_privmsg(self):
await self._recv_privmsg('Nick!~user@host', '#First', 'test data')
await self._recv_privmsg('Other!~user@host', '#First', '!remember Nick')
self.bot_helper.assert_sent('NOTICE Other :remembered "<Nick> test data"')

async def test_client_quote_add_pattern_find(self):
await self._recv_privmsg('Nick!~user@host', '#First', 'test data#1')
await self._recv_privmsg('Other!~user@host', '#First', '!remember Nick')

await self._recv_privmsg('Nick!~user@host', '#First', 'test data#2')
await self._recv_privmsg('Other!~user@host', '#First', '!remember Nick')

await self._recv_privmsg('Nick!~user@host', '#First', '!quote Nick data#2')
self.assert_sent_quote('#First', 1, 'Nick', '#First', 'test data#2')

async def test_client_quotes_not_exist(self):
await self._recv_privmsg('Nick!~user@host', '#First', '!quote Nick')
self.bot_helper.assert_sent('NOTICE #First :No data for Nick')

async def test_client_quote_add_multi(self):
await self._recv_privmsg('Nick!~user@host', '#First', 'test data')
await self._recv_privmsg('Nick!~user@host', '#First', 'other data')
await self._recv_privmsg('Other!~user@host', '#First', '!remember Nick test')
await self._recv_privmsg('Nick!~user@host', '#First', '!quote Nick')
self.assert_sent_quote('#First', 0, 'Nick', '#First', 'test data')

async def test_client_quote_channel_specific_logs(self):
await self._recv_privmsg('Nick!~user@host', '#First', 'test data')
await self._recv_privmsg('Nick!~user@host', '#First', 'other data')

await self._recv_privmsg('Other!~user@host', '#Second', '!remember Nick')
self.bot_helper.assert_sent('NOTICE #Second :No data for Nick')

await self._recv_privmsg('Other!~user@host', '#Second', '!quote Nick')
self.bot_helper.assert_sent('NOTICE #Second :No data for Nick')

async def test_client_quote_channel_specific_quotes(self):
await self._recv_privmsg('Nick!~user@host', '#First', 'test data')
await self._recv_privmsg('Nick!~user@host', '#Second', 'other data')

await self._recv_privmsg('Other!~user@host', '#Second', '!remember Nick')
await self._recv_privmsg('Other!~user@host', '#Second', '!quote Nick')
self.assert_sent_quote('#Second', 0, 'Nick', '#Second', 'other data')

await self._recv_privmsg('Another!~user@host', '#First', '!remember Nick')
await self._recv_privmsg('Other!~user@host', '#First', '!quote Nick')
self.assert_sent_quote('#First', 1, 'Nick', '#First', 'test data')

async def test_client_quote_channel_fill_logs(self):
for i in range(150):
await self._recv_privmsg('Nick!~user@host', '#First', f'test data#{i}')
await self._recv_privmsg('Nick!~user@host', '#Second', f'other data#{i}')

await self._recv_privmsg('Other!~user@host', '#Second', '!remember Nick data#135')
await self._recv_privmsg('Other!~user@host', '#Second', '!quote Nick')
self.assert_sent_quote('#Second', 0, 'Nick', '#Second', 'other data#135')

async def test_client_quotes_format(self):
"""make sure the format !quote.list yields is readable and goes to the right place
"""
await self._recv_line(":Other!~other@otherhost ACCOUNT otheraccount")

await self._recv_privmsg('Nick!~user@host', '#Second', 'data test')
await self._recv_privmsg('Other!~user@host', '#Second', '!remember Nick')

await self._recv_privmsg('Other!~user@host', '#Second', '!quote.list')
self.bot_helper.assert_sent('NOTICE Other :[0] - #Second - <Nick> data test')

async def test_client_quotes_list(self):
"""ensure the list !quote.list sends is short and redirects to pastebin
"""
await self._recv_line(":Nick!~user@host ACCOUNT nickaccount")
await self._recv_line(":Other!~other@otherhost ACCOUNT otheraccount")

# stick some quotes in a thing
data = [f'test data#{i}' for i in range(10)]
for msg in data:
await self._recv_privmsg('Nick!~user@host', '#Second', msg)
await self._recv_privmsg('Other!~user@host', '#Second', '!remember Nick')

await self._recv_privmsg('Other!~user@host', '#Second', '!quote.list')

quotes = [QuoteRecord(quote_id=i, channel='#Second', nick='Nick', message=d) for i, d in enumerate(data)]
quotes = reversed(quotes)
msgs = ['NOTICE {channel} :{msg}'.format(channel='Other',
msg=q.format(show_channel=True)) for q in quotes]
self.bot_helper.assert_sent(msgs[:5])

# manually unroll the call args to map subdict over it
# so we can ignore the cruft mongo inserts
quote_calls = self.quote.paste_quotes.call_args
qarg, = quote_calls[0] # args
for quote, document in zip(quotes, qarg):
assert subdict(quote, document)

async def test_client_quote_remove(self):
await self._recv_line(":Nick!~user@host ACCOUNT nickaccount")

await self._recv_privmsg('Nick!~user@host', '#First', 'test data#1')
await self._recv_privmsg('Other!~user@host', '#First', '!remember Nick')

await self._recv_privmsg('Nick!~user@host', '#First', 'test data#2')
await self._recv_privmsg('Other!~user@host', '#First', '!remember Nick')

await self._recv_privmsg('Nick!~user@host', '#First', '!quote.remove -1')
await self._recv_privmsg('Nick!~user@host', '#First', '!quote.remove 0')

await self._recv_privmsg('Nick!~user@host', '#First', '!quote Nick')
self.bot_helper.assert_sent('NOTICE #First :No data for Nick')

async def test_client_quote_remove_no_permission(self):
await self._recv_line(":Other!~other@otherhost ACCOUNT otheraccount")

await self._recv_privmsg('Nick!~user@host', '#First', 'test data#1')
await self._recv_privmsg('Other!~user@host', '#First', '!remember Nick')
await self._recv_privmsg('Other!~user@host', '#First', '!quote.remove -1')

self.bot_helper.assert_sent('NOTICE #First :error: otheraccount not authorised for #First:quote')

async def test_client_quote_remove_no_quotes(self):
await self._recv_line(":Nick!~user@host ACCOUNT nickaccount")
await self._recv_privmsg('Nick!~user@host', '#First', '!quote.remove -1')

self.bot_helper.assert_sent('NOTICE #First :Error: could not remove quote(s) with ID: -1')

async def test_client_quote_list_no_permission(self):
await self._recv_line(":Other!~other@otherhost ACCOUNT otheraccount")

await self._recv_privmsg('Nick!~user@host', '#First', 'test data#1')
await self._recv_privmsg('Other!~user@host', '#First', '!remember Nick')
await self._recv_privmsg('Other!~user@host', '#First', '!quote.list')

self.bot_helper.assert_sent('NOTICE #First :error: otheraccount not authorised for #First:quote')

async def test_client_quote_channelwide(self):
await self._recv_privmsg('Nick!~user@host', '#First', 'test data!')
await self._recv_privmsg('Other!~other@host', '#First', '!remember Nick')
await self._recv_privmsg('Other!~other@host', '#First', '!quote')
self.assert_sent_quote('#First', 0, 'Nick', '#First', 'test data!')

async def test_client_quote_channelwide_with_pattern(self):
await self._recv_privmsg('Nick!~user@host', '#First', 'test data!')
await self._recv_privmsg('Other!~other@host', '#First', '!remember Nick')

await self._recv_privmsg('Other!~other@host', '#First', 'other data')
await self._recv_privmsg('Nick!~user@host', '#First', '!remember Other')

await self._recv_privmsg('Other!~other@host', '#First', '!quote * other')
self.assert_sent_quote('#First', 1, 'Other', '#First', 'other data')