Airflow

Airflow SDK

Apache Airflow is an open-source workflow management platform for data engineering pipelines. Acceldata Airflow SDK provides APIs, decorators, and operators that allow for fine-grained end-to-end tracking and visibility of Airflow DAGs.

Features provided by acceldata-airflow-sdk:

  • DAG: A wrapper built on top of the Airflow DAG that monitors the beginning and end of pipeline execution
  • Pipeline: Represents an execution of a pipeline inside Airflow
  • Span: Logical collection of various tasks within Airflow
  • Job: Logical representation of a task within Airflow
  • Event: An event can hold arbitrary process or business data and is sent to the ADOC system for future tracking against a pipeline execution

Installation

Airflow installation details can be found here.

Users must ensure that the required Python modules are installed in the Airflow environment, for example by running pip install acceldata-airflow-sdk.

Prerequisites

To authenticate calls to ADOC, API keys are required.

Users can generate API keys from the Admin Central section of the ADOC UI.

The API keys and ADOC URL should be set up as environment variables in Airflow before using acceldata-airflow-sdk calls in Airflow DAG.

Establishing a Connection between Airflow and the ADOC Server

This guide outlines two methods for configuring Airflow to interact with the ADOC Server:

Method 1: Using Environment Variables

  1. Set Environment Variables:
    • Define the following environment variables within your Airflow environment (for example, in the Airflow Docker container):
      • TORCH_CATALOG_URL: URL of your ADOC Server instance.
      • TORCH_ACCESS_KEY: API access key generated from the ADOC UI.
      • TORCH_SECRET_KEY: API secret key generated from the ADOC UI.
      • ENABLE_VERSION_CHECK (Optional): Set to True to enable the version compatibility check between ADOC and SDK. Set it to False to disable the check.

Method 2: Using Airflow Connection

  1. Create an Airflow Connection:

    • Log in to your Airflow UI.
    • Navigate to the "Admin" -> "Connections" section.
    • Click on "Create" to create a new connection.
  2. Configure Connection Details:

    • Enter a unique identifier for the connection (e.g., "adoc_conn").
    • Set the "Conn Type" to "HTTP".
    • Enter the URL of your ADOC server in the "Host" field.
    • Enter the API access key from the ADOC UI in the "Login" field.
    • Enter the API secret key from the ADOC UI in the "Password" field.
    • (Optional) In the "Extra" field, enter the following JSON to control the version compatibility check between ADOC and the SDK: {"ENABLE_VERSION_CHECK": true}

Steps to Set up Airflow DAG

Before using the features provided by the acceldata-airflow-sdk, users must first set up Airflow DAG scripts with a few steps.

Minimum Instrumentation Required

Step 1. Use acceldata-airflow-sdk DAG Object

Acceldata-airflow-sdk tracks the end of the DAG run using a customized DAG class.

Users must import DAG from acceldata_airflow_sdk.dag and replace the Airflow DAG with the acceldata-airflow-sdk DAG in the DAG script. This enables the pipeline run to be terminated based on the success or failure of the DAG execution in Airflow, which is accomplished using Airflow's success and failure callbacks.

When using the acceldata-airflow-sdk DAG, you must include the pipeline uid in the DAG script (supplied via the TorchInitializer task described in Step 2), which ADOC uses to track the DAG being executed in Airflow.

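Below is a minimal sketch of this step, assuming the import path acceldata_airflow_sdk.dag (as referenced above) and hypothetical DAG identifiers; the pipeline UID itself is supplied to the TorchInitializer task in Step 2.

Python

from datetime import datetime, timedelta

# DAG is the acceldata-airflow-sdk wrapper around airflow.DAG.
from acceldata_airflow_sdk.dag import DAG

default_args = {'owner': 'airflow', 'retries': 1, 'retry_delay': timedelta(minutes=5)}

# Replace airflow.DAG with the SDK DAG so that pipeline-run success or failure
# is reported to ADOC via the DAG's success and failure callbacks.
dag = DAG(
    dag_id='orders_monthly_aggregate',   # hypothetical DAG id
    schedule_interval='@daily',
    default_args=default_args,
    start_date=datetime(2024, 1, 1),
    catchup=False,
)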

Step 2. Setup TorchInitializer Task

Acceldata-airflow-sdk uses the TorchInitializer task to track the beginning of the DAG run by creating a pipeline and pipeline run along with an all-encompassing root span.

Create a task for the TorchInitializer operator and add it as the DAG's root (first) task. Inside the TorchInitializer operator, acceldata-airflow-sdk establishes a connection to ADOC using the TorchClient class, which uses the credentials specified in the environment variables above. Using the TorchClient connection, the TorchInitializer operator creates a new pipeline if one does not already exist, a new pipeline run for each DAG run, and a root span to bind the entire pipeline run. Users can also create a pipeline in the Torch UI and then provide the UID of that pipeline in TorchInitializer; in that case, a new pipeline will not be created.

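Below is a minimal sketch of this step, assuming the import path acceldata_airflow_sdk.operators.torch_initialiser_operator (verify against your SDK version) and hypothetical pipeline identifiers.

Python

# Import path assumed; verify against your SDK version.
from acceldata_airflow_sdk.operators.torch_initialiser_operator import TorchInitializer

# Root (first) task of the DAG: creates the pipeline (if missing),
# a new pipeline run for this DAG run, and the root span.
# 'dag' is the SDK DAG created in Step 1.
torch_initializer_task = TorchInitializer(
    task_id='torch_pipeline_initializer',
    pipeline_uid='orders.monthly.aggregate',    # hypothetical pipeline UID
    pipeline_name='ORDERS MONTHLY AGGREGATE',   # hypothetical display name
    create_pipeline=True,
    dag=dag,
)

# Make it the root task, for example: torch_initializer_task >> first_business_task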

ADOC can now track the entire DAG as a single unit after performing these two steps.

Tracking Each Task

To allow ADOC to provide a fine-grained view of the DAG, users must add more instrumentation to the DAG code, as described in the sections below.

Tracking Each Task Using Jobs

Each task should be decorated with the job decorator, and inputs, outputs, and metadata should be passed as arguments to make it visible as a job in the ADOC pipeline. The task's inputs should be described in the inputs list, and the task's output asset should be described in the outputs list. The job_uid parameter can be used to specify a custom job_uid, which is otherwise generated by default. Furthermore, if users do not want the job to be bounded by a span, the bounded_by_span argument can be set to False.

Obtaining the Asset's UID for Use in the Input and Output List

To obtain the UID of an asset, the user must first open the asset in the ADOC UI. A path to the asset is shown under the asset name. The first part of the path is the data source name, and the remaining segments, joined with periods, form the asset name. The data source name in this example is ATHENA-DS, and the asset name is AwsDataCatalog.sampledb.elb_logs.request_processing_time.

This asset can be used as an input with the following syntax: inputs=[Node(asset_uid='ATHENA-DS.AwsDataCatalog.sampledb.elb_logs.request_processing_time')]

Subdividing a Task into Multiple Spans

Users can represent a single task that has multiple steps as multiple child spans with create_child_span, and send events for those child spans. To create a child span, users must first obtain the parent span context (by default, the root span) from the task context. Calling create_child_span on the parent span context creates a span that appears as a child span in the ADOC pipelines view.

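Below is a minimal sketch, assuming the import paths acceldata_airflow_sdk.decorators.span and acceldata_sdk.events.generic_event (verify against your SDK version) and hypothetical span and event UIDs.

Python

from datetime import datetime

# Import paths assumed; verify against your SDK version.
from acceldata_airflow_sdk.decorators.span import span
from acceldata_sdk.events.generic_event import GenericEvent

@span(span_uid='customer.orders.datagen.span')   # hypothetical span UID
def data_gen(**context):
    # The parent span context (the root span by default) is injected into the task context.
    parent_span_context = context['span_context_parent']

    # Create a child span for one logical step of this task.
    child_span = parent_span_context.create_child_span(
        uid='customer.data.gen',                 # hypothetical child span UID
        context_data={'client_time': str(datetime.now())},
    )

    # ... do the work for this step ...

    # Send a custom event against the child span, then close it.
    child_span.send_event(GenericEvent(
        event_uid='customer.data.gen.result',    # hypothetical event UID
        context_data={'row_count': 100},
    ))
    child_span.end(context_data={'client_time': str(datetime.now())})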

Tracking Tasks Created with Airflow Operators

In some cases, users may prefer to use an Airflow operator such as PostgresOperator, or the ExecutePolicyOperator provided by the Airflow SDK, instead of PythonOperator. A JobOperator has been provided to wrap such tasks, create the corresponding job, and bind it with a span.

If JobOperator is being used to wrap another operator, such as ExecutePolicyOperator in this case, the dag argument should not be specified on the wrapped operator.

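Below is a minimal sketch of wrapping ExecutePolicyOperator in a JobOperator, assuming the import paths shown (verify against your SDK version) and hypothetical job and span UIDs; the policy id 46 follows the example used later in this guide.

Python

# Import paths assumed; verify against your SDK version.
from acceldata_airflow_sdk.operators.job_operator import JobOperator
from acceldata_airflow_sdk.operators.execute_policy_operator import ExecutePolicyOperator
from acceldata_sdk.constants import PolicyType, FailureStrategy
from acceldata_sdk.models.job import Node

# Wrapped operator: note that it does NOT receive the dag argument.
execute_policy = ExecutePolicyOperator(
    task_id='execute_dq_policy',
    policy_type=PolicyType.DATA_QUALITY,
    policy_id=46,
    sync=True,
    failure_strategy=FailureStrategy.DoNotFail,
)

# JobOperator creates the job, binds it with a span, and runs the wrapped operator.
# 'dag' is the SDK DAG created earlier.
dq_job_task = JobOperator(
    task_id='dq_policy_job',
    job_uid='orders.dq.policy.job',              # hypothetical job UID
    inputs=[Node(asset_uid='ATHENA-DS.AwsDataCatalog.sampledb.elb_logs.request_processing_time')],
    span_uid='orders.dq.policy.span',            # hypothetical span UID
    operator=execute_policy,
    dag=dag,
)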

Linking a Task with Another Task

In previous examples, each pipeline job takes an asset as input and produces another asset as output, which the next job will use as input. Acceldata-airflow-sdk uses these to connect jobs in the ADOC pipeline UI. However, there may be times when a task does not produce another asset as an output. ExecutePolicyOperator is used to execute a policy and generate a result. In such cases, users can provide a job_uid as output instead of an asset to link the next job.

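Building on the previous sketch, the following shows how the wrapped policy task can declare the UID of the next job as its output; the job and asset UIDs (other than the ATHENA-DS asset shown earlier) are hypothetical.

Python

# Import paths assumed; verify against your SDK version.
from acceldata_airflow_sdk.decorators.job import job
from acceldata_airflow_sdk.operators.job_operator import JobOperator
from acceldata_sdk.models.job import Node

# The policy task produces no output asset, so its output is the UID of the
# next job; ADOC uses this to draw the job-to-job link in the pipeline UI.
dq_job_task = JobOperator(
    task_id='dq_policy_job',
    job_uid='orders.dq.policy.job',                 # hypothetical job UID
    outputs=[Node(job_uid='orders.publish.job')],   # link to the next job by its UID
    operator=execute_policy,                        # wrapped operator from the previous sketch
    dag=dag,
)

# The next task declares the same job UID, so the link resolves in ADOC.
@job(job_uid='orders.publish.job',
     inputs=[Node(asset_uid='ATHENA-DS.AwsDataCatalog.sampledb.elb_logs.request_processing_time')])
def publish(**context):
    pass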

In this getting-started guide, we looked at how to use Acceldata Airflow SDK decorators and operators to make Airflow DAG visible in ADOC pipelines' UI. In addition, we investigated the use of operators to implement Data Quality policies and provide job-to-job linking.

Using Operators and Decorators

Decorators in Python

A decorator is a Python design pattern that allows a user to add new functionality to an existing object without modifying its structure. A decorator accepts a function, adds some functionality, and returns it. The SDK has provided Job and Span decorators to simplify instrumenting an Airflow DAG using this design pattern.

Operators in Airflow

An operator is a template for a predefined Task that you can define declaratively within your DAG. Airflow operator tasks cannot be used with the decorator design pattern. As a result, the SDK includes JobOperator and SpanOperator to simplify instrumentation of tasks created with operators other than PythonOperator.

Create Job and Span Using Job Decorator

To create a job and span in the pipeline, the user must decorate the python function with a job decorator, as shown below.

A Node object should have either asset_uid ({data source}.{asset path from its root}) or job_uid (UID of the next job) as a parameter.

The parameters for a job decorator include:

  • span_uid: A String parameter to specify the UID of the span to be created. The default value is None. If span_uid is not provided, a span corresponding to the job is created with the value of job_uid.
  • job_uid: A String parameter to specify the job UID of the pipeline. The default value is None. If job_uid is not provided, the UID is constructed from the dag_id, task_id, and function name.
  • inputs: An Array parameter of Node-type objects used by the job as input. The default value is an empty array.
  • outputs: An Array parameter of Node-type objects returned by the job as output. The default value is an empty array.
  • metadata: A parameter of type JobMetadata specifying the metadata of the job. The default value is None.
  • xcom_to_event_mapper_ids: A list parameter containing the xcom keys used to send xcom variables in span events. The default value is an empty list.
  • bounded_by_span: A Boolean parameter deciding whether to create a span along with the job. The default value is True. If it is set to True to create a span, make sure the function has a **context parameter in its argument list. This gives access to the context of the task; using the context, various span events can be sent inside the function. Use span_context = context['span_context_parent'] to get the span context.
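Below is a minimal sketch of the job decorator, assuming the import paths shown (verify against your SDK version) and hypothetical job, span, and asset UIDs other than the ATHENA-DS asset used earlier in this guide.

Python

# Import paths assumed; verify against your SDK version.
from acceldata_airflow_sdk.decorators.job import job
from acceldata_sdk.models.job import JobMetadata, Node

@job(
    job_uid='orders.monthly.aggregate.job',      # hypothetical job UID
    inputs=[Node(asset_uid='ATHENA-DS.AwsDataCatalog.sampledb.elb_logs.request_processing_time')],
    outputs=[Node(asset_uid='POSTGRES-DS.reporting.public.monthly_orders')],  # hypothetical asset UID
    metadata=JobMetadata('data-eng', 'analytics', '...'),  # (owner, team, code location); argument names may vary by SDK version
    span_uid='orders.monthly.aggregate.span',    # hypothetical span UID
    bounded_by_span=True,
)
def monthly_order_aggregate(**context):
    # Because bounded_by_span=True, the parent span context is available here.
    span_context = context['span_context_parent']
    # ... task logic ...
    pass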

Create Span Using Decorator

A span for a Python function can be created by decorating the function with a span decorator that takes span_uid as a parameter. Add the **context parameter to the function's argument list to decorate it with span. This provides access to the task's context, and various span events can be sent inside the function using the context. The parent span context can be extracted from the context dict using the key span_context_parent, for example datagen_span_context = context['span_context_parent']. It returns a span context instance that can be used to create child spans and send custom events, as shown in the example below.

The parameters for a span decorator include:

  • span_uid: A String parameter to specify the UID of the span to be created. It is a mandatory parameter.
  • xcom_to_event_mapper_ids: A list parameter containing the xcom keys used to send xcom variables in span events. The default value is an empty list.
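Below is a minimal sketch of the span decorator, assuming the import paths shown (verify against your SDK version) and hypothetical span, event, and xcom key names.

Python

from datetime import datetime

# Import paths assumed; verify against your SDK version.
from acceldata_airflow_sdk.decorators.span import span
from acceldata_sdk.events.generic_event import GenericEvent

@span(span_uid='customer.orders.datagen.span',   # hypothetical span UID
      xcom_to_event_mapper_ids=['run_id'])       # hypothetical xcom key
def data_gen(**context):
    # Extract the parent span context injected by the decorator.
    datagen_span_context = context['span_context_parent']

    # Send a custom event against this span.
    datagen_span_context.send_event(GenericEvent(
        event_uid='customer.orders.datagen.progress',   # hypothetical event UID
        context_data={'client_time': str(datetime.now()), 'rows_generated': 1000},
    ))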

JobOperator OPERATOR

In some cases, users may prefer to use an Airflow operator such as PostgresOperator, or the ExecutePolicyOperator provided by the Airflow SDK, instead of PythonOperator. A JobOperator has been provided to wrap such tasks, create the corresponding job, and bind it with a span in order to instrument them.

JobOperator will execute any standard operator passed as the operator parameter, create a job, and send span start and end events. Simply wrap the standard operator in a JobOperator and ensure that the wrapped operator is not added to the DAG. If the operator is wrapped in a JobOperator, the JobOperator handles the operator's task within its own execution.

Node objects should have either asset_uid ({data source}.{asset path from its root}) or job_uid (UID of the next job) as a parameter.

The parameters for a JobOperator include:

  • span_uid: A String parameter to specify the UID of the span to be created. The default value is None. If span_uid is not provided, a span corresponding to the job is created with the value of job_uid.
  • job_uid: A String parameter to specify the job UID of the pipeline. The default value is None. If job_uid is not provided, the UID is constructed from the dag_id, task_id, and function name.
  • inputs: An Array parameter of Node-type objects used by the job as input. The default value is an empty array.
  • outputs: An Array parameter of Node-type objects returned by the job as output. The default value is an empty array.
  • metadata: A parameter of type JobMetadata specifying the metadata of the job. The default value is None.
  • xcom_to_event_mapper_ids: A list parameter containing the xcom keys used to send xcom variables in span events. The default value is an empty list.
  • bounded_by_span: A Boolean parameter deciding whether to create a span along with the job. The default value is True. If it is set to True to create a span, make sure the task has access to the context; using the context, various span events can be sent. Use span_context = context['span_context_parent'] to get the span context.
  • operator: A parameter specifying the standard Airflow operator to wrap. It is a mandatory parameter.

Other parameters are the same as those of the standard Airflow base operator. Make sure that Node objects have either asset_uid ({data source}.{asset path from its root}) or job_uid (UID of the next job) as a parameter.

Do not specify the dag parameter in the standard Airflow operator being passed as an argument to JobOperator, as the execution of the operator's task is handled by JobOperator.

If JobOperator is being used to wrap another operator (ExecutePolicyOperator in this case), the ExecutePolicyOperator task should not specify the dag argument.

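Below is a minimal sketch of JobOperator wrapping a standard PostgresOperator, assuming the import paths shown (verify against your SDK version) and hypothetical UIDs, table names, and connection ids.

Python

# Import paths assumed; verify against your SDK version.
from airflow.providers.postgres.operators.postgres import PostgresOperator
from acceldata_airflow_sdk.operators.job_operator import JobOperator
from acceldata_sdk.models.job import JobMetadata, Node

# Standard operator to be wrapped: note that no dag argument is passed here.
insert_aggregates = PostgresOperator(
    task_id='insert_aggregates',
    postgres_conn_id='reporting_db',              # hypothetical Airflow connection id
    sql='INSERT INTO monthly_orders SELECT ...',  # hypothetical SQL
)

# 'dag' is the SDK DAG created earlier.
job_operator_task = JobOperator(
    task_id='insert_aggregates_job',
    job_uid='orders.insert.aggregates.job',       # hypothetical job UID
    inputs=[Node(asset_uid='ATHENA-DS.AwsDataCatalog.sampledb.elb_logs.request_processing_time')],
    outputs=[Node(asset_uid='POSTGRES-DS.reporting.public.monthly_orders')],  # hypothetical asset UID
    metadata=JobMetadata('data-eng', 'analytics', '...'),
    span_uid='orders.insert.aggregates.span',     # hypothetical span UID
    operator=insert_aggregates,                   # wrapped operator
    dag=dag,
)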

SpanOperator OPERATOR

SpanOperator will execute any standard operator passed as the operator parameter, create a span, and send span start and end events. Simply wrap the standard operator with a SpanOperator and ensure that the wrapped operator is not added to the DAG. If the operator is wrapped by a SpanOperator, the SpanOperator handles the operator's task within its own execution.

The parameters for SpanOperator include:

  • span_uid: A String parameter to specify the UID of the span to be created. The default value is None. If span_uid is not provided, a span corresponding to the job is created with the value of job_uid.
  • xcom_to_event_mapper_ids: A list parameter containing the xcom keys used to send xcom variables in span events. The default value is an empty list.
  • operator: A parameter specifying the standard Airflow operator to wrap. It is a mandatory parameter.

Other parameters are the same as those of the standard Airflow base operator.

Do not specify the dag parameter in the standard Airflow operator being passed as an argument to SpanOperator, as the execution of the operator's task is handled by SpanOperator.

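Below is a minimal sketch of SpanOperator wrapping a standard operator, assuming the import path shown (verify against your SDK version) and hypothetical identifiers.

Python

# Import path assumed; verify against your SDK version.
from airflow.operators.bash import BashOperator
from acceldata_airflow_sdk.operators.span_operator import SpanOperator

# Standard operator to be wrapped: note that no dag argument is passed here.
export_task = BashOperator(
    task_id='export_report',
    bash_command='echo "exporting report"',      # hypothetical command
)

# 'dag' is the SDK DAG created earlier.
span_operator_task = SpanOperator(
    task_id='export_report_span',
    span_uid='orders.export.report.span',        # hypothetical span UID
    operator=export_task,                        # wrapped operator
    dag=dag,
)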

Creating Airflow Connection

If you want to avoid using environment variables, you can create a connection in the Airflow UI as described below, and provide the connection id of that connection in the TorchInitializer. Set the following for the connection:

  • Conn id: Create a unique ID for the connection
  • Conn Type: HTTP
  • Host: URL of the Torch catalog (the ADOC server)
  • Login: API access key generated from the Torch UI
  • Password: API secret key generated from the Torch UI
  • Extra: {"ENABLE_VERSION_CHECK": true}. This value enables or disables the version compatibility check between Torch and the SDK. The default value is 'True'. To disable the version check, set it to 'False'.

TorchInitializer Operator

At the root of the DAG, you must add a task using the TorchInitializer operator. This operator creates a new pipeline if one does not already exist, and also creates a new pipeline run and root span for that run of the Airflow DAG.

The parameters for TorchInitializer Operator include:

  • create_pipeline: A Boolean parameter that determines whether to create a pipeline (if one does not already exist) and a pipeline run. The default value is 'True'. Setting it to 'False' can be useful if a pipeline or pipeline run was created outside of the Airflow DAG.
  • span_name: A String parameter specifying the name of the root span. The default value is 'None'. If no name is provided, pipeline_uid.span is used as the span name.
  • meta: A parameter that specifies the pipeline's metadata (PipelineMetadata). The default value is 'None'. If it is not provided, PipelineMetadata(owner='sdk/pipeline-user', team='TORCH', codeLocation='...') is set as meta.
  • pipeline_uid: A String parameter specifying the pipeline's UID. It is a required parameter.
  • pipeline_name: A String parameter specifying the name of the pipeline. The default value is 'None'. If it is not provided, pipeline_uid is used as the name.
  • continuation_id: A String parameter that uniquely identifies a pipeline run. This parameter can accept jinja templates as well. The default value is 'None'. Use this parameter when a pipeline run should span multiple DAGs: provide a continuation id when creating the pipeline in the first DAG with create_pipeline=True, and then provide the same continuation id when continuing the same pipeline run in the second DAG with create_pipeline=False.
  • connection_id: A String parameter that uniquely identifies an Airflow connection storing Torch credentials. The default value is 'None'. Use this parameter to read Torch credentials from an Airflow connection instead of environment variables. Refer to the Creating Airflow Connection section above for more information.
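Below is a minimal sketch of TorchInitializer reading credentials from an Airflow connection, assuming the import path shown (verify against your SDK version), the connection id 'adoc_conn' from the example above, and a hypothetical pipeline UID.

Python

# Import path assumed; verify against your SDK version.
from acceldata_airflow_sdk.operators.torch_initialiser_operator import TorchInitializer

# 'dag' is the SDK DAG created earlier.
torch_initializer_task = TorchInitializer(
    task_id='torch_pipeline_initializer',
    pipeline_uid='orders.monthly.aggregate',     # hypothetical pipeline UID
    create_pipeline=True,
    span_name='orders.monthly.aggregate.span',   # optional root span name
    connection_id='adoc_conn',                   # read ADOC credentials from this Airflow connection
    dag=dag,
)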

Here is an example demonstrating usage of all the decorators and operators provided in airflow-sdk:

Decorators and Operators

Using Execute Policy Operator

ExecutePolicy OPERATOR

ExecutePolicyOperator is used to execute a policy by passing policy_type and policy_id. Only data quality and reconciliation policies are supported for ad-hoc execution using this operator.

The parameters for ExecutePolicyOperator include:

  • sync: A Boolean parameter used to decide whether the policy should be executed synchronously or asynchronously. It is a mandatory parameter. If it is set to True, the operator returns only after the execution ends. If it is set to False, it returns immediately after starting the execution.
  • policy_type: A PolicyType parameter used to specify the policy type. It is a mandatory parameter. It is an enum which takes values from constants such as PolicyType.DATA_QUALITY or PolicyType.RECONCILIATION.
  • policy_id: A String parameter used to specify the policy id to be executed. It is a mandatory parameter.
  • incremental: A Boolean parameter used to specify whether the policy execution should be incremental or full. The default value is False.
  • failure_strategy: An enum parameter used to decide the behaviour in case of failure. The default value is DoNotFail.
    • failure_strategy takes an enum of type FailureStrategy which can have three values: DoNotFail, FailOnError, and FailOnWarning.
    • DoNotFail will never throw. In case of errors in policy execution, it logs the error.
    • FailOnError throws an exception only on errors. In case of a warning, it returns without any errors.
    • FailOnWarning throws exceptions on warnings as well as errors.
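Below is a minimal sketch of ExecutePolicyOperator, assuming the import paths shown (verify against your SDK version); the policy id 46 matches the example discussed below, and the task id is hypothetical.

Python

# Import paths assumed; verify against your SDK version.
from acceldata_airflow_sdk.operators.execute_policy_operator import ExecutePolicyOperator
from acceldata_sdk.constants import PolicyType, FailureStrategy

# 'dag' is the SDK DAG created earlier.
execute_dq_policy = ExecutePolicyOperator(
    task_id='execute_dq_policy',
    policy_type=PolicyType.DATA_QUALITY,
    policy_id=46,
    sync=True,
    incremental=False,
    failure_strategy=FailureStrategy.DoNotFail,
    dag=dag,
)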

ExecutePolicyOperator stores the execution id of the executed policy in xcom using the key {policy_type.name}_{policy_id}_execution_id. Replace policy_type and policy_id based on the policy.

Hence, to query the result in another task, you need to pull the execution id from xcom using the same key {policy_type.name}_{policy_id}_execution_id.

Query the Result Using get_policy_execution_result

get_policy_execution_result is a helper function that can query the result of a policy executed with the operator, using the execution id pulled from xcom. In this example, the policy_type is PolicyType.DATA_QUALITY.name and the policy_id is 46.

The parameters for get_policy_execution_result include:

  • policy_type: A PolicyType parameter used to specify the policy type. It is a mandatory parameter. It is an enum which takes values from constants such as PolicyType.DATA_QUALITY or PolicyType.RECONCILIATION.
  • execution_id: A String parameter specifying the execution id for which users want to query the results. It is a mandatory parameter.
  • failure_strategy: An enum parameter used to decide the behaviour in case of failure. The default value is DoNotFail.
    • failure_strategy takes an enum of type FailureStrategy which can have three values: DoNotFail, FailOnError, and FailOnWarning.
    • DoNotFail will never throw. In case of errors in policy execution, it logs the error.
    • FailOnError throws an exception only on errors. In case of a warning, it returns without any errors.
    • FailOnWarning throws exceptions on warnings as well as errors.
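Below is a minimal sketch of a task that pulls the execution id from xcom and queries the result; the helper's import path is an assumption and may vary by SDK version, and the task function name is hypothetical.

Python

# Import paths assumed; verify against your SDK version.
from acceldata_sdk.constants import PolicyType, FailureStrategy
from acceldata_airflow_sdk.utils import get_policy_execution_result  # path is an assumption

def check_dq_result(**context):
    # Pull the execution id pushed by ExecutePolicyOperator:
    # the key format is {policy_type.name}_{policy_id}_execution_id.
    execution_id = context['ti'].xcom_pull(
        key=f"{PolicyType.DATA_QUALITY.name}_46_execution_id")

    result = get_policy_execution_result(
        policy_type=PolicyType.DATA_QUALITY,
        execution_id=execution_id,
        failure_strategy=FailureStrategy.DoNotFail,
    )
    print(result)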

Circuit Breaker Pattern Based on Policy Execution Result

Users can interrupt DAG execution based on the result of the policy execution. For example, if the policy execution encounters errors, the user may wish to stop the DAG execution. In that case, failure_strategy=FailureStrategy.FailOnError can be set. If policy execution fails, DAG execution is halted by throwing an exception.

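Below is a minimal sketch of the circuit breaker pattern, under the same assumptions about import paths and xcom keys as the previous example; only the failure strategy changes.

Python

# Import paths assumed; verify against your SDK version.
from acceldata_sdk.constants import PolicyType, FailureStrategy
from acceldata_airflow_sdk.utils import get_policy_execution_result  # path is an assumption

def enforce_dq_result(**context):
    # Same xcom key as before: {policy_type.name}_{policy_id}_execution_id.
    execution_id = context['ti'].xcom_pull(
        key=f"{PolicyType.DATA_QUALITY.name}_46_execution_id")

    # FailOnError raises an exception if the policy execution errored,
    # which fails this task and halts the DAG run.
    get_policy_execution_result(
        policy_type=PolicyType.DATA_QUALITY,
        execution_id=execution_id,
        failure_strategy=FailureStrategy.FailOnError,
    )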

Query the Status Using get_policy_status

get_policy_status is a helper function that can query the current status of the policy executed using the operator.

The parameters for get_policy_status include:

  • policy_type: A PolicyType parameter used to specify the policy type. It is a mandatory parameter. It is an enum which takes values from constants such as PolicyType.DATA_QUALITY or PolicyType.RECONCILIATION.
  • execution_id: A String parameter specifying the execution id for which users want to query the status. It is a mandatory parameter.

You need to pull the execution id from xcom using the same key {policy_type.name}_{policy_id}_execution_id, which was pushed by ExecutePolicyOperator.
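Below is a minimal sketch of querying the status, under the same assumptions about import paths and the xcom key as the earlier examples.

Python

# Import paths assumed; verify against your SDK version.
from acceldata_sdk.constants import PolicyType
from acceldata_airflow_sdk.utils import get_policy_status  # path is an assumption

def report_dq_status(**context):
    # Same xcom key as before: {policy_type.name}_{policy_id}_execution_id.
    execution_id = context['ti'].xcom_pull(
        key=f"{PolicyType.DATA_QUALITY.name}_46_execution_id")

    status = get_policy_status(
        policy_type=PolicyType.DATA_QUALITY,
        execution_id=execution_id,
    )
    print(f"Policy execution status: {status}")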

Context Switching in Heterogeneous Pipelines

In some ETL pipelines, it is possible to have a DAG dependent on another. In such cases, you may want to represent and track both DAGs as part of a single pipeline in ADOC.

Summary

This guide demonstrates context switching between two ETL pipelines.

In this example scenario, the ADOC pipeline run begins in dag1 and ends in dag2.

DAG1

In dag1, override_success_callback=True is set in the DAG, ensuring that the new pipeline_run is not closed at the end of dag1. In addition, a continuation_id is passed in the TorchInitializer task, which will be used to link this run of dag1 to a specific run of dag2.

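Below is a minimal sketch of dag1, assuming the import paths shown (verify against your SDK version) and hypothetical DAG, pipeline, and continuation identifiers.

Python

from datetime import datetime

# Import paths assumed; verify against your SDK version.
from acceldata_airflow_sdk.dag import DAG
from acceldata_airflow_sdk.operators.torch_initialiser_operator import TorchInitializer

# override_success_callback=True keeps the pipeline run open when dag1 finishes.
dag1 = DAG(
    dag_id='etl_dag1',                           # hypothetical DAG id
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    override_success_callback=True,
)

torch_initializer_task = TorchInitializer(
    task_id='torch_pipeline_initializer',
    pipeline_uid='etl.cross.dag.pipeline',       # hypothetical pipeline UID
    create_pipeline=True,
    continuation_id='etl_cross_dag_{{ ds }}',    # hypothetical continuation id (jinja templated)
    dag=dag1,
)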

DAG2

In dag2, create_pipeline=False is set in the TorchInitializer task, ensuring that when TorchInitializer runs in dag2, a new pipeline run is not started. The dag2 TorchInitializer task is passed the same continuation_id as dag1, which is used to link this run of dag2 to the run of dag1.

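Below is a minimal sketch of dag2, under the same assumptions as the dag1 sketch above.

Python

from datetime import datetime

# Import paths assumed; verify against your SDK version.
from acceldata_airflow_sdk.dag import DAG
from acceldata_airflow_sdk.operators.torch_initialiser_operator import TorchInitializer

dag2 = DAG(
    dag_id='etl_dag2',                           # hypothetical DAG id
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
)

# create_pipeline=False continues the pipeline run opened by dag1;
# the same continuation_id links this run of dag2 to that run.
torch_initializer_task = TorchInitializer(
    task_id='torch_pipeline_initializer',
    pipeline_uid='etl.cross.dag.pipeline',       # same hypothetical pipeline UID as dag1
    create_pipeline=False,
    continuation_id='etl_cross_dag_{{ ds }}',    # same hypothetical continuation id as dag1
    dag=dag2,
)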

Here is an example demonstrating a context switch happening between two Airflow DAGs. This is a working code example that you can test on your own.

Context Switching

Observing Airflow DAGs with ADOC Listener Plugin

The ADOC Listener plugin integrates Airflow DAGs for automatic observation in ADOC.

The plugin performs the following actions without requiring any additional code in your Airflow DAG, unless you disable instrumentation through environment variables.

  1. When the DAG starts:

    1. It creates the pipeline if it does not already exist in ADOC.
    2. It creates a new pipeline run in ADOC.
  2. When a TaskInstance starts:

    1. It creates jobs in ADOC for each of the airflow operators used in the task.
    2. It constructs job input nodes based on the upstream tasks.
    3. It creates a span and associates it with the jobs.
    4. It emits span events with metadata.
  3. When a TaskInstance is completed:

    1. It emits span events with metadata.
    2. It ends the spans with either success or failure.
  4. When the DAG is completed:

    1. It updates the pipeline run with success or failure in ADOC.

Prerequisites

Ensure that the required applications are installed on your system.

API keys are essential for authentication when making calls to ADOC.

Configuration

Plugin Environment Variables

The adoc_listener_plugin utilizes the acceldata-sdk to push data to the ADOC backend.

Mandatory Environment Variables

The ADOC client relies on the following environment variables:

  • TORCH_CATALOG_URL: The URL of the ADOC server.
  • TORCH_ACCESS_KEY: The API access key generated from the ADOC UI.
  • TORCH_SECRET_KEY: The API secret key generated from the ADOC UI.

Optional Environment Variables

By default, all DAGs are observed. However, the following set of environment variables can be used to modify this behavior.

The environment variables for ignoring or observing DAGs are mutually exclusive.

If the following environment variables match the DAG ids, the matched DAG ids are not observed, while all other DAG ids are still observed:

  • DAGIDS_TO_IGNORE: Comma-separated DAG ids to ignore observation.
  • DAGIDS_REGEX_TO_IGNORE: Regular expression for DAG ids to ignore observation.

If the following environment variables match the DAG ids, only those specific DAG ids will be observed, while all other DAG ids will be ignored:

  • DAGIDS_TO_OBSERVE: Comma-separated DAG ids to observe.
  • DAGIDS_REGEX_TO_OBSERVE: Regular expression for DAG ids to observe.

Deployment

  • To deploy the plugin on an on-premise instance of Apache Airflow, refer (Link Removed).
  • To deploy the plugin on Amazon MWAA, refer (Link Removed).
  • To deploy the plugin on Google Cloud Composer, refer (Link Removed).

Enhance Data Reliability with Automated Data Reliability

Consider automating your data reliability checks; this complements the plugin integration by enhancing data reliability and observability without requiring modifications to the pipeline code.

Ways to Observe your DAGs in ADOC

You can observe your pipelines created with Airflow DAGs in ADOC using the following three methods:

  1. Full Code: In the full code approach, the Airflow Listener Integration is not used to observe your DAGs. If you are using the Airflow Listener Integration, you must disable automated observation for these DAGs, as explained in the Configuration section.
    1. Switching to asset lineage would involve excluding this DAG from the integration and adding complete observability for it.
    2. If you intend to connect multiple DAGs using the continuation_id logic, make sure to disable DAG observation in the configuration when using the listener plugin integration. This is crucial because the integration concludes the pipeline run when the DAG run terminates.

Examples demonstrating the full code approach can be found here.

  2. Light Code: In the Light Code approach, you utilize the Airflow Listener Integration while retaining the flexibility to enhance the code beyond what the plugin offers. This can involve:

    1. Sending additional span events for the task. An example demonstrating this use case can be found here.
    2. Invoking policies for the DAG. For examples, refer to the following:
      1. Example with Torch Client
      2. Example with ExecutePolicyOperator
  3. No Code: In the No Code approach, you utilize the Airflow Listener Integration exclusively. An example demonstrating how to observe your DAG, along with its corresponding pipeline, spans, and events, without requiring any additional code can be found here.

Deploying on On-Prem Apache Airflow

To deploy the plugin on an on-premises instance of Apache Airflow, perform the following:

1. Setup Environment Variables

Configure the required and optional environment variables for the plugin. For a detailed list of these variables and their descriptions, refer to the Configuration section.

2. Verify Airflow Version

Verify that the Airflow environment where you plan to deploy the plugin is running version 2.5.0 or later. You can check the version by running the airflow version command.

Confirm that the output version is 2.5.0 or higher (for example, 2.5.3).

3. Install Required Packages

Install the following package on all Airflow components: adoc_airflow_plugin.

Use the appropriate package manager or installation method for your environment to install this dependency.

4. Restart Airflow Components

Restart all Airflow components to apply the plugin changes.

5. Validate the Plugin Installation

Navigate to Admin > Plugins in your Airflow UI. Confirm that the AcceldataListenerPlugin is listed correctly, indicating successful installation.

6. Verify Instrumentation

Once the plugin is installed and the environment is configured, execute a DAG in your Airflow instance. After the DAG run is complete, go to ADOC UI > Pipelines to verify successful instrumentation. Find the pipeline matching your DAG's name; it should appear with its associated spans and events.

Deploying on Amazon MWAA

To deploy the plugin on Managed Workflows for Apache Airflow (MWAA), perform the following:

1. Setup Environment Variables

Configure the necessary environment variables within your MWAA environment. For a detailed list of required variables and their values, refer to the Configuration section.

2. Create Plugins.zip File

Package the environment variables into a plugins.zip file. For instructions on creating a custom plugin that generates runtime environment variables, refer to Creating a custom plugin that generates runtime environment variables - Amazon Managed Workflows for Apache Airflow.

A sample env_var_plugin.py file sets these environment variables at runtime.
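Below is a minimal sketch of such a file, assuming placeholder values for the URL and keys and following the AirflowPlugin pattern from the AWS guide linked above.

Python

import os

from airflow.plugins_manager import AirflowPlugin

# Placeholder values: replace with your ADOC URL and API keys,
# or read them from AWS Secrets Manager instead of hardcoding.
os.environ["TORCH_CATALOG_URL"] = "https://your-adoc-instance.example.com"
os.environ["TORCH_ACCESS_KEY"] = "your-access-key"
os.environ["TORCH_SECRET_KEY"] = "your-secret-key"

class EnvVarPlugin(AirflowPlugin):
    # Registering the plugin is enough; importing this module sets the variables.
    name = "env_var_plugin"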

You can either hardcode the environment variables directly in the file or use AWS Secrets Manager to securely store them and configure the plugins.zip file to read from Secrets Manager.

3. Install Required Packages

  • Check for requirements.txt: Inspect your DAG code folder in the Amazon S3 bucket to see if a requirements.txt file is already present.

    • If requirements.txt does not exist:

      • Upload a new requirements.txt file that includes the latest version of the adoc_airflow_plugin package.
    • If requirements.txt exists:

      • Verify that it includes the adoc_airflow_plugin package. If it is not listed, update the file to include its latest version.

4. Update Airflow Environment Packages

Point your MWAA environment to the updated versions of the plugins.zip and requirements.txt files. Apply these changes in the MWAA console to ensure the updated configurations are loaded correctly.

5. Validate the Plugin Installation

Navigate to Admin > Plugins in your Airflow UI. Ensure that both AcceldataListenerPlugin and env_var_plugin are listed, confirming successful installation.

6. Verify Instrumentation

After installing the plugin and configuring the environment, trigger a DAG in your Airflow instance. Once the DAG run is complete, navigate to ADOC UI > Pipelines to confirm successful instrumentation. Locate the pipeline corresponding to your DAG's name; it should be displayed with its associated spans and events.

Deploying on Google Cloud Composer

To deploy the plugin on Google Cloud Composer, perform the following:

1. Setup Environment Variables

Configure the required and optional environment variables for the plugin. For a detailed list of these variables and their descriptions, refer to the Configuration section. You may add airflow_monitoring to DAGIDS_TO_IGNORE to exclude health check DAGs from being monitored.

2. Install Required Packages

In your Composer environment, navigate to the PYPI PACKAGES tab and add the following package:

  • adoc_airflow_plugin

This will install the necessary dependencies for the plugin to function correctly.

3. Validate the Plugin Installation

Navigate to Admin > Plugins in your Airflow UI. Ensure that AcceldataListenerPlugin is listed, indicating successful installation.

4. Verify Instrumentation

After installing the plugin and configuring the environment, trigger a DAG in your Airflow instance. Once the DAG run is complete, navigate to ADOC UI > Pipelines to verify successful instrumentation. Find the pipeline that corresponds to your DAG's name; it should be displayed with its associated spans and events.
