Commit 768ff31

4.4.2 release
1 parent 214a667 commit 768ff31

1 file changed

+86
-27
lines changed

content/administration/enrichments.md

Lines changed: 86 additions & 27 deletions
@@ -36,74 +36,133 @@ A DAG to enrich an IPv4 entity can have:
3636

3737
You are limited only by what you can do programmatically in Python and through APIs.
3838

39-
## Sample DAG
39+
## Creating your first DAG
4040
Below is a simple DAG workflow you can use as a starting point. It enriches an IPv4 address, but could be modified for any entity type. Name the script to match the endpoint, which depends on how you set up your Airflow instance and define your DAG endpoints. For example, if the endpoint (see workflow above) is `/api/v1/dags/scot_entity_[ENTITY_TYPE_PLACEHOLDER]_enrichment/dagRuns` and you have an entity type named `ipaddr`, your DAG should be named `scot_entity_ipaddr_enrichment`.
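As a rough illustration of the naming rule above, the following sketch derives the DAG name and the endpoint path from an entity type. The helper names `dag_id_for` and `dag_runs_endpoint` are hypothetical and not part of SCOT or Airflow; the path pattern is the one quoted above.

```python
# Hypothetical helpers illustrating the naming convention; not part of SCOT or Airflow.

def dag_id_for(entity_type: str) -> str:
    """Return the DAG name expected for a given SCOT entity type."""
    return f"scot_entity_{entity_type}_enrichment"


def dag_runs_endpoint(entity_type: str) -> str:
    """Return the endpoint path that triggers the DAG for this entity type."""
    return f"/api/v1/dags/{dag_id_for(entity_type)}/dagRuns"


print(dag_id_for("ipaddr"))
print(dag_runs_endpoint("ipaddr"))
```

If the DAG name and the endpoint drift apart, the trigger request will most likely not find the DAG, so deriving both from one place can help avoid typos.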
4141

42+
43+
### MaxMind DB
44+
This enrichment takes a given IPv4 address and looks it up in a MaxMind database file stored locally on the server where your Airflow instance runs. Create an account for MaxMind's GeoLite2 service (https://www.maxmind.com/en/geolite2/signup) to get access to their database files, which you can download and use. This example expects the database file 'GeoLite2-City.mmdb'. Once you have this file, place it somewhere on your Airflow server and note the file path for later.
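Before wiring the file into a DAG, it may help to confirm that the path is usable. The sketch below is one possible way to do that, assuming the `geoip2` package is installed for the lookup step. The helper names `check_db_path` and `lookup_city` are hypothetical and only for illustration.

```python
import os


def check_db_path(db_path: str) -> str:
    """Return db_path if it points to a readable .mmdb file, otherwise raise."""
    if not db_path or not db_path.endswith(".mmdb"):
        raise ValueError(f"expected a path to a .mmdb file, got: {db_path!r}")
    if not os.path.isfile(db_path) or not os.access(db_path, os.R_OK):
        raise FileNotFoundError(f"database file not found or not readable: {db_path}")
    return db_path


def lookup_city(db_path: str, ip_addr: str):
    """Look up ip_addr and return the city record, or None if the IP is not in the database."""
    # Imported here so the path check above works even when geoip2 is not installed.
    import geoip2.database
    import geoip2.errors

    # The reader is closed automatically when the with-block exits.
    with geoip2.database.Reader(check_db_path(db_path)) as reader:
        try:
            return reader.city(ip_addr)
        except geoip2.errors.AddressNotFoundError:
            return None
```

Running `check_db_path` once from a Python shell on the Airflow server is a quick way to catch a wrong path or a permissions problem before the first DAG run.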
45+
46+
47+
### Sample DAG code
48+
49+
See the Airflow documentation for creating a new DAG. Copy the following code and save it as described in the "Creating your first DAG" section above.
50+
4251
```
43-
import os
52+
from airflow.utils.task_group import TaskGroup
4453
import json
4554
import pendulum
46-
47-
from airflow.utils.task_group import TaskGroup
4855
from airflow.decorators import task, dag, task_group
4956
from airflow.models import Variable
50-
from airflow.operators.bash import BashOperator
5157
from airflow.operators.python import get_current_context
5258
from airflow.models.log import Log
5359
from airflow.utils.db import create_session
54-
from airflow.models import Variable
60+
from airflow.timetables.trigger import CronTriggerTimetable
61+
import logging
62+
import os
5563
5664
@dag(
57-
schedule_interval=None,
65+
timetable=None, ## Add your schedule here. Any valid cron notation (https://en.wikipedia.org/wiki/Cron) works when wrapped in CronTriggerTimetable, for example: CronTriggerTimetable("0 */4 * * *", timezone="UTC")
5866
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
5967
catchup=False,
60-
tags=['author:user', 'scot4_enrichment'],
68+
tags=['author:example', 'example_from_template'],
6169
doc_md=open(f"{os.path.dirname(os.path.realpath(__file__))}/README.md").read(),
62-
params={'entity_id': None, 'entity_value': '192.168.1.1', 'callback_url':None}
70+
## params is where you can add other parameters to each triggered DAG's configuration
71+
params={'entity_id': None, 'entity_value': '192.168.1.1', 'db_path': None, 'callback_url':None}
6372
)
64-
def scot4_entity_ipaddr_enrichment():
73+
74+
def scot4_simple_example():
6575
@task
66-
def task1_enrichment():
76+
def enrichment_task():
6777
import pandas as pd
78+
# Use a logger to create info/debug/error messages for the DAG instead of the print function
79+
logger = logging.getLogger(__name__)
6880
# get the IP address from the Airflow context
6981
context = get_current_context()
7082
ip_addr = context['params'].get('entity_value')
71-
# query enrichment data for IP from 3rd party API service
72-
enr_data = get_data_from_api(ip_addr)
83+
# query enrichment data for IP from 3rd party/API service
84+
enr_data = get_enr_data(ip_addr)
85+
logger.debug(f'enr_data results: {enr_data}')
7386
# convert any results to markdown so SCOT can parse it
7487
markdown = pd.DataFrame.from_records([enr_data]).T.to_markdown()
7588
# use this schema for SCOT parsing (title value = tab name in the flair pane)
76-
enrichment_data = { 'title': 'Data Enrichment Summary',
89+
enrichment_data = { 'title': 'Geo IP Summary',
7790
'enrichment_class': 'markdown',
78-
'description': 'Summary from API being used',
91+
'description': 'enrichment via MaxMind direct query',
7992
'data': {'markdown': markdown,}
8093
}
81-
return {'entity_class_ids': entity_class_ids, 'enrichment_data': [enrichment_data]}
82-
83-
84-
@task
94+
return {'enrichment_data': [enrichment_data]}
95+
96+
97+
def get_enr_data(ip_addr):
98+
import geoip2.database
99+
logger = logging.getLogger(__name__) # use for debugging
100+
context = get_current_context()
101+
## This should be the file path to the GeoLite2-City.mmdb file on your airflow server
102+
db_path = context['params'].get('db_path')
103+
reader = geoip2.database.Reader(db_path)
104+
try:
105+
response = reader.city(ip_addr)
106+
# logger.debug(f'response content: {response}') # uncomment for debugging
107+
# use response.raw if running an older version of geoip2
108+
result = response.to_dict()
109+
# result = response.raw
110+
reader.close()
111+
return result
112+
except Exception:
113+
logger.error('failed to perform lookup')
114+
reader.close()
115+
# Throw an error and Airflow will stop and return nothing
116+
raise Exception(f"failed to read enrichment data")
117+
## you could optionally return an error to populate in the IP's enrichment pane of the flair modal
118+
# return {"data": "unable to find IP in database"}
119+
120+
121+
@task
85122
def add_enrichment_to_scot(results):
86123
import requests
87-
88124
context = get_current_context()
89125
callback_url = context['params'].get('callback_url')
90-
api_key = Variable.get('scot4-instance-api-key')
91-
126+
api_key = None
127+
## Airflow can store secrets, use that to store an API key for your SCOT instance and retrieve it using Variable.get()
128+
## if you have a second/multiple SCOT instances you can store multiple API keys and retrieve the correct one based on which callback URL is being given
129+
if callback_url is not None and "scot4-prod" in callback_url:
130+
api_key = Variable.get('scot4-api-key')
131+
elif callback_url is not None and "scot4-test" in callback_url:
132+
api_key = Variable.get('scot4-test-api-key')
133+
92134
if callback_url is not None and results.get('enrichment_data') is not None and len(results['enrichment_data']) > 0:
93135
for enrichment_data in results['enrichment_data']:
94136
context = get_current_context()
95137
entity_id = context['params'].get('entity_id')
96138
url = f"{callback_url}/entity/{entity_id}/enrichment"
97-
res = requests.post(url, data=json.dumps(enrichment_data), headers=
139+
res = requests.post(url, data=json.dumps(enrichment_data), headers={
98140
'Content-Type':'application/json',
99141
'Authorization': f'apikey {api_key}',
100142
})
101143
if not res.ok:
102144
raise Exception(f"Request to {url} failed: {res.status_code} {res.reason}: {res.text}")
145+
print(f"Request to {url} succeeded: {res.status_code} {res.reason}: {res.text}")
146+
147+
148+
## This is where we define the flow of the DAG.
149+
## See https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html
150+
## for more information about defining the control flow of DAGs.
151+
152+
task_results = enrichment_task()
153+
add_enrichment_to_scot(task_results)
154+
155+
156+
dag = scot4_simple_example()
157+
```
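The sample above builds its markdown with pandas. To make the shape of the posted payload easier to see, here is a minimal sketch that produces the same schema (`title`, `enrichment_class`, `description`, `data.markdown`) using only the standard library. The helper names and the sample values are made up for illustration, and the table layout is an assumption rather than what pandas would emit.

```python
import json


def dict_to_markdown(data: dict) -> str:
    """Render a flat dict as a two-column markdown table."""
    lines = ["| field | value |", "| --- | --- |"]
    for key, value in data.items():
        # Escape pipes so a value cannot break the table layout.
        cell = str(value).replace("|", "\\|")
        lines.append(f"| {key} | {cell} |")
    return "\n".join(lines)


def build_enrichment(title: str, description: str, data: dict) -> dict:
    """Wrap a markdown table in the schema the sample DAG posts to SCOT."""
    return {
        "title": title,
        "enrichment_class": "markdown",
        "description": description,
        "data": {"markdown": dict_to_markdown(data)},
    }


# Made-up values for illustration only.
sample = {"country": "US", "city": "Example City"}
payload = build_enrichment("Geo IP Summary", "enrichment via MaxMind direct query", sample)
print(json.dumps(payload, indent=2))
```

Printing the payload like this before posting it can make it easier to spot a missing key or an empty markdown table.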
158+
103159

160+
### Testing your DAG
161+
See the Airflow documentation for how to trigger a DAG from the Airflow UI. When you trigger a DAG, you're asked to supply values for the params specified in the code. The entity you want to test with should already be registered/flaired in SCOT so that it has an entity ID.
104162

105-
task1_results = task1_enrichment()
106-
add_enrichment_to_scot(task1_results)
163+
- 'entity_id': The ID of the entity you're testing
164+
- 'entity_value': The value of the entity (try the IP for maxmind.com)
165+
- 'db_path': /the/file/path/on/your/airflow/server/to/GeoLite2-City.mmdb
166+
- 'callback_url': the full URL to your SCOT API as defined in your SCOT-API config settings (ex. https://scot4-test.domain.com/api/v1)
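The same four params can also be supplied programmatically. The sketch below only builds the JSON body, on the assumption that the DAG is triggered through the dagRuns endpoint mentioned earlier, which takes run configuration under a `conf` key. The helper name `build_trigger_body` and the entity ID are placeholders, and no request is actually sent.

```python
import json


def build_trigger_body(entity_id, entity_value, db_path, callback_url):
    """Build the JSON body for POST /api/v1/dags/<dag_id>/dagRuns."""
    conf = {
        "entity_id": entity_id,
        "entity_value": entity_value,
        "db_path": db_path,
        "callback_url": callback_url,
    }
    # Fail early if any param was left empty, since the sample DAG needs all four.
    missing = [name for name, value in conf.items() if value in (None, "")]
    if missing:
        raise ValueError(f"missing required params: {', '.join(missing)}")
    return {"conf": conf}


body = build_trigger_body(
    entity_id=1234,  # placeholder: use the ID of an entity that exists in SCOT
    entity_value="192.168.1.1",
    db_path="/the/file/path/on/your/airflow/server/to/GeoLite2-City.mmdb",
    callback_url="https://scot4-test.domain.com/api/v1",
)
print(json.dumps(body, indent=2))
```

Checking for empty values up front gives a clearer error than a lookup or callback failure partway through the DAG run.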
107167

108-
dag = scot4_entity_ipaddr_enrichment()
109-
```
168+
Once you enter these fields and run your DAG, you'll see its progress and whether each operation completed successfully (green) or not (red). If the DAG failed, click the red box marking the task that failed, then select the log tab on the right to check for errors. See the Airflow docs for details.
