diff --git a/howdoi/config.py b/howdoi/config.py new file mode 100644 index 000000000..6e32babb9 --- /dev/null +++ b/howdoi/config.py @@ -0,0 +1,99 @@ +import os +import sys +import appdirs +from cachelib import FileSystemCache, NullCache + +# Handle imports for Python 2 and 3 +if sys.version < "3": + import codecs + from urllib import quote as url_quote + from urllib import getproxies + from urlparse import urlparse, parse_qs + + # Handling Unicode: http://stackoverflow.com/a/6633040/305414 + def u(x): + return codecs.unicode_escape_decode(x)[0] + + +else: + from urllib.request import getproxies + from urllib.parse import quote as url_quote, urlparse, parse_qs + + def u(x): + return x + + +if os.getenv("HOWDOI_DISABLE_SSL"): # Set http instead of https + SCHEME = "http://" + VERIFY_SSL_CERTIFICATE = False +else: + SCHEME = "https://" + VERIFY_SSL_CERTIFICATE = True + +SUPPORTED_SEARCH_ENGINES = ("google", "bing", "duckduckgo") + +URL = os.getenv("HOWDOI_URL") or "stackoverflow.com" + +USER_AGENTS = ( + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.7; rv:11.0) Gecko/20100101 Firefox/11.0", + "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:22.0) Gecko/20100 101 Firefox/22.0", + "Mozilla/5.0 (Windows NT 6.1; rv:11.0) Gecko/20100101 Firefox/11.0", + ( + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_4) AppleWebKit/536.5 (KHTML, like Gecko) " + "Chrome/19.0.1084.46 Safari/536.5" + ), + ( + "Mozilla/5.0 (Windows; Windows NT 6.1) AppleWebKit/536.5 (KHTML, like Gecko) Chrome/19.0.1084.46" + "Safari/536.5" + ), +) +SEARCH_URLS = { + "bing": SCHEME + "www.bing.com/search?q=site:{0}%20{1}&hl=en", + "google": SCHEME + "www.google.com/search?q=site:{0}%20{1}&hl=en", + "duckduckgo": SCHEME + "duckduckgo.com/?q=site:{0}%20{1}&t=hj&ia=web", +} + +BLOCK_INDICATORS = ( + 'form id="captcha-form"', + "This page appears when Google automatically detects requests coming from your computer " + 'network which appear to be in violation of the Terms of Service', +) + +BLOCKED_QUESTION_FRAGMENTS = ("webcache.googleusercontent.com",) + +STAR_HEADER = u("\u2605") +ANSWER_HEADER = u("{2} Answer from {0} {2}\n{1}") +NO_ANSWER_MSG = "< no answer given >" + +CACHE_EMPTY_VAL = "NULL" +CACHE_DIR = appdirs.user_cache_dir("howdoi") +CACHE_ENTRY_MAX = 128 + +HTML_CACHE_PATH = "cache_html" +SUPPORTED_HELP_QUERIES = [ + "use howdoi", + "howdoi", + "run howdoi", + "do howdoi", + "howdoi howdoi", + "howdoi use howdoi", +] + +# variables for text formatting, prepend to string to begin text formatting. +BOLD = "\033[1m" +GREEN = "\033[92m" +RED = "\033[91m" +UNDERLINE = "\033[4m" +END_FORMAT = "\033[0m" # append to string to end text formatting. + +# stash options +STASH_SAVE = "save" +STASH_VIEW = "view" +STASH_REMOVE = "remove" +STASH_EMPTY = "empty" + + +if os.getenv("HOWDOI_DISABLE_CACHE"): + cache = NullCache() # works like an always empty cache +else: + cache = FileSystemCache(CACHE_DIR, CACHE_ENTRY_MAX, default_timeout=0) diff --git a/howdoi/howdoi.py b/howdoi/howdoi.py index 4761644a0..faf87ea4a 100755 --- a/howdoi/howdoi.py +++ b/howdoi/howdoi.py @@ -9,667 +9,189 @@ ###################################################### from __future__ import print_function + import gc -gc.disable() # noqa: E402 -import argparse +import inspect import os -import appdirs -import re -from cachelib import FileSystemCache, NullCache -import json -import requests +import pkgutil import sys -from . import __version__ - -from keep import utils as keep_utils - -from pygments import highlight -from pygments.lexers import guess_lexer, get_lexer_by_name -from pygments.formatters.terminal import TerminalFormatter -from pygments.util import ClassNotFound - -from pyquery import PyQuery as pq -from requests.exceptions import ConnectionError -from requests.exceptions import SSLError - - -# Handle imports for Python 2 and 3 -if sys.version < '3': - import codecs - from urllib import quote as url_quote - from urllib import getproxies - from urlparse import urlparse, parse_qs - - # Handling Unicode: http://stackoverflow.com/a/6633040/305414 - def u(x): - return codecs.unicode_escape_decode(x)[0] -else: - from urllib.request import getproxies - from urllib.parse import quote as url_quote, urlparse, parse_qs - - def u(x): - return x - - -# rudimentary standardized 3-level log output -def _print_err(x): print("[ERROR] " + x) - - -_print_ok = print # noqa: E305 -def _print_dbg(x): print("[DEBUG] " + x) # noqa: E302 - - -if os.getenv('HOWDOI_DISABLE_SSL'): # Set http instead of https - SCHEME = 'http://' - VERIFY_SSL_CERTIFICATE = False -else: - SCHEME = 'https://' - VERIFY_SSL_CERTIFICATE = True - -SUPPORTED_SEARCH_ENGINES = ('google', 'bing', 'duckduckgo') - -URL = os.getenv('HOWDOI_URL') or 'stackoverflow.com' - -USER_AGENTS = ('Mozilla/5.0 (Macintosh; Intel Mac OS X 10.7; rv:11.0) Gecko/20100101 Firefox/11.0', - 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:22.0) Gecko/20100 101 Firefox/22.0', - 'Mozilla/5.0 (Windows NT 6.1; rv:11.0) Gecko/20100101 Firefox/11.0', - ('Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_4) AppleWebKit/536.5 (KHTML, like Gecko) ' - 'Chrome/19.0.1084.46 Safari/536.5'), - ('Mozilla/5.0 (Windows; Windows NT 6.1) AppleWebKit/536.5 (KHTML, like Gecko) Chrome/19.0.1084.46' - 'Safari/536.5'), ) -SEARCH_URLS = { - 'bing': SCHEME + 'www.bing.com/search?q=site:{0}%20{1}&hl=en', - 'google': SCHEME + 'www.google.com/search?q=site:{0}%20{1}&hl=en', - 'duckduckgo': SCHEME + 'duckduckgo.com/?q=site:{0}%20{1}&t=hj&ia=web' -} - -BLOCK_INDICATORS = ( - 'form id="captcha-form"', - 'This page appears when Google automatically detects requests coming from your computer ' - 'network which appear to be in violation of the Terms of Service' +from howdoi.utils import ( + STASH_EMPTY, + STASH_REMOVE, + STASH_SAVE, + STASH_VIEW, + SUPPORTED_SEARCH_ENGINES, + _clear_cache, + _print_err, + _print_ok, + cache, + get_parser, + howdoi_session, + keep_utils, + print_stash, + prompt_stash_remove, ) -BLOCKED_QUESTION_FRAGMENTS = ( - 'webcache.googleusercontent.com', -) - -STAR_HEADER = u('\u2605') -ANSWER_HEADER = u('{2} Answer from {0} {2}\n{1}') -NO_ANSWER_MSG = '< no answer given >' - -CACHE_EMPTY_VAL = "NULL" -CACHE_DIR = appdirs.user_cache_dir('howdoi') -CACHE_ENTRY_MAX = 128 - -HTML_CACHE_PATH = 'cache_html' -SUPPORTED_HELP_QUERIES = ['use howdoi', 'howdoi', 'run howdoi', - 'do howdoi', 'howdoi howdoi', 'howdoi use howdoi'] - -# variables for text formatting, prepend to string to begin text formatting. -BOLD = '\033[1m' -GREEN = '\033[92m' -RED = '\033[91m' -UNDERLINE = '\033[4m' -END_FORMAT = '\033[0m' # append to string to end text formatting. - -# stash options -STASH_SAVE = 'save' -STASH_VIEW = 'view' -STASH_REMOVE = 'remove' -STASH_EMPTY = 'empty' - -if os.getenv('HOWDOI_DISABLE_CACHE'): - cache = NullCache() # works like an always empty cache -else: - cache = FileSystemCache(CACHE_DIR, CACHE_ENTRY_MAX, default_timeout=0) - -howdoi_session = requests.session() - - -class BlockError(RuntimeError): - pass - - -def _random_int(width): - bres = os.urandom(width) - if sys.version < '3': - ires = int(bres.encode('hex'), 16) - else: - ires = int.from_bytes(bres, 'little') - - return ires - - -def _random_choice(seq): - return seq[_random_int(1) % len(seq)] - - -def get_proxies(): - proxies = getproxies() - filtered_proxies = {} - for key, value in proxies.items(): - if key.startswith('http'): - if not value.startswith('http'): - filtered_proxies[key] = 'http://%s' % value - else: - filtered_proxies[key] = value - return filtered_proxies - - -def _format_url_to_filename(url, file_ext='html'): - filename = ''.join(ch for ch in url if ch.isalnum()) - return filename + '.' + file_ext - - -def _get_result(url): - try: - return howdoi_session.get(url, headers={'User-Agent': _random_choice(USER_AGENTS)}, - proxies=get_proxies(), - verify=VERIFY_SSL_CERTIFICATE).text - except requests.exceptions.SSLError as e: - _print_err('Encountered an SSL Error. Try using HTTP instead of ' - 'HTTPS by setting the environment variable "HOWDOI_DISABLE_SSL".\n') - raise e - - -def _add_links_to_text(element): - hyperlinks = element.find('a') - - for hyperlink in hyperlinks: - pquery_object = pq(hyperlink) - href = hyperlink.attrib['href'] - copy = pquery_object.text() - if (copy == href): - replacement = copy - else: - replacement = "[{0}]({1})".format(copy, href) - pquery_object.replace_with(replacement) - - -def get_text(element): - ''' return inner text in pyquery element ''' - _add_links_to_text(element) - try: - return element.text(squash_space=False) - except TypeError: - return element.text() - - -def _extract_links_from_bing(html): - html.remove_namespaces() - return [a.attrib['href'] for a in html('.b_algo')('h2')('a')] - - -def _extract_links_from_google(html): - return [a.attrib['href'] for a in html('.l')] or \ - [a.attrib['href'] for a in html('.r')('a')] - - -def _extract_links_from_duckduckgo(html): - html.remove_namespaces() - links_anchors = html.find('a.result__a') - results = [] - for anchor in links_anchors: - link = anchor.attrib['href'] - url_obj = urlparse(link) - parsed_url = parse_qs(url_obj.query).get('uddg', '') - if parsed_url: - results.append(parsed_url[0]) - return results - - -def _extract_links(html, search_engine): - if search_engine == 'bing': - return _extract_links_from_bing(html) - if search_engine == 'duckduckgo': - return _extract_links_from_duckduckgo(html) - return _extract_links_from_google(html) - - -def _get_search_url(search_engine): - return SEARCH_URLS.get(search_engine, SEARCH_URLS['google']) - - -def _is_blocked(page): - for indicator in BLOCK_INDICATORS: - if page.find(indicator) != -1: - return True - - return False - - -def _get_links(query): - search_engine = os.getenv('HOWDOI_SEARCH_ENGINE', 'google') - search_url = _get_search_url(search_engine) - - result = _get_result(search_url.format(URL, url_quote(query))) - if _is_blocked(result): - _print_err('Unable to find an answer because the search engine temporarily blocked the request. ' - 'Please wait a few minutes or select a different search engine.') - raise BlockError("Temporary block by search engine") - - html = pq(result) - return _extract_links(html, search_engine) - - -def get_link_at_pos(links, position): - if not links: - return False - - if len(links) >= position: - link = links[position - 1] - else: - link = links[-1] - return link - - -def _format_output(code, args): - if not args['color']: - return code - lexer = None - - # try to find a lexer using the StackOverflow tags - # or the query arguments - for keyword in args['query'].split() + args['tags']: - try: - lexer = get_lexer_by_name(keyword) - break - except ClassNotFound: - pass - - # no lexer found above, use the guesser - if not lexer: - try: - lexer = guess_lexer(code) - except ClassNotFound: - return code - - return highlight(code, - lexer, - TerminalFormatter(bg='dark')) - - -def _is_question(link): - for fragment in BLOCKED_QUESTION_FRAGMENTS: - if fragment in link: - return False - return re.search(r'questions/\d+/', link) - - -def _get_questions(links): - return [link for link in links if _is_question(link)] - - -def _get_answer(args, links): - link = get_link_at_pos(links, args['pos']) - if not link: - return False - - cache_key = link - page = cache.get(link) - if not page: - page = _get_result(link + '?answertab=votes') - cache.set(cache_key, page) - - html = pq(page) - - first_answer = html('.answer').eq(0) - - instructions = first_answer.find('pre') or first_answer.find('code') - args['tags'] = [t.text for t in html('.post-tag')] - - if not instructions and not args['all']: - text = get_text(first_answer.find('.post-text').eq(0)) - elif args['all']: - texts = [] - for html_tag in first_answer.items('.post-text > *'): - current_text = get_text(html_tag) - if current_text: - if html_tag[0].tag in ['pre', 'code']: - texts.append(_format_output(current_text, args)) - else: - texts.append(current_text) - text = '\n'.join(texts) - else: - text = _format_output(get_text(instructions.eq(0)), args) - if text is None: - text = NO_ANSWER_MSG - text = text.strip() - return text - - -def _get_links_with_cache(query): - cache_key = query + "-links" - res = cache.get(cache_key) - if res: - if res == CACHE_EMPTY_VAL: - res = False - return res - - links = _get_links(query) - if not links: - cache.set(cache_key, CACHE_EMPTY_VAL) - - question_links = _get_questions(links) - cache.set(cache_key, question_links or CACHE_EMPTY_VAL) - - return question_links - - -def build_splitter(splitter_character='=', splitter_length=80): - return '\n' + splitter_character * splitter_length + '\n\n' - - -def _get_answers(args): - """ - @args: command-line arguments - returns: array of answers and their respective metadata - False if unable to get answers - """ - - question_links = _get_links_with_cache(args['query']) - if not question_links: - return False - - answers = [] - initial_position = args['pos'] - multiple_answers = (args['num_answers'] > 1 or args['all']) - - for answer_number in range(args['num_answers']): - current_position = answer_number + initial_position - args['pos'] = current_position - link = get_link_at_pos(question_links, current_position) - answer = _get_answer(args, question_links) - if not answer: - continue - if not args['link'] and not args['json_output'] and multiple_answers: - answer = ANSWER_HEADER.format(link, answer, STAR_HEADER) - answer += '\n' - answers.append({ - 'answer': answer, - 'link': link, - 'position': current_position - }) - - return answers - - -def _clear_cache(): - global cache - if not cache: - cache = FileSystemCache(CACHE_DIR, CACHE_ENTRY_MAX, 0) - - return cache.clear() - - -def _is_help_query(query): - return any([query.lower() == help_query for help_query in SUPPORTED_HELP_QUERIES]) - - -def _format_answers(res, args): - if "error" in res: - return res["error"] - - if args["json_output"]: - return json.dumps(res) - - formatted_answers = [] - - for answer in res: - next_ans = answer["answer"] - if args["link"]: # if we only want links - next_ans = answer["link"] - formatted_answers.append(next_ans) - - return build_splitter().join(formatted_answers) - - -def _get_help_instructions(): - instruction_splitter = build_splitter(' ', 60) - query = 'print hello world in python' - instructions = [ - 'Here are a few popular howdoi commands ', - '>>> howdoi {} (default query)', - '>>> howdoi {} -a (read entire answer)', - '>>> howdoi {} -n [number] (retrieve n number of answers)', - '>>> howdoi {} -l (display only a link to where the answer is from', - '>>> howdoi {} -c (Add colors to the output)', - '>>> howdoi {} -e (Specify the search engine you want to use e.g google,bing)' - ] - - instructions = map(lambda s: s.format(query), instructions) - - return instruction_splitter.join(instructions) - - -def _get_cache_key(args): - return str(args) + __version__ - - -def format_stash_item(fields, index = -1): - title = fields['alias'] - description = fields['desc'] - item_num = index + 1 - if index == -1: - return '{underline}{bold}$ {title}{end_format}\n\n{description}\n'.format( - underline=UNDERLINE, - bold=BOLD, - title=title, - end_format=END_FORMAT, - description=description) - return '{underline}{bold}$ [{item_num}] {title}{end_format}\n\n{description}\n'.format( - underline=UNDERLINE, - bold=BOLD, - item_num=item_num, - title=title, - end_format=END_FORMAT, - description=description) - - -def print_stash(stash_list = []): - if len(stash_list) == 0: - stash_list = ['\nSTASH LIST:'] - commands = keep_utils.read_commands() - if commands is None or len(commands.items()) == 0: - print('No commands found in stash. Add a command with "howdoi --{stash_save} ".'.format(stash_save=STASH_SAVE)) - return - for cmd, fields in commands.items(): - stash_list.append(format_stash_item(fields)) - else: - stash_list = [format_stash_item(x['fields'], i) for i, x in enumerate(stash_list)] - print(build_splitter('#').join(stash_list)) - - -def _get_stash_key(args): - stash_args = {} - ignore_keys = [STASH_SAVE, STASH_VIEW, STASH_REMOVE, STASH_EMPTY, 'tags'] # ignore these for stash key. - for key in args: - if not (key in ignore_keys): - stash_args[key] = args[key] - return str(stash_args) - - -def _stash_remove(cmd_key, title): - commands = keep_utils.read_commands() - if commands is not None and cmd_key in commands: - keep_utils.remove_command(cmd_key) - print('\n{bold}{green}"{title}" removed from stash.{end_format}\n'.format( - bold=BOLD, - green=GREEN, - title=title, - end_format=END_FORMAT)) - else: - print('\n{bold}{red}"{title}" not found in stash.{end_format}\n'.format( - bold=BOLD, - red=RED, - title=title, - end_format=END_FORMAT)) - - -def _stash_save(cmd_key, title, answer): - try: - keep_utils.save_command(cmd_key, answer, title) - except FileNotFoundError: - os.system('keep init') - keep_utils.save_command(cmd_key, answer, title) - finally: - print_stash() - +from . import __version__ -def _parse_cmd(args, res): - answer = _format_answers(res, args) - cmd_key = _get_stash_key(args) - title = ''.join(args['query']) - if args[STASH_SAVE]: - _stash_save(cmd_key, title, answer) - return '' - - if args[STASH_REMOVE]: - _stash_remove(cmd_key, title) - return '' - return answer - - -def howdoi(raw_query): - args = raw_query - if type(raw_query) is str: # you can pass either a raw or a parsed query - parser = get_parser() - args = vars(parser.parse_args(raw_query.split(' '))) - - args['query'] = ' '.join(args['query']).replace('?', '') - cache_key = _get_cache_key(args) - - if _is_help_query(args['query']): - return _get_help_instructions() + '\n' - - res = cache.get(cache_key) - - if res: - return _parse_cmd(args, res) - - try: - res = _get_answers(args) - if not res: - res = {"error": "Sorry, couldn\'t find any help with that topic\n"} - cache.set(cache_key, res) - except (ConnectionError, SSLError): - return {"error": "Failed to establish network connection\n"} - finally: - return _parse_cmd(args, res) - - -def get_parser(): - parser = argparse.ArgumentParser(description='instant coding answers via the command line') - parser.add_argument('query', metavar='QUERY', type=str, nargs='*', help='the question to answer') - parser.add_argument('-p', '--pos', help='select answer in specified position (default: 1)', default=1, type=int) - parser.add_argument('-a', '--all', help='display the full text of the answer', action='store_true') - parser.add_argument('-l', '--link', help='display only the answer link', action='store_true') - parser.add_argument('-c', '--color', help='enable colorized output', action='store_true') - parser.add_argument('-n', '--num-answers', help='number of answers to return', default=1, type=int) - parser.add_argument('-C', '--clear-cache', help='clear the cache', - action='store_true') - parser.add_argument('-j', '--json-output', help='return answers in raw json format', - action='store_true') - parser.add_argument('-v', '--version', help='displays the current version of howdoi', - action='store_true') - parser.add_argument('-e', '--engine', help='change search engine for this query only (google, bing, duckduckgo)', - dest='search_engine', nargs="?", default='google') - parser.add_argument('--save', help='stash a howdoi answer', - action='store_true') - parser.add_argument('--view', help='view your stash', - action='store_true') - parser.add_argument('--remove', help='remove an entry in your stash', - action='store_true'), - parser.add_argument('--empty', help='empty your stash', - action='store_true') - return parser - - -def prompt_stash_remove(args, stash_list, view_stash = True): - if view_stash: - print_stash(stash_list) - - last_index = len(stash_list) - prompt = "{bold}> Select a stash command to remove [1-{last_index}] (0 to cancel): {end_format}".format( - bold=BOLD, - last_index=last_index, - end_format=END_FORMAT) - user_input = input(prompt) - - try: - user_input = int(user_input) - if user_input == 0: - return - elif user_input < 1 or user_input > last_index: - print("\n{red}Input index is invalid.{end_format}".format(red=RED, end_format=END_FORMAT)) - prompt_stash_remove(args, stash_list, False) - return - cmd = stash_list[user_input - 1] - cmd_key = cmd['command'] - cmd_name = cmd['fields']['alias'] - _stash_remove(cmd_key, cmd_name) - return - except ValueError: - print("\n{red}Invalid input. Must specify index of command.{end_format}".format(red=RED, end_format=END_FORMAT)) - prompt_stash_remove(args, stash_list, False) - return +gc.disable() # noqa: E402 def command_line_runner(): + plugins = HowDoi("plugins").plugins parser = get_parser() + args = vars(parser.parse_args()) - if args['version']: + if args["version"]: _print_ok(__version__) return - if args['clear_cache']: + if args["clear_cache"]: if _clear_cache(): - _print_ok('Cache cleared successfully') + _print_ok(" Cache cleared successfully") else: - _print_err('Clearing cache failed') - return + _print_err(" Clearing cache failed") if args[STASH_VIEW]: print_stash() return if args[STASH_EMPTY]: - os.system('keep init') + os.system("keep init") return - if args[STASH_REMOVE] and len(args['query']) == 0: + if args[STASH_REMOVE] and len(args["query"]) == 0: commands = keep_utils.read_commands() if commands is None or len(commands.items()) == 0: - print('No commands found in stash. Add a command with "howdoi --{stash_save} ".'.format(stash_save=STASH_SAVE)) + print( + 'No commands found in stash. Add a command with "howdoi --{stash_save} ".'.format( + stash_save=STASH_SAVE + ) + ) return - stash_list = [{'command': cmd, 'fields': field} for cmd, field in commands.items()] + stash_list = [ + {"command": cmd, "fields": field} for cmd, field in commands.items() + ] prompt_stash_remove(args, stash_list) return - if not args['query']: + if not args["query"]: parser.print_help() return - if os.getenv('HOWDOI_COLORIZE'): - args['color'] = True + if os.getenv("HOWDOI_COLORIZE"): + args["color"] = True - if not args['search_engine'] in SUPPORTED_SEARCH_ENGINES: - _print_err('Unsupported engine.\nThe supported engines are: %s' % ', '.join(SUPPORTED_SEARCH_ENGINES)) + if not args["search_engine"] in SUPPORTED_SEARCH_ENGINES: + _print_err( + "Unsupported engine.\nThe supported engines are: %s" + % ", ".join(SUPPORTED_SEARCH_ENGINES) + ) return - elif args['search_engine'] != 'google': - os.environ['HOWDOI_SEARCH_ENGINE'] = args['search_engine'] - - utf8_result = howdoi(args).encode('utf-8', 'ignore') - if sys.version < '3': - print(utf8_result) - else: - # Write UTF-8 to stdout: https://stackoverflow.com/a/3603160 - sys.stdout.buffer.write(utf8_result) - # close the session to release connection + elif args["search_engine"] != "google": + os.environ["HOWDOI_SEARCH_ENGINE"] = args["search_engine"] + + # Walk plugin directory and run individual plugins against query. + # TODO: (mwizasimbeye11) Enable default plugins. + # TODO: (mwizasimbeye11) Check if args for specific plugins have been passed. + for plugin in plugins: + plugin.raw_query = args + utf8_result = plugin.howdoi(args, parser, cache).encode("utf-8", "ignore") + + # utf8_result = howdoi(args).encode("utf-8", "ignore") + if sys.version < "3": + print(utf8_result) + else: + # Write UTF-8 to stdout: https://stackoverflow.com/a/3603160 + sys.stdout.buffer.write(utf8_result) + # close the session to release connection howdoi_session.close() -if __name__ == '__main__': +class Plugin(object): + """Base class that each plugin must inherit from. within this class + you must define the methods that all of your plugins must implement + """ + + def __init__(self): + self.description = "UNKNOWN" + self.raw_query = "" + + def _get_answers(self, args): + return NotImplementedError + + def _get_answer(self, args, links): + return NotImplementedError + + def howdoi(self, raw_query, parser): + return NotImplementedError + + +class HowDoi: + """Walks the default plugin folder and looks for plugins and applys the query to it. + """ + + def __init__(self, plugin_package): + """Constructor that initiates the reading of all available plugins + when an instance of the PluginCollection object is created + """ + self.plugin_package = plugin_package + self.reload_plugins() + + def reload_plugins(self): + """Reset the list of all plugins and initiate the walk over the main + provided plugin package to load all available plugins + """ + self.plugins = [] + self.seen_paths = [] + print() + print(f"Looking for plugins under package {self.plugin_package}") + self.walk_package(self.plugin_package) + + def walk_package(self, package): + """Recursively walk the supplied package to retrieve all plugins + """ + imported_package = __import__(package, globals(), fromlist=["blah"], level=1) + + for _, pluginname, ispkg in pkgutil.iter_modules( + imported_package.__path__, imported_package.__name__ + "." + ): + if not ispkg: + plugin_module = __import__(pluginname, globals(), fromlist=["blah"]) + clsmembers = inspect.getmembers(plugin_module, inspect.isclass) + for (_, c) in clsmembers: + # TODO: Only add classes that are a sub class of Plugin, but NOT Plugin itself + # TODO: use issubclass() function to check if plugin is subclass. + # if issubclass(Plugin, c): + if str(c.__base__.__name__) == "Plugin": + print(f" Found plugin class: {c.__module__}.{c.__name__}") + self.plugins.append(c()) + + # Now that we have looked at all the modules in the current package, start looking + # recursively for additional modules in sub packages + all_current_paths = [] + if isinstance(imported_package.__path__, str): + all_current_paths.append(imported_package.__path__) + else: + all_current_paths.extend([x for x in imported_package.__path__]) + + for pkg_path in all_current_paths: + if pkg_path not in self.seen_paths: + self.seen_paths.append(pkg_path) + + # Get all sub directory of the current package path directory + child_pkgs = [ + p + for p in os.listdir(pkg_path) + if os.path.isdir(os.path.join(pkg_path, p)) + ] + + # For each sub directory, apply the walk_package method recursively + for child_pkg in child_pkgs: + self.walk_package(package + "." + child_pkg) + + +if __name__ == "__main__": command_line_runner() diff --git a/howdoi/plugins/core.py b/howdoi/plugins/core.py new file mode 100644 index 000000000..f2b67913d --- /dev/null +++ b/howdoi/plugins/core.py @@ -0,0 +1,123 @@ +from pyquery import PyQuery as pq + +from howdoi.howdoi import Plugin +from howdoi.utils import ( + ANSWER_HEADER, + NO_ANSWER_MSG, + STAR_HEADER, + SSLError, + _clear_cache, + _format_output, + _get_cache_key, + _get_help_instructions, + _get_links_with_cache, + _get_result, + _is_help_query, + _parse_cmd, + cache, + get_link_at_pos, + get_text +) + + +class StackOverflow(Plugin): + """Default howdoi plugin that queries StackOverflow. + """ + + def __init__(self): + super().__init__() + self.description = "StackOverflow Plugin" + + def _get_answer(self, args, links): + link = get_link_at_pos(links, args["pos"]) + if not link: + return False + + cache_key = link + page = cache.get(link) + if not page: + page = _get_result(link + "?answertab=votes") + cache.set(cache_key, page) + + html = pq(page) + + first_answer = html(".answer").eq(0) + + instructions = first_answer.find("pre") or first_answer.find("code") + args["tags"] = [t.text for t in html(".post-tag")] + + if not instructions and not args["all"]: + text = get_text(first_answer.find(".post-text").eq(0)) + elif args["all"]: + texts = [] + for html_tag in first_answer.items(".post-text > *"): + current_text = get_text(html_tag) + if current_text: + if html_tag[0].tag in ["pre", "code"]: + texts.append(_format_output(current_text, args)) + else: + texts.append(current_text) + text = "\n".join(texts) + else: + text = _format_output(get_text(instructions.eq(0)), args) + if text is None: + text = NO_ANSWER_MSG + text = text.strip() + return text + + def _get_answers(self, args): + """ + @args: command-line arguments + returns: array of answers and their respective metadata + False if unable to get answers + """ + + question_links = _get_links_with_cache(args["query"]) + if not question_links: + return False + + answers = [] + initial_position = args["pos"] + multiple_answers = args["num_answers"] > 1 or args["all"] + + for answer_number in range(args["num_answers"]): + current_position = answer_number + initial_position + args["pos"] = current_position + link = get_link_at_pos(question_links, current_position) + answer = self._get_answer(args, question_links) + if not answer: + continue + if not args["link"] and not args["json_output"] and multiple_answers: + answer = ANSWER_HEADER.format(link, answer, STAR_HEADER) + answer += "\n" + answers.append( + {"answer": answer, "link": link, "position": current_position} + ) + + return answers + + def howdoi(self, raw_query, parser, cache): + args = raw_query + if type(raw_query) is str: # you can pass either a raw or a parsed query + args = vars(parser.parse_args(raw_query.split(" "))) + + args["query"] = " ".join(args["query"]).replace("?", "") + cache_key = _get_cache_key(args) + + if _is_help_query(args["query"]): + return _get_help_instructions() + "\n" + + res = cache.get(cache_key) + + if res: + return _parse_cmd(args, res) + + try: + res = self._get_answers(args) + if not res: + res = {"error": "Sorry, couldn't find any help with that topic\n"} + cache.set(cache_key, res) + except (ConnectionError, SSLError): + return {"error": "Failed to establish network connection\n"} + finally: + return _parse_cmd(args, res) diff --git a/howdoi/utils.py b/howdoi/utils.py new file mode 100644 index 000000000..c27719d06 --- /dev/null +++ b/howdoi/utils.py @@ -0,0 +1,478 @@ +import argparse +import inspect +import json +import re + +import requests +from keep import utils as keep_utils +from pygments import highlight +from pygments.formatters.terminal import TerminalFormatter +from pygments.lexers import get_lexer_by_name, guess_lexer +from pygments.util import ClassNotFound +from pyquery import PyQuery as pq +from requests.exceptions import ConnectionError, SSLError +from .import __version__ + +from howdoi.config import * + + +# rudimentary standardized 3-level log output +def _print_err(x): + print("[ERROR] " + x) + + +_print_ok = print # noqa: E305 + + +def _print_dbg(x): + print("[DEBUG] " + x) # noqa: E302 + + +howdoi_session = requests.session() + + +class BlockError(RuntimeError): + pass + + +def _random_int(width): + bres = os.urandom(width) + if sys.version < "3": + ires = int(bres.encode("hex"), 16) + else: + ires = int.from_bytes(bres, "little") + + return ires + + +def _random_choice(seq): + return seq[_random_int(1) % len(seq)] + + +def get_proxies(): + proxies = getproxies() + filtered_proxies = {} + for key, value in proxies.items(): + if key.startswith("http"): + if not value.startswith("http"): + filtered_proxies[key] = "http://%s" % value + else: + filtered_proxies[key] = value + return filtered_proxies + + +def _format_url_to_filename(url, file_ext="html"): + filename = "".join(ch for ch in url if ch.isalnum()) + return filename + "." + file_ext + + +def _get_result(url): + try: + return howdoi_session.get( + url, + headers={"User-Agent": _random_choice(USER_AGENTS)}, + proxies=get_proxies(), + verify=VERIFY_SSL_CERTIFICATE, + ).text + except requests.exceptions.SSLError as e: + _print_err( + "Encountered an SSL Error. Try using HTTP instead of " + 'HTTPS by setting the environment variable "HOWDOI_DISABLE_SSL".\n' + ) + raise e + + +def _add_links_to_text(element): + hyperlinks = element.find("a") + + for hyperlink in hyperlinks: + pquery_object = pq(hyperlink) + href = hyperlink.attrib["href"] + copy = pquery_object.text() + if copy == href: + replacement = copy + else: + replacement = "[{0}]({1})".format(copy, href) + pquery_object.replace_with(replacement) + + +def get_text(element): + """ return inner text in pyquery element """ + _add_links_to_text(element) + try: + return element.text(squash_space=False) + except TypeError: + return element.text() + + +def _extract_links_from_bing(html): + html.remove_namespaces() + return [a.attrib["href"] for a in html(".b_algo")("h2")("a")] + + +def _extract_links_from_google(html): + return [a.attrib["href"] for a in html(".l")] or [ + a.attrib["href"] for a in html(".r")("a") + ] + + +def _extract_links_from_duckduckgo(html): + html.remove_namespaces() + links_anchors = html.find("a.result__a") + results = [] + for anchor in links_anchors: + link = anchor.attrib["href"] + url_obj = urlparse(link) + parsed_url = parse_qs(url_obj.query).get("uddg", "") + if parsed_url: + results.append(parsed_url[0]) + return results + + +def _extract_links(html, search_engine): + if search_engine == "bing": + return _extract_links_from_bing(html) + if search_engine == "duckduckgo": + return _extract_links_from_duckduckgo(html) + return _extract_links_from_google(html) + + +def _get_search_url(search_engine): + return SEARCH_URLS.get(search_engine, SEARCH_URLS["google"]) + + +def _is_blocked(page): + for indicator in BLOCK_INDICATORS: + if page.find(indicator) != -1: + return True + + return False + + +def _get_links(query): + search_engine = os.getenv("HOWDOI_SEARCH_ENGINE", "google") + search_url = _get_search_url(search_engine) + + result = _get_result(search_url.format(URL, url_quote(query))) + if _is_blocked(result): + _print_err( + "Unable to find an answer because the search engine temporarily blocked the request. " + "Please wait a few minutes or select a different search engine." + ) + raise BlockError("Temporary block by search engine") + + html = pq(result) + return _extract_links(html, search_engine) + + +def get_link_at_pos(links, position): + if not links: + return False + + if len(links) >= position: + link = links[position - 1] + else: + link = links[-1] + return link + + +def _format_output(code, args): + if not args["color"]: + return code + lexer = None + + # try to find a lexer using the StackOverflow tags + # or the query arguments + for keyword in args["query"].split() + args["tags"]: + try: + lexer = get_lexer_by_name(keyword) + break + except ClassNotFound: + pass + + # no lexer found above, use the guesser + if not lexer: + try: + lexer = guess_lexer(code) + except ClassNotFound: + return code + + return highlight(code, lexer, TerminalFormatter(bg="dark")) + + +def _is_question(link): + for fragment in BLOCKED_QUESTION_FRAGMENTS: + if fragment in link: + return False + return re.search(r"questions/\d+/", link) + + +def _get_questions(links): + return [link for link in links if _is_question(link)] + + +def _get_links_with_cache(query): + cache_key = query + "-links" + res = cache.get(cache_key) + if res: + if res == CACHE_EMPTY_VAL: + res = False + return res + + links = _get_links(query) + if not links: + cache.set(cache_key, CACHE_EMPTY_VAL) + + question_links = _get_questions(links) + cache.set(cache_key, question_links or CACHE_EMPTY_VAL) + + return question_links + + +def build_splitter(splitter_character="=", splitter_length=80): + return "\n" + splitter_character * splitter_length + "\n\n" + + +def _clear_cache(): + global cache + if not cache: + cache = FileSystemCache(CACHE_DIR, CACHE_ENTRY_MAX, 0) + + return cache.clear() + + +def _is_help_query(query): + return any([query.lower() == help_query for help_query in SUPPORTED_HELP_QUERIES]) + + +def _format_answers(res, args): + if "error" in res: + return res["error"] + + if args["json_output"]: + return json.dumps(res) + + formatted_answers = [] + + for answer in res: + next_ans = answer["answer"] + if args["link"]: # if we only want links + next_ans = answer["link"] + formatted_answers.append(next_ans) + + return build_splitter().join(formatted_answers) + + +def _get_help_instructions(): + instruction_splitter = build_splitter(" ", 60) + query = "print hello world in python" + instructions = [ + "Here are a few popular howdoi commands ", + ">>> howdoi {} (default query)", + ">>> howdoi {} -a (read entire answer)", + ">>> howdoi {} -n [number] (retrieve n number of answers)", + ">>> howdoi {} -l (display only a link to where the answer is from", + ">>> howdoi {} -c (Add colors to the output)", + ">>> howdoi {} -e (Specify the search engine you want to use e.g google,bing)", + ] + + instructions = map(lambda s: s.format(query), instructions) + + return instruction_splitter.join(instructions) + + +def _get_cache_key(args): + return str(args) + __version__ + + +def format_stash_item(fields, index=-1): + title = fields["alias"] + description = fields["desc"] + item_num = index + 1 + if index == -1: + return "{underline}{bold}$ {title}{end_format}\n\n{description}\n".format( + underline=UNDERLINE, + bold=BOLD, + title=title, + end_format=END_FORMAT, + description=description, + ) + return "{underline}{bold}$ [{item_num}] {title}{end_format}\n\n{description}\n".format( + underline=UNDERLINE, + bold=BOLD, + item_num=item_num, + title=title, + end_format=END_FORMAT, + description=description, + ) + + +def print_stash(stash_list=[]): + if len(stash_list) == 0: + stash_list = ["\nSTASH LIST:"] + commands = keep_utils.read_commands() + if commands is None or len(commands.items()) == 0: + print( + 'No commands found in stash. Add a command with "howdoi --{stash_save} ".'.format( + stash_save=STASH_SAVE + ) + ) + return + for cmd, fields in commands.items(): + stash_list.append(format_stash_item(fields)) + else: + stash_list = [ + format_stash_item(x["fields"], i) for i, x in enumerate(stash_list) + ] + print(build_splitter("#").join(stash_list)) + + +def _get_stash_key(args): + stash_args = {} + ignore_keys = [ + STASH_SAVE, + STASH_VIEW, + STASH_REMOVE, + STASH_EMPTY, + "tags", + ] # ignore these for stash key. + for key in args: + if not (key in ignore_keys): + stash_args[key] = args[key] + return str(stash_args) + + +def _stash_remove(cmd_key, title): + commands = keep_utils.read_commands() + if commands is not None and cmd_key in commands: + keep_utils.remove_command(cmd_key) + print( + '\n{bold}{green}"{title}" removed from stash.{end_format}\n'.format( + bold=BOLD, green=GREEN, title=title, end_format=END_FORMAT + ) + ) + else: + print( + '\n{bold}{red}"{title}" not found in stash.{end_format}\n'.format( + bold=BOLD, red=RED, title=title, end_format=END_FORMAT + ) + ) + + +def _stash_save(cmd_key, title, answer): + try: + keep_utils.save_command(cmd_key, answer, title) + except FileNotFoundError: + os.system("keep init") + keep_utils.save_command(cmd_key, answer, title) + finally: + print_stash() + + +def _parse_cmd(args, res): + answer = _format_answers(res, args) + cmd_key = _get_stash_key(args) + title = "".join(args["query"]) + if args[STASH_SAVE]: + _stash_save(cmd_key, title, answer) + return "" + + if args[STASH_REMOVE]: + _stash_remove(cmd_key, title) + return "" + return answer + + +def get_parser(): + parser = argparse.ArgumentParser( + description="instant coding answers via the command line" + ) + parser.add_argument( + "query", metavar="QUERY", type=str, nargs="*", help="the question to answer" + ) + parser.add_argument( + "-p", + "--pos", + help="select answer in specified position (default: 1)", + default=1, + type=int, + ) + parser.add_argument( + "-a", "--all", help="display the full text of the answer", action="store_true" + ) + parser.add_argument( + "-l", "--link", help="display only the answer link", action="store_true" + ) + parser.add_argument( + "-c", "--color", help="enable colorized output", action="store_true" + ) + parser.add_argument( + "-n", "--num-answers", help="number of answers to return", default=1, type=int + ) + parser.add_argument( + "-C", "--clear-cache", help="clear the cache", action="store_true" + ) + parser.add_argument( + "-j", + "--json-output", + help="return answers in raw json format", + action="store_true", + ) + parser.add_argument( + "-v", + "--version", + help="displays the current version of howdoi", + action="store_true", + ) + parser.add_argument( + "-e", + "--engine", + help="change search engine for this query only (google, bing, duckduckgo)", + dest="search_engine", + nargs="?", + default="google", + ) + parser.add_argument("--save", help="stash a howdoi answer", action="store_true") + parser.add_argument("--view", help="view your stash", action="store_true") + parser.add_argument( + "--remove", help="remove an entry in your stash", action="store_true" + ), + parser.add_argument("--empty", help="empty your stash", action="store_true") + return parser + + +def prompt_stash_remove(args, stash_list, view_stash=True): + if view_stash: + print_stash(stash_list) + + last_index = len(stash_list) + prompt = "{bold}> Select a stash command to remove [1-{last_index}] (0 to cancel): {end_format}".format( + bold=BOLD, last_index=last_index, end_format=END_FORMAT + ) + user_input = input(prompt) + + try: + user_input = int(user_input) + if user_input == 0: + return + elif user_input < 1 or user_input > last_index: + print( + "\n{red}Input index is invalid.{end_format}".format( + red=RED, end_format=END_FORMAT + ) + ) + prompt_stash_remove(args, stash_list, False) + return + cmd = stash_list[user_input - 1] + cmd_key = cmd["command"] + cmd_name = cmd["fields"]["alias"] + _stash_remove(cmd_key, cmd_name) + return + except ValueError: + print( + "\n{red}Invalid input. Must specify index of command.{end_format}".format( + red=RED, end_format=END_FORMAT + ) + ) + prompt_stash_remove(args, stash_list, False) + return