Airflow
Airflow SDK
Apache Airflow is an open-source workflow management platform for data engineering pipelines. Acceldata Airflow SDK provides APIs, decorators, and operators that allow for fine-grained end-to-end tracking and visibility of Airflow DAGs.
Features provided by acceldata-airflow-sdk:
- DAG: A wrapper built on top of the Airflow DAG that monitors the beginning and end of pipeline execution
- Pipeline: Represents an execution of a pipeline inside Airflow
- Span: A logical collection of various tasks within Airflow
- Job: A logical representation of a task within Airflow
- Event: An event can hold process or business arbitrary data and is sent to the ADOC system for future tracking against a pipeline execution
Installation
Airflow installation details can be found here.
Users must ensure that the following Python modules are installed in the Airflow environment:

```bash
pip install acceldata-sdk
pip install acceldata-airflow-sdk
```

Prerequisites
To authenticate calls to ADOC, API keys are required.
Users can generate API keys from the Admin Central section of the ADOC UI.
The API keys and the ADOC URL should be set up as environment variables in Airflow before using acceldata-airflow-sdk calls in an Airflow DAG.
Establishing Connection between Airflow and the ADOC Server:
This guide outlines two methods for configuring Airflow to interact with the ADOC Server:
Method 1: Using Environment Variables
- Set Environment Variables: Define the following environment variables within your Airflow environment's Docker container (an optional sanity-check snippet follows this list):
  - TORCH_CATALOG_URL: URL of your ADOC Server instance.
  - TORCH_ACCESS_KEY: API access key generated from the ADOC UI.
  - TORCH_SECRET_KEY: API secret key generated from the ADOC UI.
  - ENABLE_VERSION_CHECK (Optional): Set to True to enable the version compatibility check between ADOC and the SDK. Set it to False to disable the check.
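As an optional sanity check (not something the SDK requires), you can confirm from inside the Airflow environment that the variables above are visible to Python. This short sketch only reads the documented variable names and flags any that are missing:

```python
import os

# Optional sanity check: confirm the ADOC variables documented above are set
# in the environment where the Airflow scheduler and workers run.
required_vars = ["TORCH_CATALOG_URL", "TORCH_ACCESS_KEY", "TORCH_SECRET_KEY"]
missing = [name for name in required_vars if not os.environ.get(name)]
if missing:
    raise RuntimeError(f"Missing ADOC environment variables: {', '.join(missing)}")
print("ADOC environment variables are set; version check enabled:",
      os.environ.get("ENABLE_VERSION_CHECK", "True"))
```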
Method 2: Using Airflow Connection
Create an Airflow Connection:
- Log in to your Airflow UI.
- Navigate to the "Admin" -> "Connections" section.
- Click on "Create" to create a new connection.
Configure Connection Details:
- Enter a unique identifier for the connection (e.g., "adoc_conn").
- Set the "Conn Type" to "HTTP".
- Enter the URL of your ADOC server in the "Host" field.
- Enter the API access key from the ADOC UI in the "Login" field.
- Enter the API secret key from the ADOC UI in the "Password" field.
- (Optional) In the "Extra" field, enter one of the following JSON values:
{"ENABLE_VERSION_CHECK": true} // To enable version check{"ENABLE_VERSION_CHECK": false} //To disable version checkSteps to Set up Airflow DAG
Before using the features provided by the acceldata-airflow-sdk, users must first set up Airflow DAG scripts with a few steps.
Minimum Instrumentation Required
Step 1. Use acceldata-airflow-sdk DAG Object
Acceldata-airflow-sdk tracks the end of the DAG run using a customized DAG class.
Users must import DAG from acceldata_airflow_sdk.dag and replace the Airflow DAG with the acceldata-airflow-sdk DAG in the DAG script. This enables the pipeline run to be terminated based on the success or failure of the DAG execution in Airflow, and is accomplished using Airflow's success and failure callbacks.
When using the acceldata-airflow-sdk DAG, you must include the pipeline_uid, which will be updated in ADOC to track the DAG being executed in Airflow.
```python
from datetime import datetime

from acceldata_airflow_sdk.dag import DAG

pipeline_uid = "torch.airflow.pipeline"
default_args = {'start_date': datetime(2022, 5, 31)}

dag = DAG(
    dag_id='TEST_CUSTOMERS_ETL',
    schedule_interval=None,
    default_args=default_args,
    start_date=datetime(2022, 6, 6)
)
```

Step 2. Setup TorchInitializer Task
Acceldata-airflow-sdk uses the TorchInitializer task to track the beginning of the DAG run by creating a pipeline and pipeline run along with an all-encompassing root span.
Create a task for the TorchInitializer operator and add it as the DAG's root (first) task. Inside the TorchInitializer operator, acceldata-airflow-sdk establishes a connection to ADOC using the TorchClient class, which uses the credentials specified in the environment variables above. Using this connection, the TorchInitializer operator creates a new pipeline if one does not already exist, a new pipeline run for each DAG run, and a root span to bind the entire pipeline run. Alternatively, users can create a pipeline in the Torch UI and provide the UID of that pipeline in TorchInitializer; in that case, a new pipeline will not be created.
```python
from acceldata_airflow_sdk.operators.torch_initialiser_operator import TorchInitializer
from acceldata_sdk.models.pipeline import PipelineMetadata

torch_initializer_task = TorchInitializer(
    task_id='torch_pipeline_initializer',
    pipeline_uid=pipeline_uid,
    pipeline_name=pipeline_name,
    connection_id=torch_connection_id,
    meta=PipelineMetadata(owner='Demo', team='demo_team', codeLocation='...'),
    dag=dag
)
```

ADOC can now track the entire DAG as a single unit after performing these two steps.
Tracking Each Task
To allow ADOC to provide a fine-grained view of the DAG, users must add more instrumentation to the DAG code, as described in the sections below.
Tracking Each Task Using Jobs
Each task should be decorated with a job decorator, and the input, output, and metadata should be passed as arguments to make it visible as a job in the ADOC pipeline. The task's input assets should be described in the inputs list, and the task's output assets in the outputs list. The job_uid parameter can be used to specify a custom job UID, which is otherwise generated automatically. Furthermore, if users do not want the job to be bounded by a span, the bounded_by_span argument can be set to False.
Obtaining the Asset's UID for Use in the Input and Output List

To obtain the UID of an asset, the user must first open the asset in the ADOC UI. A path to the asset is shown under the asset name, as shown in the image above. The first part, highlighted in green, is the data source name, and the remaining items, joined with a period as a field separator, form the asset name. The data source name in this example is ATHENA-DS, and the asset name is AwsDataCatalog.sampledb.elb_logs.request_processing_time.
This asset can be used as an input with the following syntax: inputs=[Node(asset_uid='ATHENA-DS.AwsDataCatalog.sampledb.elb_logs.request_processing_time')]
Subdividing a Task into Multiple Spans
Users can represent a single task with multiple steps as multiple child spans by using create_child_span and sending events for those child spans. To create a child span, users must first obtain the parent span context from the task context. Calling create_child_span on the parent span context creates a span that appears as a child span in the ADOC pipelines view.
```python
from datetime import datetime

from acceldata_sdk.models.job import JobMetadata, Node
from acceldata_airflow_sdk.decorators.job import job
from acceldata_sdk.events.generic_event import GenericEvent

@job(
    inputs=[Node(asset_uid=f'{athena_ds}.{athena_table}')],
    outputs=[Node(asset_uid=f'{redshift_ds}.{rs_table_approved_name}'), Node(job_uid='data_quality_check')],
    metadata=JobMetadata('BEN', 'finance', 'https://github.com/finance/reports/rds_customers.kt')
)
def athena_to_redshift(**context):
    parent_span_context = context['span_context_parent']

    athena_result_span = parent_span_context.create_child_span(
        uid="athena.finance.approved_result",
        context_data={'client_time': str(datetime.now())}
    )
    ...
    athena_result_span.send_event(
        GenericEvent(
            context_data={'client_time': str(datetime.now())},
            event_uid="finance.athena.approved_result"
        )
    )
    athena_result_span.end(
        context_data={'client_time': str(datetime.now())}
    )
    ...
    redshift_upload_span = parent_span_context.create_child_span(
        uid="redshift.data.approved_upload",
        context_data={'client_time': str(datetime.now())}
    )
    ...
    redshift_upload_span.send_event(
        GenericEvent(
            context_data={'client_time': str(datetime.now())},
            event_uid="finance.redshift.approved_upload"
        )
    )
    redshift_upload_span.end(
        context_data={'client_time': str(datetime.now())}
    )
```

Tracking Tasks Created with Airflow Operators
In some cases, users may prefer to use an Airflow operator such as PostgresOperator or ExecutePolicyOperator provided by the Airflow SDK instead of PythonOperator. A JobOperator has been provided to wrap such tasks, create the corresponding job, and bind it with span.
If JobOperator is being used to wrap another operator (ExecutePolicyOperator in this case), the dag argument should not be specified on the wrapped operator.
```python
from acceldata_airflow_sdk.operators.job_operator import JobOperator
from acceldata_airflow_sdk.operators.execute_policy_operator import ExecutePolicyOperator
from acceldata_sdk.models.job import JobMetadata, Node
import acceldata_sdk.constants as const

syncoperator_task = ExecutePolicyOperator(
    task_id='torch_pipeline_syncop_test',
    rule_type=const.policy_type,
    rule_id=const.policy_id,
    failure_strategy=const.FailureStrategy.FailOnWarning,
    sync=True
)

# Wrap the policy operator with the JobOperator so that a job is created for the policy execution
sync_operator_wrap_job_task = JobOperator(
    task_id='syncoperator_task',
    inputs=[Node(asset_uid='POSTGRES_LOCAL_DS.pipeline.pipeline.customer_orders2')],
    outputs=[Node(asset_uid='POSTGRES_LOCAL_DS.pipeline.pipeline.customer_orders3')],
    metadata=JobMetadata('name', 'team', 'code_location'),
    operator=syncoperator_task,
    dag=dag
)
```

Linking a Task with Another Task
In previous examples, each pipeline job takes an asset as input and produces another asset as output, which the next job will use as input. Acceldata-airflow-sdk uses these to connect jobs in the ADOC pipeline UI. However, there may be times when a task does not produce another asset as an output. ExecutePolicyOperator is used to execute a policy and generate a result. In such cases, users can provide a job_uid as output instead of an asset to link the next job.
```python
@job(
    job_uid='data_quality_check',
    outputs=[Node(job_uid='quality.customers.athena')],
    metadata=JobMetadata('BEN', 'finance', 'https://github.com/finance/reports/rds_customers.kt')
)
def syncoperator_result(**context):
    ...
```

In this getting-started guide, we looked at how to use Acceldata Airflow SDK decorators and operators to make an Airflow DAG visible in the ADOC pipelines UI. In addition, we covered the use of operators to implement Data Quality policies and provide job-to-job linking.
Using Operators and Decorators
Decorators in Python
A decorator is a Python design pattern that allows a user to add new functionality to an existing object without modifying its structure. A decorator accepts a function, adds some functionality, and returns it. The SDK has provided Job and Span decorators to simplify instrumenting an Airflow DAG using this design pattern.
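As a generic illustration of the pattern (not the SDK's own job or span decorators), the following hypothetical log_calls decorator adds logging around an existing function without modifying it:

```python
import functools

def log_calls(func):
    """A toy decorator that adds logging around an existing function."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        print(f"starting {func.__name__}")
        result = func(*args, **kwargs)
        print(f"finished {func.__name__}")
        return result
    return wrapper

@log_calls
def load_orders():
    return ["order-1", "order-2"]

load_orders()  # prints "starting load_orders" then "finished load_orders"
```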
Operators in Airflow
An operator is a template for a predefined task that you can define declaratively within your DAG. Airflow operator tasks cannot be used with the decorator design pattern. As a result, the SDK includes JobOperator and SpanOperator to simplify instrumentation of tasks created with operators other than PythonOperator.
Create Job and Span Using Job Decorator
To create a job and span in the pipeline, the user must decorate the python function with a job decorator, as shown below.
A Node object should be constructed with either asset_uid ({data source}.{asset path from its root}) or job_uid (UID of the next job) as a parameter.
The parameters for a job decorator include:
| Parameter | Description |
|---|---|
| span_uid | A String parameter to specify the UID of the span to be created. The default value is None. If span_uid is not provided, a span corresponding to the job will be created with the value job_uid. |
| job_uid | A String parameter to specify the job UID of the pipeline. The default value is None. If job_uid is not provided, uid is constructed using the dag_id, task_id and function name. |
| inputs | An Array parameter of Node type objects being used by the job as input. The default value is an empty array. |
| outputs | An Array parameter of Node type objects being returned by the job as output. The default value is an empty array. |
| metadata | Parameter of type JobMetadata specifying the metadata of the job. The default value is None. |
| xcom_to_event_mapper_ids | A list parameter having a list of xcom keys used to send xcom variables in the span event. The default value is an empty list. |
| bounded_by_span | A Boolean parameter deciding whether to create a span along with the job. The default value is True. If it is set to True, make sure the function has a **context parameter in its arguments. This gives access to the context of the task. Using the context, various span events can be sent inside the function. Use span_context = context['span_context_parent'] to get the span context. |
```python
from acceldata_airflow_sdk.decorators.job import job
from acceldata_sdk.models.job import JobMetadata, Node

@job(
    job_uid='monthly.order.aggregate.job',
    inputs=[Node(asset_uid='POSTGRES_LOCAL_DS.pipeline.pipeline.customer_orders')],
    outputs=[Node(job_uid='Job2_uid')],
    metadata=JobMetadata(name='Vaishvik_brahmbhatt', team='backend', code_location='https://github.com/acme/reporting/report.scala'),
    span_uid='customer.orders.datagen.span',
    xcom_to_event_mapper_ids=['run_id', 'event_id'],
    bounded_by_span=True
)
def monthly_order_aggregate(**context):
    pass
```

Create Span Using Decorator
A span for a Python function can be created by decorating the function with a span decorator that takes a span uid as a parameter. Add the **context parameter to the function's arguments when decorating it with span. This provides access to the task's context, which can be used to send various span events inside the function. The parent span context can be extracted from the context dict using the key span_context_parent, for example datagen_span_context = context['span_context_parent']. It returns a span context instance that can be used to create child spans and send custom events, as shown in the example below.
The parameters for a span decorator include:
| Parameter | Description |
|---|---|
| span_uid | A String parameter to specify the UID of the span to be created. It is a mandatory parameter. |
| xcom_to_event_mapper_ids | A parameter having a list of xcom keys used to send xcom variables in span event. The default value is an empty list. |
```python
from datetime import datetime

from acceldata_airflow_sdk.decorators.span import span
from acceldata_sdk.events.generic_event import GenericEvent

@span(
    span_uid='customer.orders.datagen.span',
    associated_job_uids=['monthly.order.aggregate.transfer'],
    xcom_to_event_mapper_ids=['run_id', 'event_id']
)
def data_gen(**context):
    datagen_span_context = context['span_context_parent']
    customer_datagen_span = datagen_span_context.create_child_span(
        uid="customer.data.gen",
        context_data={'client_time': str(datetime.now())}
    )
    customer_datagen_span.send_event(
        GenericEvent(
            context_data={
                'client_time': str(datetime.now()),
                'row_count': len(rows)
            },
            event_uid="order.customer.join.result"
        )
    )
    customer_datagen_span.end(
        context_data={
            'client_time': str(datetime.now()),
            'customers_count': len(customer_ids)
        }
    )
```

JobOperator OPERATOR
In some cases, users may prefer to use an Airflow operator such as PostgresOperator or ExecutePolicyOperator provided by the Airflow SDK instead of PythonOperator. A JobOperator has been provided to wrap such tasks, create the corresponding job, and bind it with span in order to instrument them.
JobOperator will execute any standard operator passed as the operator parameter, create a job, and send span start and end events. Simply wrap the standard operator in a JobOperator, and make sure the wrapped operator is not added to the DAG. When the operator is wrapped in a JobOperator, the JobOperator handles the operator's task within its own execution.
As parameters, Node objects should have either asset_uid ({data source}.{asset path from its root}) or job_uid (UID of the next job).
The parameters for a JobOperator include:
| Parameter | Description |
|---|---|
| span_uid | A String parameter to specify the UID of the span to be created. The default value is None. If span_uid is not provided, a span corresponding to the job will be created with the value job_uid. |
| job_uid | A String parameter to specify the job UID of the pipeline. The default value is None. If job_uid is not provided, uid is constructed using the dag_id, task_id and function name. |
| inputs | An Array parameter of Node type objects being used by the job as input. The default value is an empty array. |
| outputs | An Array parameter of Node type objects being returned by the job as output. The default value is an empty array. |
| metadata | Parameter of type JobMetadata specifying the metadata of the job. The default value is None. |
| xcom_to_event_mapper_ids | A list parameter having a list of xcom keys used to send xcom variables in the span event. The default value is an empty list. |
| bounded_by_span | A Boolean parameter deciding whether to create a span along with the job. The default value is True. If it is set to True, make sure the function has a **context parameter in its arguments. This gives access to the context of the task. Using the context, various span events can be sent inside the function. Use span_context = context['span_context_parent'] to get the span context. |
| operator | A parameter specifying the Standard airflow operator. It is a mandatory parameter. |
Other parameters will be the same as those of the standard Airflow base operator. Make sure that the Node objects have either asset_uid ({data source}.{asset path from its root}) or job_uid (UID of the next job) as parameters.
Do not specify the dag parameter in the standard Airflow operator being passed as an argument to JobOperator, as the execution of the operator's task is taken care of by JobOperator.
If JobOperator is being used to wrap another operator (ExecutePolicyOperator in this case), then the ExecutePolicyOperator task should not specify the dag argument.
```python
from acceldata_airflow_sdk.operators.job_operator import JobOperator
from acceldata_airflow_sdk.operators.execute_policy_operator import ExecutePolicyOperator
from acceldata_sdk.models.job import JobMetadata, Node
import acceldata_sdk.constants as const

syncoperator_task = ExecutePolicyOperator(
    task_id='torch_pipeline_syncop_test',
    rule_type=const.policy_type,
    rule_id=const.policy_id,
    failure_strategy=const.FailureStrategy.FailOnWarning,
    sync=True
)

# Wrap the policy operator with the JobOperator so that a job is created for the policy execution
sync_operator_wrap_job_task = JobOperator(
    task_id='syncoperator_task',
    inputs=[Node(asset_uid='POSTGRES_LOCAL_DS.pipeline.pipeline.customer_orders2')],
    outputs=[Node(asset_uid='POSTGRES_LOCAL_DS.pipeline.pipeline.customer_orders3')],
    metadata=JobMetadata('name', 'team', 'code_location'),
    operator=syncoperator_task,
    dag=dag
)
```

SpanOperator OPERATOR
SpanOperator will execute any standard operator passed as the operator parameter, create a span, and send span start and end events. Simply wrap the standard operator with a SpanOperator, and make sure the wrapped operator has not been added to the DAG. When the operator is wrapped by a SpanOperator, the SpanOperator handles the operator's task within its own execution.
| Parameter | Description |
|---|---|
| span_uid | A String parameter to specify the UID of the span to be created. The default value is None. If span_uid is not provided, a span corresponding to the job will be created with the value job_uid. |
| xcom_to_event_mapper_ids | A list parameter having a list of xcom keys used to send xcom variables in the span event. The default value is an empty list. |
| operator | A parameter specifying the Standard airflow operator. It is a mandatory parameter. |
Other parameters will be the same as those of the standard Airflow base operator.
Do not specify the dag parameter in the standard Airflow operator being passed as an argument to SpanOperator, as the execution of the operator's task is taken care of by SpanOperator.
```python
from airflow.providers.postgres.operators.postgres import PostgresOperator

from acceldata_airflow_sdk.operators.span_operator import SpanOperator

get_order_agg_for_q4 = PostgresOperator(
    task_id="get_monthly_order_aggregate_last_quarter",
    postgres_conn_id='example_db',
    sql="select * from information_schema.attributes",
)

get_order_agg_for_q4 = SpanOperator(
    task_id="get_monthly_order_aggregate_last_quarter",
    span_uid='monthly.order.agg.q4.span',
    operator=get_order_agg_for_q4,
    associated_job_uids=['monthly.order.aggregate.transfer'],
    xcom_to_event_mapper_ids=['run_id', 'event_id'],
    dag=dag
)
```

Creating Airflow Connection
If you want to avoid using environment variables, you can create a connection in the Airflow UI as described below and provide the connection id of that connection in the TorchInitializer. Set the following for the connection (a scripted alternative is sketched after this list):
- Conn id: Create a unique ID for the connection.
- Conn Type: HTTP.
- Host: URL of the Torch catalog.
- Login: API access key generated from the Torch UI.
- Password: API secret key generated from the Torch UI.
- Extra: {"ENABLE_VERSION_CHECK": true}. This value enables or disables the version compatibility check between Torch and the SDK. The default value is 'True'. To disable the version check, set it to 'False'.
TorchInitializer Operator
At the root of the DAG, you must add a task using the TorchInitializer operator. This operator will create a new pipeline if one does not already exist, and will also generate a new pipeline run and root span for that DAG run of the Airflow DAG.
The parameters for TorchInitializer Operator include:
| Parameter | Description |
|---|---|
| create_pipeline | A Boolean parameter that determines whether to create a pipeline (if one does not already exist) and a pipeline run. The default value is 'True'. Setting it to False can be useful if a pipeline or pipeline run was created outside of the Airflow DAG. |
| span_name | A string parameter specifying the name of the Root Span. The default value is 'None'. If no name is provided, the pipeline_uid.span will be used as the span name. |
| meta | A parameter that specifies the pipeline's metadata (PipelineMetadata). The default value is 'None'. If it is not provided, PipelineMetadata(owner='sdk/pipeline-user', team='TORCH', codeLocation='...') is set as meta. |
| pipeline_uid | A string parameter specifying the pipeline's UID. It is a required parameter. |
| pipeline_name | A string parameter specifying the name of the pipeline. The default value is 'None'. If it is not provided, pipeline_uid will be used as the name. |
| continuation_id | A string parameter that uniquely identifies a pipeline run. This parameter can accept jinja templates as well. The default value is 'None'. When we want a pipeline run to span multiple DAGs, we can use this parameter. To use it, we must provide a continuation id when creating the pipeline in the first DAG with create_pipeline=True, and then provide the same continuation id when continuing the same pipeline run in the second DAG with create_pipeline=False. |
| connection_id | A string parameter that uniquely identifies a Torch credentials-storing connection. The default value is 'None'. When we want to use Torch credentials from the Airflow connection instead of environment variables, we can use this parameter. Refer to the Creating Airflow Connection section above for more information. |
```python
from acceldata_airflow_sdk.operators.torch_initialiser_operator import TorchInitializer
from acceldata_sdk.models.pipeline import PipelineMetadata

# Examples of jinja templates being used in continuation_id:
# jinja template to pull value from config json
# continuation_id=f"{{{{ dag_run.conf['continuation_id'] }}}}"
# jinja template to pull value from xcom
# continuation_id=f"{{{{ task_instance.xcom_pull(key='continuation_id') }}}}"

torch_initializer_task = TorchInitializer(
    task_id='torch_pipeline_initializer',
    pipeline_uid='customer.orders.monthly.agg.demo',
    pipeline_name='CUSTOMERS ORDERS MONTHLY AGG',
    continuation_id='heterogeneous_test',
    create_pipeline=True,
    span_name='customer.orders.monthly.agg.demo.span',
    meta=PipelineMetadata(owner='test', team='testing', codeLocation='...'),
    connection_id=torch_connection_id,
    dag=dag
)
```

Here is an example demonstrating usage of all the decorators and operators provided in airflow-sdk:
Using Execute Policy Operator
ExecutePolicy OPERATOR
ExecutePolicyOperator is used to execute a policy by passing policy type and policy_id. Only data quality and reconciliation policies are supported for ad-hoc execution using this operator.
The parameters for ExecutePolicyOperator include:
| Parameter | Description |
|---|---|
| sync | A Boolean parameter used to decide if the policy should be executed synchronously or asynchronously. It is a mandatory parameter. If it is set to True it will return only after the execution ends. If it is set to False it will return immediately after starting the execution. |
| policy_type | A PolicyType parameter used to specify the policy type. It is a mandatory parameter. It is an enum which will take values from constants as PolicyType.DATA_QUALITY or PolicyType.RECONCILIATION. |
| policy_id | A String parameter used to specify the policy id to be executed. It is a mandatory parameter. |
| incremental | A Boolean parameter used to specify if the policy execution should be incremental or full. The default value is False. |
| failure_strategy | An enum parameter used to decide the behaviour in case of failure. The default value is DoNotFail. |
```python
from acceldata_airflow_sdk.operators.execute_policy_operator import ExecutePolicyOperator
from acceldata_sdk.constants import FailureStrategy, PolicyType

operator_task = ExecutePolicyOperator(
    task_id='torch_pipeline_operator_test',
    policy_type=PolicyType.DATA_QUALITY,
    policy_id=46,
    sync=True,
    failure_strategy=FailureStrategy.DoNotFail,
    dag=dag
)
```

ExecutePolicyOperator stores the execution id of the executed policy in xcom using the key {policy_type.name}_{policy_id}_execution_id. Replace the policy_type and policy_id based on the policy.
Hence, to query the result in another task, you need to pull the execution id from xcom using the same key, {policy_type.name}_{policy_id}_execution_id.
Query the Result Using get_policy_execution_result
get_policy_execution_result is a helper function that can query the result of policy executed with the operator using the execution id pulled from xcom. In this example, the policy_type is PolicyType.DATA_QUALITY.name and the policy_id is 46.
The parameters for get_policy_execution_result include:
| Parameter | Description |
|---|---|
| policy_type | A PolicyType parameter used to specify the policy type. It is a mandatory parameter. It is an enum which will take values from constants as PolicyType.DATA_QUALITY or PolicyType.RECONCILIATION. |
| execution_id | A String parameter specifying the execution id for which users want to query the results. It is a mandatory parameter. |
| failure_strategy | An enum parameter used to decide the behaviour in case of failure. The default value is DoNotFail. |
```python
from acceldata_sdk.torch_client import TorchClient
from acceldata_airflow_sdk.initialiser import torch_credentials
from acceldata_sdk.constants import FailureStrategy, PolicyType, RuleExecutionStatus

def ruleoperator_result(**context):
    xcom_key = f'{PolicyType.DATA_QUALITY.name}_46_execution_id'
    task_instance = context['ti']
    # pull the execution id from xcom
    execution_id = task_instance.xcom_pull(key=xcom_key)
    if execution_id is not None:
        # `adoc_connection_id` represents the unique identifier for the connection established
        # between ADOC and Airflow.
        torch_client = TorchClient(**torch_credentials(conn_id=adoc_connection_id))
        result = torch_client.get_policy_execution_result(
            policy_type=PolicyType.DATA_QUALITY,
            execution_id=execution_id,
            failure_strategy=FailureStrategy.FailOnError
        )
        if result.execution.resultStatus == RuleExecutionStatus.ERRORED:
            print(result.execution.executionError)
```

Circuit Breaker Pattern Based on Policy Execution Result
Users can interrupt DAG execution based on the result of policy execution. For example, if the policy execution encounters errors, the user may wish to exit the DAG execution. In that case, failure_strategy=FailureStrategy.FailOnError can be set. If the policy execution fails, DAG execution is halted by throwing an exception.
```python
from acceldata_sdk.torch_client import TorchClient
from acceldata_airflow_sdk.initialiser import torch_credentials
from acceldata_sdk.constants import FailureStrategy, PolicyType, RuleExecutionStatus

def ruleoperator_result(**context):
    xcom_key = f'{PolicyType.DATA_QUALITY.name}_46_execution_id'
    task_instance = context['ti']
    # pull the execution id from xcom
    execution_id = task_instance.xcom_pull(key=xcom_key)
    if execution_id is not None:
        # `adoc_connection_id` represents the unique identifier for the connection established
        # between ADOC and Airflow.
        torch_client = TorchClient(**torch_credentials(conn_id=adoc_connection_id))
        result = torch_client.get_policy_execution_result(
            policy_type=PolicyType.DATA_QUALITY,
            execution_id=execution_id,
            failure_strategy=FailureStrategy.FailOnError
        )
        if result.execution.resultStatus == RuleExecutionStatus.ERRORED:
            print(result.execution.executionError)
```

Query the Status Using get_policy_status
get_policy_status is a helper function that can query the current status of the policy executed using the operator.
The parameters for get_policy_status include:
| Parameter | Description |
|---|---|
| policy_type | A PolicyType parameter used to specify the policy type. It is a mandatory parameter. It is an enum which will take values from constants as PolicyType.DATA_QUALITY or PolicyType.RECONCILIATION. |
| execution_id | A String parameter specifying the execution id for which users want to query the results. It is a mandatory parameter. |
You need to pull the execution id from xcom using the same key, {policy_type.name}_{policy_id}_execution_id, which was pushed by ExecutePolicyOperator.
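The following is a minimal sketch of querying the status from a downstream task. It assumes get_policy_status is called on a TorchClient instance in the same way as get_policy_execution_result, and it reuses the placeholder policy id 46 and the adoc_connection_id placeholder from the earlier examples:

```python
from acceldata_sdk.torch_client import TorchClient
from acceldata_airflow_sdk.initialiser import torch_credentials
from acceldata_sdk.constants import PolicyType

def policy_status_check(**context):
    xcom_key = f'{PolicyType.DATA_QUALITY.name}_46_execution_id'
    task_instance = context['ti']
    # Pull the execution id pushed by ExecutePolicyOperator.
    execution_id = task_instance.xcom_pull(key=xcom_key)
    if execution_id is not None:
        # `adoc_connection_id` is the identifier of the ADOC connection, as in the earlier examples.
        torch_client = TorchClient(**torch_credentials(conn_id=adoc_connection_id))
        # Assumption: get_policy_status accepts the same policy_type/execution_id pair
        # documented in the table above.
        status = torch_client.get_policy_status(
            policy_type=PolicyType.DATA_QUALITY,
            execution_id=execution_id
        )
        print(f"Current policy execution status: {status}")
```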
Context Switching in Heterogeneous Pipelines
In some ETL pipelines, it is possible to have a DAG dependent on another. In such cases, you may want to represent and track both DAGs as part of a single pipeline in ADOC.
Summary
This guide demonstrates a context switching between two ETL pipelines.

In this example scenario, the ADOC pipeline run begins in dag 1 and ends in dag 2.
DAG1
In dag1, override_success_callback=True is set in the DAG, ensuring that the new pipeline_run is not closed at the end of dag1. In addition, continuation_id is passed in the TorchInitializer task, which is used to link this run of dag1 to a specific run of dag2.
```python
dag = DAG(
    dag_id='demo_donot_end_pipeline_run',
    schedule_interval=None,
    default_args=default_args,
    start_date=datetime(2022, 5, 11),
    on_success_callback=_send_success_event,
    on_failure_callback=_send_failure_event,
    override_success_callback=True,
    override_failure_callback=True
)

torch_initializer_task = TorchInitializer(
    task_id='torch_pipeline_initializer',
    pipeline_uid=pipeline_uid,
    pipeline_name=pipeline_name,
    continuation_id=continuation_id,
    dag=dag,
    meta=PipelineMetadata(owner='Demo', team='demo_team', codeLocation='...')
)
```

DAG2
In dag2, create_pipeline=False is set in the TorchInitializer task, ensuring that a new pipeline run is not started when TorchInitializer is called in dag2. The dag2 TorchInitializer task is passed the same continuation_id as dag1, which is used to link this run of dag2 to the run of dag1.
```python
dag = DAG(
    dag_id='demo_donot_start_pipeline_run',
    schedule_interval=None,
    default_args=default_args,
    start_date=datetime(2022, 5, 11),
    on_success_callback=_send_success_event,
    on_failure_callback=_send_failure_event
)

# create_pipeline=False ensures that a new pipeline_run is not started when TorchInitializer is called
torch_initializer_task = TorchInitializer(
    task_id='torch_pipeline_initializer',
    pipeline_uid=pipeline_uid,
    pipeline_name=pipeline_name,
    dag=dag,
    continuation_id=f"{{{{ task_instance.xcom_pull(key='continuation_id') }}}}",
    create_pipeline=False
)
```

Here is an example demonstrating a context switch happening between two Airflow DAGs. This is a working code example that you can test on your own.
Observing Airflow DAGs with ADOC Listener Plugin
The ADOC Listener plugin integrates Airflow DAGs for automatic observation in ADOC.
The plugin performs the following actions without requiring any additional code in your Airflow DAG, unless you disable instrumentation through environment variables.
When the DAG starts:
- It creates the pipeline if it does not already exist in ADOC.
- It creates a new pipeline run in ADOC.
When a TaskInstance starts:
- It creates jobs in ADOC for each of the airflow operators used in the task.
- It constructs job input nodes based on the upstream tasks.
- It creates a span and associates it with the jobs.
- It emits span events with metadata.
When a TaskInstance is completed:
- It emits span events with metadata.
- It ends the spans with either success or failure.
When the DAG is completed:
- It updates the pipeline run with success or failure in ADOC.
Prerequisites
Ensure that the following applications are installed on your system:
- Python V3.6.0 and above [Download Python]
- Airflow V2.3.0 and above [Apache Airflow]
API keys are essential for authentication when making calls to ADOC.

Configuration
Plugin Environment Variables
The adoc_listener_plugin utilizes the acceldata-sdk to push data to the ADOC backend.
Mandatory Environment Variables
The ADOC client relies on the following environment variables:
- TORCH_CATALOG_URL: The URL of the ADOC server.
- TORCH_ACCESS_KEY: The API access key generated from the ADOC UI.
- TORCH_SECRET_KEY: The API secret key generated from the ADOC UI.
Optional Environment Variables
By default, all DAGs are observed. However, the following set of environment variables can be used to modify this behavior.
The environment variables for ignoring or observing DAGs are mutually exclusive.
If the following environment variables match the DAG ids, observation of the matched DAG ids is skipped, while all other DAG ids are still observed:
- DAGIDS_TO_IGNORE: Comma-separated DAG ids to ignore observation.
- DAGIDS_REGEX_TO_IGNORE: Regular expression for DAG ids to ignore observation.
If the following environment variables match the DAG ids, only those specific DAG ids are observed, while all other DAG ids are ignored:
- DAGIDS_TO_OBSERVE: Comma-separated DAG ids to observe.
- DAGIDS_REGEX_TO_OBSERVE: Regular expression for DAG ids to observe.
Deployment
- To deploy the plugin on an on-premise instance of Apache Airflow, refer to the Deploying on On-Prem Apache Airflow section below.
- To deploy the plugin on Amazon MWAA, refer to the Deploying on Amazon MWAA section below.
- To deploy the plugin on Google Cloud Composer, refer to the Deploying on Google Cloud Composer section below.
Enhance Data Reliability with Automated Data Reliability
Consider automating your data reliability; this complements the plugin integration by enhancing data reliability and observability without requiring modifications to your pipeline code.
Ways to Observe your DAGs in ADOC
You can observe your pipelines created with Airflow DAGs in ADOC using the following three methods:
- Full Code: In the full code approach, the Airflow Listener Integration is not used to observe your DAGs. If you are using the Airflow Listener Integration, you must exclude these DAGs from automated observation, as explained in the Configuration section.
- Switching to asset lineage would involve excluding the DAG from the integration and adding complete observability for it.
- If you intend to connect multiple DAGs using the continuation_id logic, make sure to disable DAG observation in the configuration when using the listener plugin integration. This is crucial, as the integration will conclude the pipeline run upon termination of the DAG run.

Examples demonstrating the full code approach can be found here.
Light Code: In the Light Code approach, you should utilize the Airflow Listener Integration while having the flexibility to enhance the code beyond what the plugin offers. This can involve:
- Sending additional span events for the task. You can find an example demonstrating this use case for reference here.
- Invoking policies for the DAG. For examples, refer to the following:
No Code: In the No Code approach, you utilize the Airflow Listener Integration exclusively. An example demonstrating how to observe your DAG, along with its corresponding pipeline, spans, and events, without requiring any additional code can be found here.
Deploying on On-Prem Apache Airflow
To deploy the plugin on an on-premises instance of Apache Airflow, perform the following:
1. Setup Environment Variables
Configure the required and optional environment variables for the plugin. For a detailed list of these variables and their descriptions, refer to the Configuration section.
2. Verify Airflow Version
Verify that the Airflow environment where you plan to deploy the plugin is running version 2.5.0 or later. You can check the version by running:
```bash
airflow version
```

Confirm that the output version is 2.5.0 or higher, such as 2.5.3.
3. Install Required Packages
Install the following package on all Airflow components: adoc_airflow_plugin.
Use the appropriate package manager or installation method for your environment to install this dependency.
4. Restart Airflow Components
Restart all Airflow components to apply the plugin changes.
5. Validate the Plugin Installation
Navigate to Admin > Plugins in your Airflow UI. Confirm that the AcceldataListenerPlugin is listed correctly, indicating successful installation.
6. Verify Instrumentation
Once the plugin is installed and the environment is configured, execute a DAG in your Airflow instance. After the DAG run is complete, go to ADOC UI > Pipelines to verify successful instrumentation. Find the pipeline matching your DAG's name; it should appear with its associated spans and events, as shown in the screenshot below.

Deploying on Amazon MWAA
To deploy the plugin on Managed Workflows for Apache Airflow (MWAA), perform the following:
1. Setup Environment Variables
Configure the necessary environment variables within your MWAA environment. For a detailed list of required variables and their values, refer to the Configuration section.
2. Create Plugins.zip File
Package the environment variables into a plugins.zip file. For instructions on creating a custom plugin that generates runtime environment variables, refer to Creating a custom plugin that generates runtime environment variables - Amazon Managed Workflows for Apache Airflow.
The following screenshot is a sample env_var_plugin.py file for your reference:

You can either hardcode the environment variables directly in the file or use AWS Secrets Manager to securely store them and configure the plugins.zip file to read from Secrets Manager.
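For reference, a minimal sketch of such an env_var_plugin.py with hardcoded values might look like the following; the URL and key values are placeholders, and in practice you may prefer to read them from AWS Secrets Manager instead:

```python
import os

from airflow.plugins_manager import AirflowPlugin

# Placeholder values; substitute your ADOC URL and API keys
# (or fetch them from AWS Secrets Manager instead of hardcoding).
os.environ["TORCH_CATALOG_URL"] = "https://your-adoc-instance.example.com"
os.environ["TORCH_ACCESS_KEY"] = "<api-access-key>"
os.environ["TORCH_SECRET_KEY"] = "<api-secret-key>"

class EnvVarPlugin(AirflowPlugin):
    # Registering the plugin ensures this module is imported (and the
    # variables above are set) when Airflow starts.
    name = "env_var_plugin"
```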
3. Install Required Packages
- Check for requirements.txt: Inspect your DAG code folder in the Amazon S3 bucket to see if a requirements.txt file is already present.
- If requirements.txt does not exist: Upload a new requirements.txt file that includes the latest version of the adoc_airflow_plugin package.
- If requirements.txt exists: Verify that it includes the adoc_airflow_plugin package. If it is not listed, update the file to include its latest version.
4. Update Airflow Environment Packages
Point your MWAA environment to the updated versions of the plugins.zip and requirements.txt files. Apply these changes in the MWAA console to ensure the updated configurations are loaded correctly.
5. Validate the Plugin Installation
Navigate to Admin > Plugins in your Airflow UI. Ensure that both AcceldataListenerPlugin and env_var_plugin are listed, confirming successful installation.
6. Verify Instrumentation
After installing the plugin and configuring the environment, trigger a DAG in your Airflow instance. Once the DAG run is complete, navigate to ADOC UI > Pipelines to confirm successful instrumentation. Locate the pipeline corresponding to your DAG’s name; it should be displayed with its associated spans and events, as shown in the screenshot below.

Deploying on Google Cloud Composer
To deploy the plugin on Google Cloud Composer, perform the following:
1. Setup Environment Variables
Configure the required and optional environment variables for the plugin. For a detailed list of these variables and their descriptions, refer to the Configuration section. You may add airflow_monitoring to DAGIDS_TO_IGNORE to exclude health check DAGs from being monitored.
2. Install Required Packages
In your Composer environment, navigate to the PYPI PACKAGES tab and add the following package:
- adoc_airflow_plugin
This will install the necessary dependencies for the plugin to function correctly.
3. Validate the Plugin Installation
Navigate to Admin > Plugins in your Airflow UI. Ensure that AcceldataListenerPlugin is listed, indicating successful installation.
4. Verify Instrumentation
After installing the plugin and configuring the environment, trigger a DAG in your Airflow instance. Once the DAG run is complete, navigate to ADOC UI > Pipelines to verify successful instrumentation. Find the pipeline that corresponds to your DAG’s name; it should be displayed with its associated spans and events, as shown in the screenshot below.
