Airflow

Introduction

Apache Airflow is an open-source workflow management platform for data engineering pipelines. Acceldata Airflow SDK provides APIs, decorators, and operators that allow for fine-grained end-to-end tracking and visibility of Airflow DAGs.

Features provided by acceldata-airflow-sdk:

  • DAG: A wrapper built on top of the Airflow DAG that monitors the beginning and end of pipeline execution
  • Pipeline: Represents an execution of a pipeline inside Airflow
  • Span: A logical collection of various tasks within Airflow
  • Job: A logical representation of a task within Airflow
  • Event: An event can hold arbitrary process or business data and is sent to the ADOC system for future tracking against a pipeline execution

Installation

Airflow installation details can be found here.

Users must ensure that the following Python modules are installed in the Airflow environment.

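The original code block is not shown here; a minimal sketch of installing the SDK with pip, using the package name given on this page:

```shell
# Install the Acceldata Airflow SDK into the Airflow environment
pip install acceldata-airflow-sdk
```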

Prerequisites

To authenticate calls to ADOC, API keys are required.

Users can generate API keys in Admin Central in the ADOC UI.

The API keys and the ADOC URL must be set up as environment variables in Airflow before using acceldata-airflow-sdk calls in an Airflow DAG.

Establishing a Connection Between Airflow and the ADOC Server

This guide outlines two methods for configuring Airflow to interact with the ADOC Server:

Method 1: Using Environment Variables

Define the following environment variables within your Airflow environment (for example, in your Docker container, .env file, or deployment configuration):

| Environment Variable | Description | Required | Default |
| --- | --- | --- | --- |
| TORCH_CATALOG_URL | URL of your ADOC (Torch) server instance. | Yes | — |
| TORCH_ACCESS_KEY | API access key generated from the ADOC UI. | Yes | — |
| TORCH_SECRET_KEY | API secret key generated from the ADOC UI. | Yes | — |
| ENABLE_VERSION_CHECK | Enables or disables SDK and ADOC version compatibility checks. | No | False |
| TORCH_CONNECTION_TIMEOUT_MS | Maximum time (in milliseconds) to wait while establishing a connection to the ADOC server. | No | 5000 ms |
| TORCH_READ_TIMEOUT_MS | Maximum time (in milliseconds) to wait for a response from the ADOC server after a successful connection. | No | 15000 ms |

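For example (the URL and key values below are placeholders; substitute your own):

```shell
# Required: ADOC endpoint and credentials (placeholder values)
export TORCH_CATALOG_URL="https://your-adoc-instance.example.com"
export TORCH_ACCESS_KEY="<your-access-key>"
export TORCH_SECRET_KEY="<your-secret-key>"

# Optional: version check and timeouts (defaults shown)
export ENABLE_VERSION_CHECK=False
export TORCH_CONNECTION_TIMEOUT_MS=5000
export TORCH_READ_TIMEOUT_MS=15000
```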

Info: These environment variables are automatically picked up by the Airflow SDK during DAG or task execution.

Method 2: Using Airflow Connection

You can also configure the connection directly in the Airflow UI by performing the following steps:

  1. Create an Airflow Connection:

    • Log in to your Airflow UI.
    • Navigate to the Admin > Connections section.
    • Click + Create to add a new connection.
  2. Configure Connection Details:

    • Enter a unique identifier for the connection (e.g., adoc_conn).
    • Set the Conn Type to HTTP.
    • Enter the URL of your ADOC (Torch) server in the Host field.
    • Enter the API access key from the ADOC UI in the Login field.
    • Enter the API secret key from the ADOC UI in the Password field.
    • (Optional) In the Extra field, supply additional settings as JSON.

You can toggle version checking or adjust timeouts as needed:

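A sketch of the Extra JSON; the exact keys the SDK reads from the connection are an assumption here, mirroring the environment variable names above:

```json
{
  "ENABLE_VERSION_CHECK": false,
  "TORCH_CONNECTION_TIMEOUT_MS": 5000,
  "TORCH_READ_TIMEOUT_MS": 15000
}
```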

Steps to Set up Airflow DAG

Before using the features provided by the acceldata-airflow-sdk, users must first set up Airflow DAG scripts with a few steps.

Minimum Instrumentation Required

Step 1. Use acceldata-airflow-sdk DAG Object

Acceldata-airflow-sdk tracks the end of the DAG run using a customized DAG class.

Users must import DAG from acceldata_airflow_sdk.dag and replace the Airflow DAG with the acceldata-airflow-sdk DAG in the DAG script. This enables the pipeline run to be terminated based on the success or failure of the DAG execution in Airflow; it is accomplished using Airflow's success and failure callbacks.

When using the acceldata-airflow-sdk DAG, you must include the pipeline uid, which is updated in ADOC to track the DAG being executed in Airflow.

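A minimal sketch of the replacement, assuming the import path acceldata_airflow_sdk.dag from the text above; the pipeline_uid parameter name and its value are assumptions:

```python
from datetime import datetime

# Drop-in replacement for airflow.DAG; import path as stated in this guide
from acceldata_airflow_sdk.dag import DAG

dag = DAG(
    dag_id="orders_etl_dag",
    schedule_interval=None,
    start_date=datetime(2024, 1, 1),
    catchup=False,
    # uid of the pipeline tracked in ADOC (parameter name assumed)
    pipeline_uid="orders.daily_etl",
)
```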

Step 2. Setup TorchInitializer Task

Acceldata-airflow-sdk uses the TorchInitializer task to track the beginning of the DAG run by creating a pipeline and pipeline run along with an all-encompassing root span.

Create a task for the TorchInitializer operator and add it as the DAG's root (first) task. Inside the TorchInitializer operator, Acceldata-airflow-sdk establishes a connection to ADOC using the TorchClient class, which employs the credentials specified in the environment variables above. Using the TorchClient connection, the TorchInitializer operator creates a new pipeline if one does not already exist, a new pipeline run for each DAG run, and a root span to bind the entire pipeline run. Users can instead create a pipeline in the Torch UI and provide the UID of that pipeline in TorchInitializer; in that case, a new pipeline will not be created.

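A sketch of the initializer task; the operator's import path and parameter names are assumptions, and the uids and names are placeholders:

```python
from acceldata_airflow_sdk.operators.torch_initialiser_operator import TorchInitializer

# Root task: creates the pipeline (if absent), a pipeline run,
# and the root span binding the entire run
torch_initializer = TorchInitializer(
    task_id="torch_pipeline_initializer",
    pipeline_uid="orders.daily_etl",    # placeholder pipeline uid
    pipeline_name="Orders Daily ETL",   # placeholder display name
    dag=dag,
)

# Make it the first task in the DAG (extract_task is a placeholder downstream task)
torch_initializer >> extract_task
```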

ADOC can now track the entire DAG as a single unit after performing these two steps.

Tracking Each Task

To allow ADOC to provide a fine-grained view of the DAG, users must add more instrumentation to the DAG code, as described in the sections below.

Tracking Each Task Using Jobs

Each task should be decorated with a job decorator, with inputs, outputs, and metadata passed as arguments, to make it visible as a job in the ADOC pipeline. The task's input assets should be described in the inputs list, and its output assets in the outputs list. The job_uid parameter specifies a custom job UID; one is generated by default. Furthermore, if users do not want the job to be bounded by a span, the bounded_by_span argument can be set to False.

Obtaining the Asset's UID for Use in the Input and Output List

To obtain the UID of an asset, the user must first open the asset in the ADOC UI. A path to the asset is shown under the asset name, as in the image above. The first part, highlighted in green, is the data source name; the remaining items, joined with a period as the field separator, form the asset name. The data source name in this example is ATHENA-DS, and the asset name is AwsDataCatalog.sampledb.elb_logs.request_processing_time.

This asset can be used as an input with the following syntax: inputs=[Node(asset_uid='ATHENA-DS.AwsDataCatalog.sampledb.elb_logs.request_processing_time')]
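Putting this together, a sketch of a decorated task; the import paths, the JobMetadata fields, and the output asset uid are assumptions:

```python
from acceldata_airflow_sdk.decorators.job import job
from acceldata_sdk.models.job import JobMetadata, Node

@job(
    job_uid="load_elb_logs",  # optional; auto-generated when omitted
    inputs=[Node(asset_uid="ATHENA-DS.AwsDataCatalog.sampledb.elb_logs.request_processing_time")],
    outputs=[Node(asset_uid="POSTGRES-DS.public.elb_logs_clean")],  # placeholder output asset
    metadata=JobMetadata(owner="data-eng", team="analytics", code_location="https://example.com/repo"),
    bounded_by_span=True,  # set False to leave the job unbounded by a span
)
def load_elb_logs(**context):
    # Task logic goes here
    ...
```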

Subdividing a Task into Multiple Spans

Users can represent a single task with multiple steps as multiple child spans using create_child_span, and send events for those child spans. To create a child span, users must first obtain the parent span context, which returns the root span. Users must then call create_child_span on the parent span context, and it will appear as a child span in the ADOC pipelines view.

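A sketch of child spans inside a span-bounded task; the span_context_parent context key, method names, and event class path are assumptions, and the uids are placeholders:

```python
from datetime import datetime
from acceldata_sdk.events.generic_event import GenericEvent

def validate_orders(**context):
    # Parent span context injected into the task context by the job decorator
    # (key name assumed)
    parent_span = context["span_context_parent"]

    # The child span appears under the parent in the ADOC pipelines view
    child_span = parent_span.create_child_span(
        uid="validate_orders.schema_check",
        context_data={"started_at": str(datetime.now())},
    )
    # Send an event against the child span for future tracking
    child_span.send_event(GenericEvent(
        event_uid="schema_check.progress",
        context_data={"rows_checked": 1000},
    ))
    child_span.end()
```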

Tracking Tasks Created with Airflow Operators

In some cases, users may prefer to use an Airflow operator such as PostgresOperator, or the ExecutePolicyOperator provided by the Airflow SDK, instead of PythonOperator. A JobOperator is provided to wrap such tasks, create the corresponding job, and bind it with a span.

If JobOperator is being used to wrap another operator, such as ExecutePolicyOperator in this case, the dag argument should not be specified on the wrapped operator.

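A sketch of wrapping ExecutePolicyOperator in a JobOperator; the import paths, parameter names, and uids are assumptions or placeholders. Note that dag is set only on the JobOperator, never on the wrapped operator:

```python
from acceldata_airflow_sdk.operators.job_operator import JobOperator
from acceldata_airflow_sdk.operators.execute_policy_operator import ExecutePolicyOperator
from acceldata_sdk.models.job import Node

# Wrapped operator: note the absence of a dag argument here
execute_dq_policy = ExecutePolicyOperator(
    task_id="execute_dq_policy",
    policy_id=123,   # placeholder policy id
    sync=True,       # wait for the policy run to complete
)

# JobOperator creates the job, binds it with a span, and runs the wrapped operator
dq_job = JobOperator(
    task_id="dq_job",
    job_uid="orders.dq_check",
    inputs=[Node(asset_uid="POSTGRES-DS.public.orders_clean")],  # placeholder asset
    operator=execute_dq_policy,
    dag=dag,
)
```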

Linking a Task with Another Task

In the previous examples, each pipeline job takes an asset as input and produces another asset as output, which the next job uses as input. Acceldata-airflow-sdk uses these to connect jobs in the ADOC pipeline UI. However, there may be times when a task does not produce an asset as output; for example, ExecutePolicyOperator executes a policy and produces a result rather than an asset. In such cases, users can provide a job_uid as the output instead of an asset to link the next job.

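A sketch of linking by job uid instead of an asset; Node taking a job_uid argument follows the text above, while the import paths and uid values are assumptions or placeholders:

```python
from acceldata_airflow_sdk.decorators.job import job
from acceldata_sdk.models.job import Node

@job(
    job_uid="run_dq_policy",
    inputs=[Node(asset_uid="ATHENA-DS.AwsDataCatalog.sampledb.elb_logs")],  # placeholder asset
    # This task produces no asset, so link to the next job by its uid instead
    outputs=[Node(job_uid="publish_report")],
)
def run_dq_policy(**context):
    ...
```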

In this getting-started guide, we looked at how to use Acceldata Airflow SDK decorators and operators to make an Airflow DAG visible in the ADOC pipelines UI. We also looked at using operators to implement Data Quality policies and provide job-to-job linking.
