Cloudera Navigator Audit is a tool that collects and consolidates audit records from several Hadoop services. It offers the ability to forward those events into a Kafka topic for downstream consumption.
This example reads audit events from a Kafka topic populated by Navigator Audit and writes them to Kudu. We have also added an HDFS filesystem output to show how to handle writes to table partitions on HDFS, but this option comes with caveats (see below).
Audit events are written to Kafka by Navigator Audit as JSON strings. The Envelope pipeline reads this payload through a Kafka input, and then uses the Parse JSON deriver to convert the JSON messages into Dataset records.
The records are then processed by further Envelope derivers, which filter the records for each particular Hadoop service, and the results are forwarded to the corresponding outputs (Kudu and Filesystem).
This example also demonstrates the necessary configuration to run the Envelope job against a secured cluster. It contains all the required configuration to properly authenticate with Kerberos-enabled clusters.
If you’re running the example against a non-secured cluster, you will have to adjust the following Kafka setting in the configuration file:

- `parameter.security.protocol = SSL` (if TLS is enabled)
- `parameter.security.protocol = PLAINTEXT` (if TLS is not enabled)
If authorization has been configured for your cluster, you must ensure that the following privileges have been granted to the user running the Envelope job (a sketch of example grant statements for the Kudu tables follows the list):

- Read privileges on the Kafka topic containing the audit events
- Select and insert privileges on the Kudu table used for Kafka offset management
- Insert privileges on the output Kudu tables (for the Kudu outputs)
- Read and write permissions on the output HDFS directories (for the HDFS Filesystem outputs)
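As a reference, below is a minimal sketch of how the Kudu/Impala privileges could be granted when authorization is managed by Sentry through Impala. The role name `envelope_role` and the use of Sentry are assumptions; the Kafka topic privileges and HDFS directory permissions are handled separately (through your Kafka authorization mechanism and the HDFS permissions described in the setup steps).

```sql
-- Hedged sketch, assuming Sentry authorization managed through Impala and a role named
-- envelope_role that is assigned to a group of the user running the Envelope job.
GRANT SELECT ON TABLE nav.nav_offsets TO ROLE envelope_role;    -- offset management table
GRANT INSERT ON TABLE nav.nav_offsets TO ROLE envelope_role;
GRANT INSERT ON TABLE nav.hdfs_events_kudu TO ROLE envelope_role;
-- ...repeat the INSERT grant for each of the other output Kudu tables
```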
The example has been configured for a Kafka cluster with TLS encryption enabled. To run it successfully you will need a copy of the JKS truststore for the Kafka cluster, as well as the truststore password, which should be configured in the job configuration file.
If TLS is not being used for Kafka, you need to adjust the following settings in the configuration file (a sketch of how these settings appear in the configuration file follows the list):

- `parameter.security.protocol = SASL_PLAINTEXT` (if Kerberos is enabled)
- `parameter.security.protocol = PLAINTEXT` (if Kerberos is not enabled)
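For reference, this is roughly how the Kafka security settings look in the job configuration file for the default secured, TLS-enabled setup. The `parameter.` prefix comes from this example's configuration; the truststore path and password are illustrative placeholders, and the exact nesting of these keys should be taken from `nav-audit.conf`.

```
# Hedged sketch: Kafka client security settings passed through the input's parameter.* keys.
# Adjust security.protocol per the rules above; truststore path and password are placeholders.
parameter.security.protocol = SASL_SSL
parameter.ssl.truststore.location = /path/to/kafka-truststore.jks
parameter.ssl.truststore.password = changeme
```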
- Create a directory for the Parquet tables, if it doesn’t already exist:

      hdfs dfs -mkdir -p hdfs:///data/nav

  Ensure the following permissions have been granted on this directory:

  - Read/write permissions for the user running the Envelope job
  - Read/write permissions for the `hive` group

- Create the required Kudu and Parquet tables using the provided Apache Impala script:

      impala-shell -f create_nav_tables.sql \
        --var hdfs_base_dir=hdfs:///data/nav \
        --verbose

- Create partitions for the Kudu tables:

      ./add_kudu_partitions.sh > create_partitions.sql
      impala-shell --verbose -f create_partitions.sql

- Create the required Kafka topic:

      kafka-topics --zookeeper YOURZOOKEEPERHOSTNAME:2181 --create --topic nav-audit --partitions 4 --replication-factor 3

- Configure Cloudera Navigator Audit to publish audit events to Kafka.

- If running in a secured cluster, create a keytab file for the Kerberos principal that will run the example job (one way to do this is sketched after this list). Ensure that the permissions of the keytab file are set to `400`.

- If running in a secured cluster, modify `jaas.conf` to set the correct values for the `keyTab` and `principal` properties. Note that `KEYTABFILENAME` should be just the file name, not the full path.

- Modify `nav-audit.conf`, reviewing all the properties in the `vars` section and updating them as per your environment.

- If running in a secured cluster, modify your corresponding submit shell script (see below) to update the `KERBEROS_KEYTAB` and `KERBEROS_PRINCIPAL` variables.

- Run the example by submitting the Envelope application using the corresponding provided wrapper script:

  - For secured clusters:
    - That are running CDH6 or other Spark distributions: `./submit-secured.sh`
    - That are running CDH5: `./submit-secured-cdh5.sh`
  - For unsecured clusters:
    - That are running CDH6 or other Spark distributions: `./submit-unsecured.sh`
    - That are running CDH5: `./submit-unsecured-cdh5.sh`
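For the keytab step above, here is a hedged sketch of one way to create the keytab with `ktutil` on a host with the MIT Kerberos client tools installed. The principal name, realm, and encryption type are placeholders; adjust them to your environment (or use `kadmin` if that is how keytabs are managed at your site).

```
$ ktutil
ktutil:  addent -password -p envelope-user@EXAMPLE.COM -k 1 -e aes256-cts
Password for envelope-user@EXAMPLE.COM:
ktutil:  wkt envelope-user.keytab
ktutil:  quit
$ chmod 400 envelope-user.keytab
```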
In this section we go into a bit more detail about the implementation of this example.
The audit records written by Navigator Audit to the Kafka topic are JSON-formatted strings containing the attributes of each audit event. The events for all service types share a common set of attributes, and each service has additional specific attributes, as shown in the table below. The mapping between the JSON attributes and Impala table columns is also shown below.
| Service | JSON Attribute | Table column | Comments |
|---|---|---|---|
| Generic | `service` | `service_name` | |
| | `allowed` | `allowed` | |
| | `user` | `username` | |
| | `impersonator` | `impersonator` | |
| | `ip` | `ip_addr` | |
| | `time` | `event_time` | |
| | `op` | `operation` | |
| HBase | `tableName` | `table_name` | |
| | `family` | `family` | |
| | `qualifier` | `qualifier` | |
| HDFS | `src` | `src` | |
| | `dst` | `dest` | |
| | `perms` | `permissions` | |
| | `DELEGATION_TOKEN_ID` | `delegation_token_id` | Not exposed through the Kafka messages |
| Hive | `opText` | `operation_text` | |
| | `db` | `database_name` | |
| | `table` | `table_name` | |
| | `path` | `resource_path` | |
| | `objType` | `object_type` | |
| | `objUsageType` | `object_usage_type` | |
| Hue | `operationText` | `operation_text` | |
| | `service` | `service` | |
| | `url` | `url` | |
| Impala | `opText` | `operation_text` | |
| | `status` | `status` | |
| | `db` | `database_name` | |
| | `table` | `table_name` | |
| | `privilege` | `privilege` | |
| | `objType` | `object_type` | |
| | `QUERY_ID` | `query_id` | Not exposed through the Kafka messages |
| | `SESSION_ID` | `session_id` | |
| NavMS | `additionalInfo` | `additional_info` | |
| | `entityId` | `entity_id` | |
| | `name` | `stored_object_name` | |
| | `subOperation` | `sub_operation` | |
| Sentry | `databaseName` | `sentry_database_name` | |
| | `objectType` | `sentry_object_type` | |
| | `operationText` | `operation_text` | |
| | `resourcePath` | `resource_path` | |
| | `tableName` | `table_name` | |
| Solr | `collectionName` | `collection_name` | |
| | `operationParams` | `operation_params` | |
| | `solrVersion` | `solr_version` | |
An Envelope Kafka input is used to read the Navigator Audit events from the Kafka topic.
A translator is required to provide the input messages to the rest of the pipeline. The raw translator is used to provide the Kafka message as-is, so that we can parse it into record fields in a subsequent deriver.
To enable the job to be stopped and restarted, the Kafka input step manages the Kafka message offsets by storing them in a Kudu table (`impala::nav.nav_offsets`). The offsets are committed to this table only after the associated records have been persisted to the job output. With this, if the job is stopped in the middle of a micro-batch, when it is restarted it will reprocess any messages that had not yet been written at the end of the pipeline. This implements at-least-once delivery semantics.
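To check the committed offsets, you can query the table directly, assuming it is accessible through Impala under the name shown above:

```sql
-- Inspect the offsets Envelope has committed so far
SELECT * FROM nav.nav_offsets;
```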
The Kafka input step provides the raw Kafka message, which in this case is the JSON event, as a binary value. The string events step then uses the SQL deriver to cast the binary JSON to a string. Following that, the parsed events step uses the Parse JSON deriver to parse the JSON string into individual fields for downstream processing.
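The cast in the string events step is essentially a one-line Spark SQL query. A minimal sketch is shown below; the step name `kafkaInput` and the raw payload column name `value` are assumptions, so check the actual names in `nav-audit.conf`.

```sql
-- Cast the raw binary Kafka payload into a JSON string for the Parse JSON deriver
SELECT CAST(value AS STRING) AS json_event FROM kafkaInput
```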
The records that come out of the parsed events step contain all eight types of events that Navigator Audit captures. The `type` attribute of each audit event record identifies which service it comes from.
The goal of this Envelope pipeline is to separate the events by service type and store them in service-specific tables/directories. To filter the records for each service we use SQL derivers, each of which runs a query like the one in the example below to select the events for a particular service.
SELECT
-- partitioning column
from_unixtime(CAST(time as BIGINT) / 1000, 'yyyy-MM-dd') as day,
-- generic attributes
service as service_name, allowed as allowed, user as username,
impersonator as impersonator, ip as ip_addr, op as operation,
CAST(CAST(time as BIGINT) / 1000 as TIMESTAMP) as event_time,
-- event specific attributes
src as src, dst as dest, perms as permissions,
-- the attribute below is not exposed by Navigator through Kafka; it's included here for
-- completeness only
DELEGATION_TOKEN_ID as delegation_token_id
FROM parsedEvents
WHERE type = 'HDFS'
All the tables created in this example are partitioned by day. Since the original audit events don’t have a `day` column, we have to generate one, deriving it from the event `time` value. The query then lists all the generic event attributes, followed by the attributes specific to the service handled by the deriver where the query is configured. Note that the query’s `WHERE` clause selects only the events associated with that particular service.
Some attributes that exist in Navigator Audit, such as the `DELEGATION_TOKEN_ID` listed above, are not added to the messages written to Kafka. We could have removed these attributes from the example altogether, but decided to leave them in for completeness.
Kudu is our recommended output for this example. It provides an efficient and easy way to store and query audit records at scale.
The configuration of the Kudu outputs is fairly straightforward.
Those outputs have been configured with `insert.ignore = true` so that events reprocessed after a job restart are ignored as duplicates instead of causing insert failures.
The Kudu tables used in this example are range-partitioned by day, and each day partition is further split into multiple hash buckets for performance. This increases parallelism and makes it faster to query events for a particular day.
The caveat is that the range partitions must already exist when events for the corresponding days are ingested. If a partition does not exist, the job will fail to populate the Kudu tables.
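To make the partitioning scheme concrete, below is a minimal sketch of what such a table definition looks like in Impala DDL. The actual DDL lives in `create_nav_tables.sql`; the column list, hash column, and bucket count here are illustrative only.

```sql
-- Hedged sketch of the day range + hash bucket scheme (not the actual DDL from this example)
CREATE TABLE nav.hdfs_events_kudu_sketch (
  day STRING,
  event_time TIMESTAMP,
  username STRING,
  src STRING,
  PRIMARY KEY (day, event_time, username, src)
)
PARTITION BY
  HASH (username) PARTITIONS 4,
  RANGE (day) (PARTITION VALUE = '2018-06-02')
STORED AS KUDU;
```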
We have included a script to help create partitions for all the Kudu tables. The script generates a DDL script, which can then be executed using `impala-shell`. The syntax of the script is:
./add_kudu_partitions.sh [#_of_days] [start_day]
Both parameters are optional. If they are omitted, the script will generate DDL statements to create partitions for 7 days, starting from the current day. You can control the range of partitions and the starting point by specifying the parameters.
# Default usage
$ ./add_kudu_partitions.sh
ALTER TABLE nav.hbase_events_kudu ADD IF NOT EXISTS RANGE PARTITION VALUE = '2018-06-02';
...
ALTER TABLE nav.hbase_events_kudu ADD IF NOT EXISTS RANGE PARTITION VALUE = '2018-06-08';
ALTER TABLE nav.hdfs_events_kudu ADD IF NOT EXISTS RANGE PARTITION VALUE = '2018-06-02';
...
ALTER TABLE nav.hdfs_events_kudu ADD IF NOT EXISTS RANGE PARTITION VALUE = '2018-06-08';
...
# Specifying a different date range
$ ./add_kudu_partitions.sh 5 2018-05-30
ALTER TABLE nav.hbase_events_kudu ADD IF NOT EXISTS RANGE PARTITION VALUE = '2018-05-30';
...
ALTER TABLE nav.hbase_events_kudu ADD IF NOT EXISTS RANGE PARTITION VALUE = '2018-06-03';
ALTER TABLE nav.hdfs_events_kudu ADD IF NOT EXISTS RANGE PARTITION VALUE = '2018-05-30';
...
ALTER TABLE nav.hdfs_events_kudu ADD IF NOT EXISTS RANGE PARTITION VALUE = '2018-06-03';
...
# Generate DDL and execute with impala-shell
$ ./add_kudu_partitions.sh 5 2018-05-30 > create_partitions.sql
$ impala-shell -v -f create_partitions.sql
For the sake of showing an example of writing to partitioned, Parquet-based Impala tables, we have also added Filesystem outputs to this example, in addition to the Kudu ones.
Although this might be a cheaper alternative for longer-term storage of the audit events, the implementation in this example is not recommended for production and is included merely to illustrate the usage of the Filesystem output.
Writing to HDFS-based tables using short micro-batches, like the ones in this example, has the potential of creating many small Parquet files on HDFS, which is detrimental to Impala and HDFS performance in the long run.
A better approach for long-term storage of events in HDFS would be to create a separate Envelope job that runs less frequently (once a day, for example) and copies older partitions of the Kudu tables to HDFS. That implementation, though, is outside the scope of this example.
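The copy step of such a job could be expressed as a plain Impala `INSERT ... SELECT` from a Kudu table into the corresponding Parquet table. The sketch below only illustrates the idea: the table names are assumptions based on names that appear elsewhere in this example, and the column order and partition value would need to be verified against the actual DDL and driven by the job's schedule.

```sql
-- Hedged sketch: archive one day of events from the Kudu table into the Parquet-backed table.
-- Verify table names and column order against create_nav_tables.sql before using anything similar.
INSERT INTO nav.hdfs_events PARTITION (day)
SELECT event_time, service_name, username, ip_addr, operation, allowed, impersonator,
       src, dest, permissions, delegation_token_id,
       day                                -- dynamic partition column must come last
FROM nav.hdfs_events_kudu
WHERE day = '2018-06-02';
```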
Unlike the Kudu tables, the HDFS partition directories will be created automatically when the first event for a new day is ingested; there’s no need to create them in advance.
The new directories/partitions, however, will not be immediately visible to users querying the Impala tables. Similarly, new Parquet files added to partitions will not be visible to Impala users until a `REFRESH` is executed for the table. Hence, you must schedule the following statements to run periodically for each of the Parquet tables:
ALTER TABLE <table_name> RECOVER PARTITIONS;
REFRESH <table_name>;
This example provides a DDL script with these commands for all the Parquet tables, which can be run with the following command:
impala-shell -v -f refresh_parquet_tables.sql
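One simple way to schedule this, assuming cron is available on a host with `impala-shell`, is a crontab entry like the sketch below; the interval, script path, and log file are illustrative placeholders.

```
# Hedged sketch: refresh the Parquet tables' metadata every 15 minutes
*/15 * * * * impala-shell -v -f /path/to/refresh_parquet_tables.sql >> /var/log/nav_refresh.log 2>&1
```

Once the metadata has been refreshed, the ingested events can be queried through Impala, as in the sample below.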
[host1:21000] > select * from nav.hdfs_events limit 3;
...
+------------+---------------+--------------+----------+--------------+-------------+---------+--------------+-----------------------------------------------------------------------------------------------+------+-------------+---------------------+
| day | event_time | service_name | username | ip_addr | operation | allowed | impersonator | src | dest | permissions | delegation_token_id |
+------------+---------------+--------------+----------+--------------+-------------+---------+--------------+-----------------------------------------------------------------------------------------------+------+-------------+---------------------+
| 2018-06-03 | 1528086332427 | HDFS-1 | root | 172.31.116.6 | getfileinfo | true | NULL | /data/nav/navms_events_parquet/_temporary/0/_temporary/attempt_20180603212532_0012_m_000000_0 | NULL | NULL | NULL |
| 2018-06-03 | 1528086332680 | HDFS-1 | root | 172.31.116.9 | delete | true | NULL | /data/nav/navms_events_parquet/_temporary | NULL | NULL | NULL |
| 2018-06-03 | 1528086344032 | HDFS-1 | root | 172.31.116.6 | getfileinfo | true | NULL | /data/nav/hdfs_events_parquet/_temporary/0/_temporary/attempt_20180603212544_0014_m_000002_0 | NULL | NULL | NULL |
+------------+---------------+--------------+----------+--------------+-------------+---------+--------------+-----------------------------------------------------------------------------------------------+------+-------------+---------------------+
Fetched 3 row(s) in 0.34s