From e9a499f91bd95a27be7a1689ae68bfd70701a6c7 Mon Sep 17 00:00:00 2001 From: Dara Dermody Date: Fri, 26 Jun 2020 12:19:57 +0100 Subject: [PATCH] Support synchronous running of modules and locking for concurrency --- .gitignore | 1 + recon/core/framework.py | 6 ++++++ recon/core/module.py | 1 + recon/core/tasks.py | 35 +++++++++++++++++++++++++++++++++-- recon/core/web/api.py | 30 +++++++++++++++++++++++------- 5 files changed, 64 insertions(+), 9 deletions(-) diff --git a/.gitignore b/.gitignore index 139865cd..3cdc60d0 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ *sublime* venv/ scripts/ +.idea diff --git a/recon/core/framework.py b/recon/core/framework.py index 937f7a02..81ddf933 100644 --- a/recon/core/framework.py +++ b/recon/core/framework.py @@ -117,6 +117,7 @@ class Framework(cmd.Cmd): _record = None _spool = None _summary_counts = {} + _inserted_data = {} def __init__(self, params): cmd.Cmd.__init__(self) @@ -637,6 +638,11 @@ def insert(self, table, data, unique_columns=[]): query = f"INSERT INTO `{table}` (`{columns_str}`) SELECT {placeholder_str} WHERE NOT EXISTS(SELECT * FROM `{table}` WHERE {unique_columns_str})" values = tuple([data[column] for column in columns] + [data[column] for column in unique_columns]) + # record data for inclusion in task info + if table not in self._inserted_data: + self._inserted_data[table] = [] + self._inserted_data[table].append(data) + # query the database rowcount = self.query(query, values) diff --git a/recon/core/module.py b/recon/core/module.py index 7222b9bf..13af7d8f 100644 --- a/recon/core/module.py +++ b/recon/core/module.py @@ -328,6 +328,7 @@ def run(self): self._validate_options() self._validate_input() self._summary_counts = {} + self._inserted_data = {} pre = self.module_pre() params = [pre] if pre is not None else [] # provide input if a default query is specified in the module diff --git a/recon/core/tasks.py b/recon/core/tasks.py index 5e670f58..1723bfcc 100644 --- a/recon/core/tasks.py +++ b/recon/core/tasks.py @@ -1,12 +1,15 @@ from recon.core import base from recon.core.web.db import Tasks from rq import get_current_job +from threading import Lock import traceback # These tasks exist outside the web directory to avoid loading the entire # application (which reloads the framework) on every task execution. -def run_module(workspace, module): +execution_locks = {} + +def run_module_async(workspace, module, options=None): results = {} try: @@ -19,7 +22,8 @@ def run_module(workspace, module): tasks.update_task(job.get_id(), status=job.get_status()) # execute the task module = recon._loaded_modules.get(module) - module.run() + run_module_with_lock(module, options) + except Exception as e: results['error'] = { 'type': str(type(e)), @@ -27,6 +31,33 @@ def run_module(workspace, module): 'traceback': traceback.format_exc(), } results['summary'] = module._summary_counts + results['data'] = module._inserted_data # update the task's status and results tasks.update_task(job.get_id(), status='finished', result=results) return results + +def run_module_sync(workspace, module_path, options=None): + recon = base.Recon(check=False, analytics=False, marketplace=False) + recon.start(base.Mode.JOB, workspace=workspace) + module = recon._loaded_modules.get(module_path) + run_module_with_lock(module, options) + return { + 'summary': module._summary_counts, + 'data': module._inserted_data + } + + +def run_module_with_lock(module, options=None): + if module._modulename not in execution_locks: + execution_locks[module._modulename] = Lock() + + with execution_locks[module._modulename]: + if options is not None: + original_options = module.options.copy() + module.options.update(options) + + try: + module.run() + finally: + if options is not None: + module.options.update(original_options) diff --git a/recon/core/web/api.py b/recon/core/web/api.py index 5d4d67b3..3ea312fa 100644 --- a/recon/core/web/api.py +++ b/recon/core/web/api.py @@ -1,8 +1,10 @@ +import traceback from flask import Blueprint, current_app, request, abort from flask_restful import Resource, Api from recon.core.web import recon, tasks from recon.core.web.utils import columnize from recon.core.web.constants import EXPORTS, REPORTS +from recon.core.tasks import run_module_sync resources = Blueprint('resources', __name__, url_prefix='/api') api = Api() @@ -56,15 +58,29 @@ def post(self): - task ''' path = request.json.get('path') + options = request.json.get('options') + sync = request.args.get('sync') is not None + if not path or path not in recon._loaded_modules: abort(404) - job = current_app.task_queue.enqueue('recon.core.tasks.run_module', current_app.config['WORKSPACE'], path) - tid = job.get_id() - status = job.get_status() - tasks.add_task(tid, status) - return { - 'task': tid, - }, 201 + + if sync: + try: + return run_module_sync(current_app.config['WORKSPACE'], path, options), 200 + except Exception as e: + return { + 'type': str(type(e)), + 'message': str(e), + 'traceback': traceback.format_exc(), + }, 500 + else: + job = current_app.task_queue.enqueue('recon.core.tasks.run_module_async', current_app.config['WORKSPACE'], path, options) + tid = job.get_id() + status = job.get_status() + tasks.add_task(tid, status) + return { + 'task': tid, + }, 201 api.add_resource(TaskList, '/tasks/')