Navigator Audit example

Cloudera Navigator Audit is a tool that collects and consolidates audit records from several Hadoop services. It offers the ability to forward those events into a Kafka topic for downstream consumption.

This example reads audit events from a Kafka topic populated by Navigator Audit and writes them to Kudu. We have also added an HDFS filesystem output to show how to handle writes to table partitions on HDFS, but this option comes with caveats (see below).

Audit events are written to Kafka by Navigator Audit as JSON strings. The Envelope pipeline reads this payload through a Kafka input, and then uses the Parse JSON deriver to convert the JSON messages into Dataset records.

The records are then processed by further Envelope derivers, which filter the records for each particular Hadoop service, and the results are forwarded to the corresponding outputs (Kudu and Filesystem).

Security

Authentication

This example also demonstrates the configuration needed to run the Envelope job against a secured cluster. It contains all the settings required to authenticate with Kerberos-enabled clusters.

If you’re running the example against a non-secured cluster, you will have to adjust the following Kafka setting in the configuration file:

  • parameter.security.protocol = SSL (if TLS is enabled)

  • parameter.security.protocol = PLAINTEXT (if TLS is not enabled)

Authorization

If authorization has been configured for your cluster you must ensure that the following privileges have been granted to the user who’s running the Envelope job:

  • Read privileges on the Kafka topic containing the audit events

  • Select and insert privileges on the Kudu table used for Kafka offset management

  • Insert privileges on the output Kudu tables (for the Kudu outputs)

  • Read and write permissions on the output HDFS directories (for the HDFS Filesystem outputs)

Encryption

The example has been configured for a Kafka cluster with TLS encryption enabled. To run it successfully you will need a copy of the JKS truststore for the Kafka cluster, as well as the truststore password, which should be configured in the job configuration file.

If TLS is not being used for Kafka you need to adjust the following settings in the configuration file:

  • parameter.security.protocol = SASL_PLAINTEXT (if Kerberos is enabled)

  • parameter.security.protocol = PLAINTEXT (if Kerberos is not enabled)
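The Authentication and Encryption settings above combine into a single value for the Kafka security.protocol parameter. As a minimal sketch, the choice can be written as a lookup on two flags. The helper name kafka_security_protocol is hypothetical and not part of this example; SASL_SSL is the standard Kafka value when both Kerberos and TLS are enabled, which is the setup this example is configured for.

```shell
# Pick the Kafka security.protocol value from two true/false flags.
# kafka_security_protocol is a hypothetical helper, not shipped with the example.
kafka_security_protocol() {
  kerberos=$1   # "true" if Kerberos (SASL) authentication is enabled
  tls=$2        # "true" if TLS encryption is enabled
  case "$kerberos:$tls" in
    true:true)   echo SASL_SSL ;;
    true:false)  echo SASL_PLAINTEXT ;;
    false:true)  echo SSL ;;
    false:false) echo PLAINTEXT ;;
    *) echo "usage: kafka_security_protocol true|false true|false" >&2
       return 1 ;;
  esac
}

kafka_security_protocol true false   # prints SASL_PLAINTEXT
```

The printed value is what would go into parameter.security.protocol in the job configuration file.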

Running the example

  1. Create a directory for the Parquet tables, if it doesn’t already exist.

    hdfs dfs -mkdir -p hdfs:///data/nav

    Ensure the following permissions have been granted on this directory:

    • Read/write permissions for the user running the Envelope job

    • Read/write permissions for the hive group

  2. Create the required Kudu and Parquet tables using the provided Apache Impala script:

    impala-shell -f create_nav_tables.sql \
      --var hdfs_base_dir=hdfs:///data/nav \
      --verbose
  3. Create partitions for the Kudu tables:

    ./add_kudu_partitions.sh > create_partitions.sql
    impala-shell --verbose -f create_partitions.sql
  4. Create the required Kafka topic:

    kafka-topics --zookeeper YOURZOOKEEPERHOSTNAME:2181 --create --topic nav-audit --partitions 4 --replication-factor 3
  5. Configure Cloudera Navigator Audit to publish audit events to Kafka.

  6. If running in a secured cluster, create a keytab file for the Kerberos principal that will run the example job. Ensure that the permissions of the keytab file are set to 400.

  7. If running in a secured cluster, modify jaas.conf to set the correct values for the keyTab and principal properties. Note that KEYTABFILENAME should be just the file name, not the full path.

  8. Modify nav-audit.conf, reviewing all the properties in the vars section and updating them as per your environment.

  9. If running in a secured cluster, modify your corresponding submit shell script (see below) to update the KERBEROS_KEYTAB and KERBEROS_PRINCIPAL variables.

  10. Run the example by submitting the Envelope application using the corresponding provided wrapper script:

    • For secured clusters:

      • That are running CDH6 or other Spark distributions:

        ./submit-secured.sh
      • That are running CDH5:

        ./submit-secured-cdh5.sh
    • For unsecured clusters:

      • That are running CDH6 or other Spark distributions:

        ./submit-unsecured.sh
      • That are running CDH5:

        ./submit-unsecured-cdh5.sh

Architecture

This section describes the implementation of this example in more detail.

Audit records

The audit records written by Navigator Audit to the Kafka topic are JSON-formatted strings containing the attributes of each audit event. The events for all service types share a common set of attributes, and each service has additional specific attributes. The table below lists these attributes, along with the mapping between the JSON attributes and the Impala table columns.

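+---------+---------------------+----------------------+----------------------------------------+
| Service | JSON Attribute      | Table column         | Comments                               |
+---------+---------------------+----------------------+----------------------------------------+
| Generic | service             | service_name         |                                        |
|         | allowed             | allowed              |                                        |
|         | user                | username             |                                        |
|         | impersonator        | impersonator         |                                        |
|         | ip                  | ip_addr              |                                        |
|         | time                | event_time           |                                        |
|         | op                  | operation            |                                        |
+---------+---------------------+----------------------+----------------------------------------+
| HBase   | tableName           | table_name           |                                        |
|         | family              | family               |                                        |
|         | qualifier           | qualifier            |                                        |
+---------+---------------------+----------------------+----------------------------------------+
| HDFS    | src                 | src                  |                                        |
|         | dst                 | dest                 |                                        |
|         | perms               | permissions          |                                        |
|         | DELEGATION_TOKEN_ID | delegation_token_id  | Not exposed through the Kafka messages |
+---------+---------------------+----------------------+----------------------------------------+
| Hive    | opText              | operation_text       |                                        |
|         | db                  | database_name        |                                        |
|         | table               | table_name           |                                        |
|         | path                | resource_path        |                                        |
|         | objType             | object_type          |                                        |
|         | objUsageType        | object_usage_type    |                                        |
+---------+---------------------+----------------------+----------------------------------------+
| Hue     | operationText       | operation_text       |                                        |
|         | service             | service              |                                        |
|         | url                 | url                  |                                        |
+---------+---------------------+----------------------+----------------------------------------+
| Impala  | opText              | operation_text       |                                        |
|         | status              | status               |                                        |
|         | db                  | database_name        |                                        |
|         | table               | table_name           |                                        |
|         | privilege           | privilege            |                                        |
|         | objType             | object_type          |                                        |
|         | QUERY_ID            | query_id             | Not exposed through the Kafka messages |
|         | SESSION_ID          | session_id           |                                        |
+---------+---------------------+----------------------+----------------------------------------+
| NavMS   | additionalInfo      | additional_info      |                                        |
|         | entityId            | entity_id            |                                        |
|         | name                | stored_object_name   |                                        |
|         | subOperation        | sub_operation        |                                        |
+---------+---------------------+----------------------+----------------------------------------+
| Sentry  | databaseName        | sentry_database_name |                                        |
|         | objectType          | sentry_object_type   |                                        |
|         | operationText       | operation_text       |                                        |
|         | resourcePath        | resource_path        |                                        |
|         | tableName           | table_name           |                                        |
+---------+---------------------+----------------------+----------------------------------------+
| Solr    | collectionName      | collection_name      |                                        |
|         | operationParams     | operation_params     |                                        |
|         | solrVersion         | solr_version         |                                        |
+---------+---------------------+----------------------+----------------------------------------+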

Kafka Input

An Envelope Kafka input is used to read the Navigator Audit events from the Kafka topic.

A translator is required to provide the input messages to the rest of the pipeline. The raw translator is used to provide the Kafka message as-is, so that we can parse it into record fields in a subsequent deriver.

Offset management

To enable the job to be stopped and restarted, the Kafka input step manages the Kafka message offsets by storing them in a Kudu table (impala::nav.nav_offsets). The offsets are committed to this table only after the associated records have been persisted to the job output. As a result, if the job is stopped in the middle of a micro-batch, on restart it will reprocess every message whose offset had not yet been committed, so no message is lost even if some are written more than once. This implements at-least-once delivery semantics.
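The commit-after-write ordering described above can be illustrated with a toy simulation. This is only a sketch under simplifying assumptions and is not Envelope code: a shell variable stands in for the offsets table, a word list stands in for one Kafka partition, and the run_batch helper is hypothetical.

```shell
# Toy simulation of commit-after-write offset handling; not Envelope code.
messages="evt0 evt1 evt2 evt3 evt4 evt5"  # stand-in for one Kafka partition
committed=0                                # stand-in for the offsets table
output=""                                  # stand-in for the job output

# run_batch SIZE [crash]: write up to SIZE messages starting at the committed
# offset, then commit. With "crash", stop after the write, before the commit.
run_batch() {
  size=$1
  mode=${2:-ok}
  i=0
  written=0
  for msg in $messages; do
    if [ "$i" -ge "$committed" ] && [ "$written" -lt "$size" ]; then
      output="$output $msg"
      written=$((written + 1))
    fi
    i=$((i + 1))
  done
  if [ "$mode" = crash ]; then
    return 0   # simulated failure: the offset is never committed
  fi
  committed=$((committed + written))
}

run_batch 2          # writes evt0 evt1, commits offset 2
run_batch 2 crash    # writes evt2 evt3, offset stays at 2
run_batch 2          # after restart: writes evt2 evt3 again, commits offset 4
run_batch 2          # writes evt4 evt5, commits offset 6

echo "committed offset: $committed"
echo "output:$output"
```

Every message reaches the output at least once, and evt2 and evt3 appear twice because the simulated failure happened between the write and the commit. This is why the outputs need to tolerate rows being written again after a restart.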

Event derivers

The Kafka input step provides the raw Kafka message, which in this case is the JSON event, as a binary value. The string events step then uses the SQL deriver to cast the binary JSON to a string. Following that, the parsed events step uses the Parse JSON deriver to parse the JSON string into individual fields for downstream processing.

The records that come out of the parsed events step contain all eight types of events that Navigator Audit captures. The type attribute of each audit event record identifies the service it comes from.

The goal of this Envelope pipeline is to separate the events by service type and store them in service-specific tables/directories. To filter the records for each service we use SQL derivers, which use a SQL query, like the one in the example below, to filter all the events for a particular service.

SELECT
  -- partitioning column
  from_unixtime(CAST(time as BIGINT) / 1000, 'yyyy-MM-dd') as day,
  -- generic attributes
  service as service_name, allowed as allowed, user as username,
  impersonator as impersonator, ip as ip_addr, op as operation,
  CAST(CAST(time as BIGINT) / 1000 as TIMESTAMP) as event_time,
  -- event specific attributes
  src as src, dst as dest, perms as permissions,
  -- the attribute below is not exposed by Navigator through Kafka; it's included here for
  -- completeness only
  DELEGATION_TOKEN_ID as delegation_token_id
FROM parsedEvents
WHERE type = 'HDFS'

All the tables created in this example are partitioned by day. Since the original audit events don’t have a day column we have to generate one, deriving it from the event time value. The query then lists all the generic event attributes, followed by the ones specific to the event associated with the deriver where the query is configured. Note that the query’s WHERE clause filters only the events associated with that particular service.
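To make the day derivation concrete, the sketch below repeats the same arithmetic outside SQL: divide the epoch-millisecond event time by 1000 and format the result as yyyy-MM-dd. The helper name event_day is hypothetical, and the snippet assumes GNU date. Note that Spark SQL's from_unixtime formats the value in the session time zone, so the derived day depends on that setting; the second argument here plays the same role.

```shell
# Derive the partition day from an event time given in epoch milliseconds.
# event_day is a hypothetical helper; it relies on GNU date (-d @SECONDS).
event_day() {
  ms=$1
  tz=${2:-UTC0}   # POSIX TZ string; UTC0 means UTC
  TZ=$tz date -d "@$((ms / 1000))" +%Y-%m-%d
}

event_day 1528086332427 UTC0   # prints 2018-06-04
event_day 1528086332427 PDT7   # fixed offset 7 hours behind UTC: 2018-06-03
```

The timestamp is taken from the sample query output at the end of this document, where the same event is stored under day 2018-06-03. That is consistent with the job having run with a session time zone behind UTC.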

Some attributes that exist in Navigator Audit, such as the DELEGATION_TOKEN_ID listed above, are not added to the messages written to Kafka. We could have removed these attributes from the example altogether, but decided to leave them in for completeness.

Kudu outputs

Kudu is our recommended output for this example. It provides an efficient and easy way to store and query audit records at scale. The configuration of the Kudu outputs is fairly straightforward. The outputs have been configured with insert.ignore = true so that rows that already exist are ignored, rather than causing errors, when a restart of the job causes an event to be reprocessed.

Managing partitions for Kudu tables

The Kudu tables used in this example are range partitioned by day, and each day partition is further divided into multiple hash buckets for performance. This increases parallelism and makes it faster to query events for a particular day.

The caveat is that the range partitions must already exist when events for a particular day are ingested. If the partition does not exist, the job will fail to populate the Kudu tables.

We have included a script to help create partitions for all the Kudu tables. The script generates a DDL script, which can then be executed using impala-shell. The syntax of the script is:

Syntax of add_kudu_partitions.sh
./add_kudu_partitions.sh [#_of_days] [start_day]

Both parameters are optional. If they are omitted, the script will generate DDL statements to create partitions for 7 days, starting from the current day. You can control the range of partitions and the starting point by specifying the parameters.

Usage example of add_kudu_partitions.sh
# Default usage
$ ./add_kudu_partitions.sh
ALTER TABLE nav.hbase_events_kudu ADD IF NOT EXISTS RANGE PARTITION VALUE = '2018-06-02';
...
ALTER TABLE nav.hbase_events_kudu ADD IF NOT EXISTS RANGE PARTITION VALUE = '2018-06-08';
ALTER TABLE nav.hdfs_events_kudu ADD IF NOT EXISTS RANGE PARTITION VALUE = '2018-06-02';
...
ALTER TABLE nav.hdfs_events_kudu ADD IF NOT EXISTS RANGE PARTITION VALUE = '2018-06-08';
...

# Specifying a different date range
$ ./add_kudu_partitions.sh 5 2018-05-30
ALTER TABLE nav.hbase_events_kudu ADD IF NOT EXISTS RANGE PARTITION VALUE = '2018-05-30';
...
ALTER TABLE nav.hbase_events_kudu ADD IF NOT EXISTS RANGE PARTITION VALUE = '2018-06-03';
ALTER TABLE nav.hdfs_events_kudu ADD IF NOT EXISTS RANGE PARTITION VALUE = '2018-05-30';
...
ALTER TABLE nav.hdfs_events_kudu ADD IF NOT EXISTS RANGE PARTITION VALUE = '2018-06-03';
...

# Generate DDL and execute with impala-shell
$ ./add_kudu_partitions.sh 5 2018-05-30 > create_partitions.sql
$ impala-shell -v -f create_partitions.sql
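For readers curious about how such DDL can be generated, here is a minimal sketch that reproduces the output format shown above. It is a hypothetical re-creation, not the add_kudu_partitions.sh script shipped with the example: the function name gen_partitions is made up, only the two tables visible in the sample output are listed, and GNU date is assumed for the day arithmetic.

```shell
# Hypothetical re-creation of the partition DDL generation; not the shipped
# script. Requires GNU date for the "+ N days" arithmetic.
gen_partitions() {
  days=${1:-7}                      # number of daily partitions to create
  start=${2:-$(date +%Y-%m-%d)}     # first day, defaults to today
  # Only the two tables visible in the sample output; the real list is longer.
  for table in nav.hbase_events_kudu nav.hdfs_events_kudu; do
    i=0
    while [ "$i" -lt "$days" ]; do
      day=$(date -u -d "$start + $i days" +%Y-%m-%d)
      echo "ALTER TABLE $table ADD IF NOT EXISTS RANGE PARTITION VALUE = '$day';"
      i=$((i + 1))
    done
  done
}

gen_partitions 5 2018-05-30
```

Because each statement uses ADD IF NOT EXISTS, the generated script can be re-run safely, which makes it suitable for a recurring job that keeps partitions created a few days ahead.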

Filesystem outputs

To show how to write to partitioned, Parquet-based Impala tables, this example also includes Filesystem outputs alongside the Kudu ones.

Although this might be a cheaper alternative for longer-term storage of the audit events, the implementation in this example is not recommended for production. It is included only to illustrate the usage of the filesystem output.

Writing to HDFS-based tables using short micro-batches, such as the ones in this example, can create too many small Parquet files on HDFS, which degrades Impala and HDFS performance in the long run.
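A rough back-of-the-envelope calculation shows how quickly small files can accumulate. The sketch below computes an upper bound, assuming that every write task produces one file per output table in every micro-batch. The helper files_per_day and the 10-second batch interval are assumptions for illustration only; the 4 tasks match the 4 Kafka partitions created earlier, and the 8 tables match the eight service types.

```shell
# Rough upper bound on the Parquet files written per day by a streaming job.
# files_per_day is a hypothetical helper; all inputs are illustrative.
files_per_day() {
  batch_seconds=$1   # micro-batch interval (assumed, not from the example)
  tasks=$2           # write tasks per batch, e.g. one per Kafka partition
  tables=$3          # number of partitioned output tables
  echo $(( 86400 / batch_seconds * tasks * tables ))
}

files_per_day 10 4 8   # prints 276480
```

The real number is lower whenever a task has no rows for a given service in a batch, but the order of magnitude illustrates the problem.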

A better approach for long-term storage of events in HDFS would be to create a separate Envelope job that runs less frequently (once a day, for example) and copies older partitions of the Kudu tables to HDFS. That implementation, though, is outside the scope of this example.

Managing partitions and refreshes for Impala tables

Unlike the Kudu tables, HDFS partition directories are created automatically when the first event for a new day is ingested. There is no need to create the directories in advance.

The new directories/partitions, however, will not be immediately visible to users querying the Impala tables. Similarly, new Parquet files added to existing partitions will not be visible to Impala users until a REFRESH is executed for the table.

Hence, you must schedule the following statements to run periodically for each table:

ALTER TABLE <table_name> RECOVER PARTITIONS;
REFRESH <table_name>;

This example provides a DDL script with these commands for all the Parquet tables, which can be run with the following command:

impala-shell -v -f refresh_parquet_tables.sql

Querying data through Impala

[host1:21000] > select * from nav.hdfs_events limit 3;
...
+------------+---------------+--------------+----------+--------------+-------------+---------+--------------+-----------------------------------------------------------------------------------------------+------+-------------+---------------------+
| day        | event_time    | service_name | username | ip_addr      | operation   | allowed | impersonator | src                                                                                           | dest | permissions | delegation_token_id |
+------------+---------------+--------------+----------+--------------+-------------+---------+--------------+-----------------------------------------------------------------------------------------------+------+-------------+---------------------+
| 2018-06-03 | 1528086332427 | HDFS-1       | root     | 172.31.116.6 | getfileinfo | true    | NULL         | /data/nav/navms_events_parquet/_temporary/0/_temporary/attempt_20180603212532_0012_m_000000_0 | NULL | NULL        | NULL                |
| 2018-06-03 | 1528086332680 | HDFS-1       | root     | 172.31.116.9 | delete      | true    | NULL         | /data/nav/navms_events_parquet/_temporary                                                     | NULL | NULL        | NULL                |
| 2018-06-03 | 1528086344032 | HDFS-1       | root     | 172.31.116.6 | getfileinfo | true    | NULL         | /data/nav/hdfs_events_parquet/_temporary/0/_temporary/attempt_20180603212544_0014_m_000002_0  | NULL | NULL        | NULL                |
+------------+---------------+--------------+----------+--------------+-------------+---------+--------------+-----------------------------------------------------------------------------------------------+------+-------------+---------------------+
Fetched 3 row(s) in 0.34s