Using ExecutePolicy Operator

ExecutePolicyOperator OPERATOR

ExecutePolicyOperator is used to execute a policy by passing policy type and policy_id. Only data quality and reconciliation policies are supported for ad-hoc execution using this operator.

The parameters for ExecutePolicyOperator include:

ParameterDescription
syncA Boolean parameter used to decide if the policy should be executed synchronously or asynchronously. It is a mandatory parameter. If it is set to True it will return only after the execution ends. If it is set to False it will return immediately after starting the execution.
policy_typeA PolicyType parameter used to specify the policy type. It is a mandatory parameter. It is an enum which will take values from constants as PolicyType.DATA_QUALITY or PolicyType.RECONCILIATION.
policy_idA String parameter used to specify the policy id to be executed. It is a mandatory parameter.
incrementalA Boolean parameter used to specify if the policy execution should be incremental or full. The default value is False.
failure_strategy

An enum parameter used to decide the behaviour in case of failure. The default value is DoNotFail.

  • failure_strategy takes enum of type FailureStrategy which can have three values DoNotFail, FailOnError , and FailOnWarning.
  • DoNotFail will never throw. In case of errors in policy execution, it will log the error.
  • FailOnError will Throw an exception only if it's an error. In case of a warning, it will return without any errors.
  • FailOnWarning will throw exceptions on warnings as well as errors.
Python
Copy

ExecutePolicyOperator stores the execution id of the policy executed in xcom using the key {policy_type.name}_{policy_id}_execution_id. Replace the policy_type and policy_id based on the policy.

Hence, to query the result in another task you need to pull the execution id from xcom using the same key {policy_type}_{policy_id}_execution_id.

Query the Result Using get_policy_execution_result

get_policy_execution_result is a helper function that can query the result of policy executed with the operator using the execution id pulled from xcom. In this example, the policy_type is PolicyType.DATA_QUALITY.name and the policy_id is 46.

The parameters for get_polcy_execution_result include:

ParameterDescription
policy_typeA PolicyType parameter used to specify the policy type. It is a mandatory parameter. It is an enum which will take values from constants as PolicyType.DATA_QUALITY or PolicyType.RECONCILIATION.
execution_idA String parameter specifying the execution id for which users want to query the results. It is a mandatory parameter.
failure_strategy

An enum parameter used to decide the behaviour in case of failure. The default value is DoNotFail.

  • failure_strategy takes enum of type FailureStrategy which can have three values DoNotFail, FailOnError , and FailOnWarning.
  • DoNotFail will never throw. In case of errors in policy execution, it will log the error.
  • FailOnError will Throw an exception only if it's an error. In case of a warning, it will return without any errors.
  • FailOnWarning will throw exceptions on warnings as well as errors.
Python
Copy

Circuit Breaker Pattern Based on Policy Execution Result

Users can interrupt DAG execution based on the result of policy execution. For example, if the policy execution encounters errors, the user may wish to exit the DAG execution. Then, failure strategy=FailureStrategy.FailOnError can be set. If policy execution fails, this will result in DAG execution being halted by throwing an exception.

Python
Copy

Query the Status Using get_policy_status

get_policy_status is a helper function that can query the current status of the policy executed using the operator.

The parameter for get_policy_status include:

ParameterDescription
policy_typeA PolicyType parameter used to specify the policy type. It is a mandatory parameter. It is an enum which will take values from constants as PolicyType.DATA_QUALITY or PolicyType.RECONCILIATION.
execution_idA String parameter specifying the execution id for which users want to query the results. It is a mandatory parameter.

You need to pull the execution id from xcom using the same key {policy_type.name}_{policy_id}_execution_id which was pushed by ExecutePolicyOperator. Replace the policy_type and policy_id based on the policy. In this example the policy_type is PolicyType.DATA_QUALITY.name and the policy_id is 46.

Python
Copy
Type to search, ESC to discard
Type to search, ESC to discard
Type to search, ESC to discard