This repository has been archived by the owner on Sep 1, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 3
/
run.py
executable file
·72 lines (58 loc) · 2.16 KB
/
run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import yaml
import base64
from flask import Flask, render_template, request
from sigma.conversion.base import Backend
from sigma.plugins import InstalledSigmaPlugins
from sigma.collection import SigmaCollection
from sigma.exceptions import SigmaError
app = Flask(__name__)
plugins = InstalledSigmaPlugins.autodiscover()
backends = plugins.backends
pipeline_resolver = plugins.get_pipeline_resolver()
pipelines = list(pipeline_resolver.list_pipelines())
with open('requirements.txt', 'r') as f:
requirements = f.read()
@app.route('/')
def home():
formats = []
for backend in backends.keys():
for name, description in plugins.backends[backend].formats.items():
formats.append({"name": name, "description": description, "backend": backend})
for name, pipeline in pipelines:
if len(pipeline.allowed_backends) > 0:
pipeline.backends = ", ".join(pipeline.allowed_backends)
else:
pipeline.backends = "all"
return render_template('index.html', backends=backends, pipelines=pipelines, formats=formats, requirements=requirements)
@app.route('/sigma', methods=['POST'])
def convert():
# get params from request
rule = str(base64.b64decode(request.json['rule']), "utf-8")
# check if input is valid yaml
try:
yaml.safe_load(rule)
except:
print("error")
return ("Error: No valid yaml input")
pipeline = []
if request.json['pipeline']:
for p in request.json['pipeline']:
pipeline.append(p)
target = request.json['target']
format = request.json['format']
backend_class = backends[target]
processing_pipeline = pipeline_resolver.resolve(pipeline)
backend : Backend = backend_class(processing_pipeline=processing_pipeline)
try:
sigma_rule = SigmaCollection.from_yaml(rule)
result = backend.convert(sigma_rule, format)
if isinstance(result, list):
result = result[0]
except SigmaError as e:
return "Error: " + str(e)
return result
if __name__ == '__main__':
app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 8000)))