# Monitoring

## Monitoring DAG failures

The easiest way to monitor your processes is to use Cloud Composer DAG metrics.

To set up an alert for a metric, go to the Cloud Composer dashboard and open the Monitoring section:

Composer monitoring

Next, pick the metric you want to monitor and click the bell icon:

Composer alert

Finally, set up the alert:

Composer alert
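
If you prefer to create such an alert programmatically rather than through the UI, a minimal sketch using the Cloud Monitoring Python client could look like the following. The metric name and its `state` label are assumptions here (verify the exact metric in Metrics Explorer), and in practice you would also attach notification channels to the policy.

```python
# A minimal sketch: an alert policy that fires when any DAG run fails.
# Assumes the `composer.googleapis.com/workflow/run_count` metric and its
# `state` label; the project id is a placeholder.
from google.cloud import monitoring_v3

client = monitoring_v3.AlertPolicyServiceClient()

policy = monitoring_v3.AlertPolicy(
    display_name="Composer: failed DAG runs",
    combiner=monitoring_v3.AlertPolicy.ConditionCombinerType.OR,
    conditions=[
        monitoring_v3.AlertPolicy.Condition(
            display_name="failed workflow runs > 0",
            condition_threshold=monitoring_v3.AlertPolicy.Condition.MetricThreshold(
                filter=(
                    'metric.type = "composer.googleapis.com/workflow/run_count" '
                    'AND metric.labels.state = "failed"'
                ),
                comparison=monitoring_v3.ComparisonType.COMPARISON_GT,
                threshold_value=0,
                duration={"seconds": 300},  # the condition must hold for 5 minutes
                aggregations=[
                    monitoring_v3.Aggregation(
                        alignment_period={"seconds": 300},
                        per_series_aligner=monitoring_v3.Aggregation.Aligner.ALIGN_SUM,
                    )
                ],
            ),
        )
    ],
)

created = client.create_alert_policy(
    name="projects/my-gcp-project",  # placeholder project id
    alert_policy=policy,
)
print(created.name)
```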

## Monitoring DAG outputs

Sometimes your DAGs might not fail in an obvious way, that is, with an explicit error and a "failed" status. To handle more complex situations, you can use the following pattern: create "monitoring DAGs" that watch the outputs of the DAGs doing the actual data processing. A monitoring DAG executes some checks, and each check should throw an exception when it fails, causing the monitoring DAG to fail and trigger an alarm.

Monitoring "data latency"

If you have a DAG that produces data, it is most likely a good idea to monitor whether it produces that data on time.

The simplest effective way to monitor data latency is to create a DAG that runs every N minutes and checks the difference between the current time and the time of the latest data partition. If the difference exceeds a specified threshold, the check throws an error.

The following example shows a BigFlow workflow that monitors data latency. Every hour, it checks the freshness of an imaginary, daily partitioned table called `table_to_monitor`.

```python
import bigflow
import bigflow.bigquery

working_dataset = bigflow.bigquery.DatasetConfig(
    env='dev',
    project_id='my.gcp.project_dev',
    dataset_name='data_latency_monitoring_dev',
    external_tables={
        'table_to_monitor': 'some.gcp.table_dev'
    }
).add_configuration(
    env='test',
    project_id='my.gcp.project_test',
    dataset_name='data_latency_monitoring_test',
    external_tables={
        'table_to_monitor': 'some.gcp.table_test'
    }
).add_configuration(
    env='prod',
    project_id='my.gcp.project_prod',
    dataset_name='data_latency_monitoring_prod',
    external_tables={
        'table_to_monitor': 'some.gcp.table_prod'
    }
)

@bigflow.bigquery.component(ds=working_dataset)
def check_data_latency(ds: bigflow.bigquery.interactive.Dataset):
    is_late = ds.collect_list('''
        SELECT
          -- the data is considered late when the newest partition is older than 30 hours
          TIMESTAMP_DIFF(CURRENT_TIMESTAMP(), pt, MINUTE) > 30 * 60 AS is_late
        FROM (
          SELECT
            _PARTITIONTIME AS pt
          FROM
            `{table_to_monitor}`
          WHERE
            _PARTITIONTIME > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 3 DAY)
          GROUP BY
            1
          ORDER BY
            pt DESC
          LIMIT
            1 )
    ''', record_as_dict=True)[0]['is_late']
    if is_late:
        raise RuntimeError('Your data is late!')

workflow = bigflow.Workflow(
    workflow_id='data_latency_monitoring',
    definition=[
        check_data_latency.to_job(
            id='check_data_latency', 
            retry_count=3, 
            retry_pause_sec=60)
    ],
    schedule_interval='hourly',
    start_time_factory=bigflow.workflow.hourly_start_time
)
```

## Retrieving logs from Log Explorer

When our DAG is failing, we want to get information about the task instance run. We can do that using Log Explorer. First, we need to check the details of the failed task in the Airflow UI, as shown below.

Airflow task details

Then we need to go to Log Explorer in the GCP project where our Composer environment is set up. We can filter the logs by job id, DAG id, or any other detail that identifies our workflow: add the details to the search field and click "Run query". In this example, the filter significantly narrows down the number of log messages.

Log Explorer filter

We can also restrict the kind of log messages we see, for example by severity, using the filters in the left pane.

Log Explorer filter
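
The same filtering can be done programmatically with the Cloud Logging client, which is convenient when you want to pull the relevant entries into a script. A minimal sketch, assuming the standard Cloud Composer log labels (`workflow`, `task-id`); the project, DAG, and task names are placeholders:

```python
# A minimal sketch: fetch recent error logs of one Airflow task with the
# Cloud Logging client. The Composer log labels are assumptions; the project,
# DAG and task names are placeholders.
from google.cloud import logging

client = logging.Client(project="my-gcp-project")

log_filter = (
    'resource.type="cloud_composer_environment" '
    'AND labels.workflow="data_latency_monitoring" '
    'AND labels."task-id"="check_data_latency" '
    'AND severity>=ERROR'
)

for entry in client.list_entries(
    filter_=log_filter,
    order_by=logging.DESCENDING,
    max_results=20,
):
    print(entry.timestamp, entry.severity, entry.payload)
```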

## Retrieving logs for Kubernetes Operator

When we use the Kubernetes operator, we can also check the details of the Kubernetes workers and retrieve their logs from Log Explorer. To get the Kubernetes pod name, we need to search for it in the Airflow logs, as in the example below.

Airflow filter

Then, to check the Kubernetes worker information, we can similarly filter by the pod name in the search bar. We can also specify the resource type to check the details of the Kubernetes pod, cluster, or node.

Log Explorer filter
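
The same Kubernetes filters can also be run from code; a minimal sketch, assuming the GKE `k8s_container` resource type (the project, cluster, and pod names are placeholders):

```python
# A minimal sketch: fetch the logs of a single Kubernetes pod by name.
# The project id, cluster name and pod name are placeholders.
from google.cloud import logging

client = logging.Client(project="my-gcp-project")

log_filter = (
    'resource.type="k8s_container" '
    'AND resource.labels.cluster_name="my-composer-gke-cluster" '
    'AND resource.labels.pod_name="my-task-pod-abc123"'
)

for entry in client.list_entries(filter_=log_filter, order_by=logging.DESCENDING, max_results=50):
    print(entry.timestamp, entry.severity, entry.payload)
```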

## Getting logs in Dataflow for Beam jobs

When we want to check the logs of a failed Apache Beam job, we need to go to the Dataflow service in our GCP project, find the job run, and click on it. We then see a graph of the Beam pipeline with the status of each step. To check the logs of the job, we click the arrow at the bottom of the page, as shown below.

Dataflow filter

There we can filter the relevant information similarly to Log Explorer: we can filter by the severity of the messages and choose whether we want to see job logs or worker logs.

Dataflow logs details

However, the worker logs shown there often do not contain all the details. It is useful to open them in Log Explorer, delete all the filters, and filter by the job id only. After that, we can find all the information we need.

Dataflow logs details
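
The Dataflow variant of that query only differs in the resource type and labels; a minimal sketch, assuming the `dataflow_step` resource type (the project id and job id are placeholders):

```python
# A minimal sketch: fetch all log entries of one Dataflow job by its job id.
# The project id and job id are placeholders.
from google.cloud import logging

client = logging.Client(project="my-gcp-project")

log_filter = (
    'resource.type="dataflow_step" '
    'AND resource.labels.job_id="2021-01-01_00_00_00-1234567890123456789"'
)

for entry in client.list_entries(filter_=log_filter, order_by=logging.DESCENDING, max_results=100):
    print(entry.timestamp, entry.severity, entry.payload)
```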

Monitoring "data quality"

TODO