Python
Installation
The following Python modules must be installed in the environment:
Install acceldata-sdk
```bash
pip install acceldata-sdk
```
Prerequisites
To make calls to ADOC, API keys are required.
Creating an API Key
You can generate API keys from the Admin Central section of the ADOC UI.
Before using acceldata-sdk calls, make sure you have your API keys and ADOC URL handy.
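For reference, here is a minimal sketch that reads the variables listed below from the environment and creates a client; the environment-variable handling shown here is illustrative and may differ in your setup:

```python
import os

from acceldata_sdk.torch_client import TorchClient

# Illustrative only: read the ADOC URL and API keys from environment variables
torch_client = TorchClient(
    url=os.environ['TORCH_CATALOG_URL'],        # URL of the Torch catalog
    access_key=os.environ['TORCH_ACCESS_KEY'],  # API access key
    secret_key=os.environ['TORCH_SECRET_KEY']   # API secret key
)
```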
- TORCH_CATALOG_URL - URL of the Torch catalogue
- TORCH_ACCESS_KEY - API access key generated from Torch
- TORCH_SECRET_KEY - API secret key generated from Torch

Features Provided by Acceldata SDK
- Pipeline: Represents an execution of a data pipeline
- Span: Logical collection of various tasks
- Job: Logical representation of a task
- Event: An event can contain arbitrary process or business data and is transmitted to the ADOC system for future tracking against a pipeline execution
Minimum Instrumentation Required
Step 1. Create Pipeline and Pipeline Run
A pipeline should be created and a new pipeline run should be started before beginning the data processing code.
You must provide the pipeline_uid, which will be updated in ADOC to track the data pipeline execution.
```python
from acceldata_sdk.torch_client import TorchClient
from acceldata_sdk.models.pipeline import CreatePipeline, PipelineMetadata

torch_credentials = {
    'url': torch_url,
    'access_key': torch_access_key,
    'secret_key': torch_secret_key
}
torch_client = TorchClient(**torch_credentials)

meta = PipelineMetadata(owner='sdk/pipeline-user', team='TORCH', codeLocation='...')
pipeline_name_ = pipeline_name
pipeline = CreatePipeline(
    uid=pipeline_uid,
    name=pipeline_name_,
    description=f'The pipeline {pipeline_name_} has been created from torch-sdk',
    meta=meta,
    context={'pipeline_uid': pipeline_uid, 'pipeline_name': pipeline_name_}
)
pipeline_res = torch_client.create_pipeline(pipeline=pipeline)
print('pipeline id :: ', pipeline_res.id)

pipeline_run = pipeline_res.create_pipeline_run()
global pipeline_run_id
pipeline_run_id = pipeline_run.id

span_name_ = f'{pipeline_uid}.span'
global parent_span_context
parent_span_context = pipeline_run.create_span(uid=span_name_)
```
Tracking Each Task
You must add more instrumentation to the code to allow ADOC to provide a fine-grained view of the data pipeline, as described in the sections below.
Tracking Each Task Using Jobs
Before each function is executed, a job_uid, input, output, and metadata should be passed as arguments to make each task visible as a job in the ADOC pipeline. The task's inputs should be described in the inputs list, and the task's output assets should be represented in the outputs list. A corresponding span can be created in addition to a job to populate the timeline and allow events to be associated with tasks.
```python
from datetime import datetime
from acceldata_sdk.models.job import CreateJob

def athena_to_redshift(job_uid, inputs, outputs, metadata, context_job, span_uid):
    span_uid_temp = span_uid
    pipeline = torch_client.get_pipeline(pipeline_uid)
    pipeline_run = pipeline.get_run(pipeline_run_id)
    job = CreateJob(
        uid=job_uid,
        name=f'{job_uid} Job',
        version=pipeline_run.versionId,
        description=f'{job_uid} created using torch job decorator',
        inputs=inputs,
        outputs=outputs,
        meta=metadata,
        context=context_job
    )
    job = pipeline.create_job(job)
    parent_span_context1 = pipeline_run.get_root_span()
    associated_job_uids = [job_uid]
    if span_uid is None:
        span_uid_temp = job_uid
    span_context = parent_span_context1.create_child_span(
        uid=span_uid_temp,
        context_data={'time': str(datetime.now())},
        associatedJobUids=associated_job_uids)
    ...
```
Getting the UID of the Asset to be Used in the Input and Output List

To get the UID of an asset, first open the asset in the ADOC UI. A path to the asset is shown under the asset name, as shown in the image above. The first part, highlighted in green, is the data source name; the remaining items, joined with a period as the separator, form the asset name. In this example, the data source name is ATHENA-DS and the asset name is AwsDataCatalog.sampledb.elb_logs.request_processing_time.
This asset can be used as an input with the following syntax: inputs=[Node(asset_uid='ATHENA-DS.AwsDataCatalog.sampledb.elb_logs.request_processing_time')],
Subdividing a Task into Multiple Spans
You can represent a single task that has multiple steps as multiple child spans created with create_child_span, and send events for those child spans. To create a child span, first get the parent span context (for example, the root span of the pipeline run), then call create_child_span on it; the new span appears as a child span in the ADOC pipelines view.
```python
def athena_to_redshift():
    ...
    parent_span_context = span_context
    athena_result_span = parent_span_context.create_child_span(
        uid="athena.finance.approved_result",
        context_data={'client_time': str(datetime.now())}
    )
    ...
    athena_result_span.send_event(
        GenericEvent(
            context_data={'client_time': str(datetime.now())},
            event_uid="finance.athena.approved_result"
        )
    )
    athena_result_span.end(
        context_data={'client_time': str(datetime.now())}
    )
    ...
    redshift_upload_span = parent_span_context.create_child_span(
        uid="redshift.data.approved_upload",
        context_data={'client_time': str(datetime.now())}
    )
    ...
    redshift_upload_span.send_event(
        GenericEvent(
            context_data={'client_time': str(datetime.now())},
            event_uid="finance.redshift.approved_upload"
        )
    )
    redshift_upload_span.end(
        context_data={'client_time': str(datetime.now())}
    )
```
Linking a Task with Another Task
In previous examples, each pipeline job takes an asset as input and produces another asset as output, which the next job will use as input. Acceldata-sdk uses these to connect jobs in the ADOC pipeline UI. However, there may be times when a task does not produce another asset as an output. In such cases, you can provide a job_uid as output instead of an asset to link the next job.
```python
def syncoperator_result():
    outputs = [Node(job_uid='quality.customers.athena')]
    job = CreateJob(
        uid=job_uid,
        name=f'{job_uid} Job',
        pipeline_run_id=pipeline_run.id,
        description=f'{job_uid} created using torch job decorator',
        inputs=inputs,
        outputs=outputs,
        meta=metadata,
        context=context_job
    )
    ...
```
SDK APIs
ADOC is a complete solution for monitoring the quality of data in your data lake and warehouse. You can use ADOC to ensure that your business decisions are supported by high-quality data. ADOC provides tools for measuring the quality of data in a data catalog and ensuring that important data sources are never overlooked. All users, including analysts, data scientists, and developers, can rely on ADOC to monitor data flow in the warehouse or data lake and ensure that no data is lost.
The ADOC catalog and pipeline APIs are triggered by the Acceldata SDK.
Features Provided by Acceldata SDK
- Pipeline APIs
- Datasource APIs
Prerequisites
- Install the acceldata-sdk PyPI package in a Python environment: `pip install acceldata-sdk`
- Create an ADOC client: ADOC clients are used to send data to ADOC servers and include several methods for communicating with the server. ADOC clients have access to the APIs for data reliability and pipelines. An ADOC URL and API keys are required to create an ADOC client. Go to the ADOC UI's settings and generate API keys for the client.
```python
from acceldata_sdk.torch_client import TorchClient

torch_client = TorchClient(url='https://acceldata.host.dev:9999', access_key='******', secret_key='*****************')
```
The mention of Torch denotes ADOC.
Pipeline APIs
Acceldata SDK supports a number of pipeline APIs, such as creating a pipeline, adding jobs and spans, and initiating pipeline runs. During the span life cycle, Acceldata SDK can send various events. As a result, Acceldata SDK has complete control over pipelines.
Create Pipeline and Job
A pipeline represents the entire ETL (Extract, Transform, Load) pipeline including Asset nodes and Job nodes. The Lineage graph for all data assets is formed by the complete pipeline definition.
A Job node or Process node is an entity that performs a task in the ETL workflow. In this representation, a job's inputs are assets or other jobs, and its outputs are other assets or jobs. ADOC creates the lineage using the set of job definitions in the workflow and tracks version changes for the pipeline.
To create pipelines and jobs, build a creation object with the necessary parameters, and then use the SDK-supported methods to perform the corresponding operations on the ADOC server side.
Acceldata SDK (Software Development Kit) provides the CreateJob class which needs to be passed to the create_job function as a parameter to create a job.
The parameters required include:
| Parameter | Description |
|---|---|
| uid | Unique Id of the job. It is a mandatory parameter. |
| name | Name of the job. It is a mandatory parameter. |
| pipeline_run_id | Id of the pipeline run for which you want to add the job. This parameter is mandatory if a job is created using a pipeline. It is not required if a job is created using a pipeline run. |
| description | Description of the job. |
| inputs | Input for the job. This can be the uid of an asset, specified with the asset_uid parameter of a Node object, or the uid of another job, specified with the job_uid parameter of a Node object. |
| outputs | Output for the job. This can be the uid of an asset, specified with the asset_uid parameter of a Node object, or the uid of another job, specified with the job_uid parameter of a Node object. |
| meta | Metadata of the job. |
| context | Context of the job. |
| bounded_by_span | This parameter is of boolean value. If the job needs to be bound by a span, this must be set to True. False is the default value. It is an optional parameter. |
| span_uid | The uid of the new span, as a string. This parameter is mandatory if bounded_by_span is set to True. |
```python
from acceldata_sdk.torch_client import TorchClient
from acceldata_sdk.models.job import CreateJob, JobMetadata, Node
from acceldata_sdk.models.pipeline import CreatePipeline, PipelineMetadata, PipelineRunResult, PipelineRunStatus

# Create pipeline
pipeline = CreatePipeline(
    uid='monthly_reporting_pipeline',
    name='Monthly reporting Pipeline',
    description='Pipeline to create monthly reporting tables',
    meta=PipelineMetadata('Vaishvik', 'acceldata_sdk_code', '...'),
    context={'key1': 'value1'}
)
torch_client = TorchClient(url="https://torch.acceldata.local", access_key="*******",
                           secret_key="******************************", do_version_check=False)
pipeline_response = torch_client.create_pipeline(pipeline=pipeline)
pipeline_run = pipeline_response.create_pipeline_run()

# Create a job using the pipeline object.
# Passing pipeline_run_id is mandatory.
job = CreateJob(
    uid='monthly_sales_aggregate',
    name='Monthly Sales Aggregate',
    description='Generates the monthly sales aggregate tables for the complete year',
    inputs=[Node(asset_uid='datasource-name.database.schema.table_1')],
    outputs=[Node(job_uid='job2_uid')],
    meta=JobMetadata('vaishvik', 'backend', 'https://github.com/'),
    context={'key21': 'value21'},
    bounded_by_span=True,
    pipeline_run_id=pipeline_run.id,
    span_uid="test_shubh"
)
job_response = pipeline_response.create_job(job)

# Create a job using the pipeline_run object.
# Passing pipeline_run_id is not needed.
job = CreateJob(
    uid='monthly_sales_aggregate_r',
    name='Monthly Sales Aggregate',
    description='Generates the monthly sales aggregate tables for the complete year',
    inputs=[Node(asset_uid='datasource-name.database.schema.table_1')],
    outputs=[Node(job_uid='job2_uid')],
    meta=JobMetadata('vaishvik', 'backend', 'https://github.com/'),
    context={'key21': 'value21'}
)
job_response_using_run = pipeline_run.create_job(job)
```
Create Pipeline Run and Generate Spans and Send Span Events
A pipeline run denotes the pipeline's execution. The same pipeline can be run multiple times, with each execution (run) producing a new snapshot version. A hierarchical span group exists for each pipeline run. A span is a hierarchical way of grouping a number of metrics. It can be as specific as you want. The APIs will allow you to create a span object from a pipeline object, and then start hierarchical spans from parent spans.
A span is typically a process or a task and can be as granular as needed. This hierarchical system is capable of simulating highly complex pipeline observability flows. Optionally, a span can also be associated with a job, which makes it possible to track the job's start, completion, and failures. For a span, start and stop are implicitly tracked.
Acceldata SDK also allows you to create new pipeline runs and add spans to them. SDK can send some custom and standard span events during the span life cycle to collect pipeline run metrics for observability.
The create_span function's parameters available under a pipeline run are:
| Parameter | Description |
|---|---|
| uid | The uid of the span being created. This should be unique. This is a mandatory parameter. |
| associatedJobUids | List of job uids with which the span needs to be associated. |
| context_data | A dict of key-value pairs providing custom context information related to the span. |
The create_child_span function's parameters, available under span_context, are listed below. This function is used to create a hierarchy of spans by placing a span under another span.
| Parameter | Description |
|---|---|
| uid | The uid of the span being created. This should be unique. This is a mandatory parameter. |
| context_data | A dict of key-value pairs providing custom context information related to the span. |
| associatedJobUids | List of job uids with which the span needs to be associated. |
```python
from acceldata_sdk.events.generic_event import GenericEvent
from datetime import datetime

# create a pipeline run of the pipeline
pipeline_run = pipeline_response.create_pipeline_run()

# get root span of a pipeline run
root_span = pipeline_run.get_root_span()

# create span in the pipeline run
span_context = pipeline_run.create_span(uid='monthly.generate.data.span')

# check whether the current span is the root span
span_context.is_root()

# end the span
span_context.end()

# check if the current span has children
span_context.has_children()

# create a child span
child_span_context = span_context.create_child_span('monthly.generate.customer.span')

# send a custom event
child_span_context.send_event(
    GenericEvent(context_data={'client_time': str(datetime.now()), 'row_count': 100},
                 event_uid="order.customer.join.result")
)

# abort the span
child_span_context.abort()

# mark the span as failed
child_span_context.failed()

# update a pipeline run of the pipeline
updatePipelineRunRes = pipeline_run.update_pipeline_run(
    context_data={'key1': 'value2', 'name': 'backend'},
    result=PipelineRunResult.SUCCESS,
    status=PipelineRunStatus.COMPLETED)
```
Get Latest Pipeline Run
Acceldata SDK can obtain a pipeline run with a specific pipeline run id. The pipeline run instance allows you to continue the ETL pipeline and add spans, jobs, and events. As a result, Acceldata SDK has full access to the ADOC pipeline service.
The parameters required include:
| Parameter | Description |
|---|---|
| pipeline_run_id | Run Id of the pipeline run. |
| continuation_id | Continuation Id of the pipeline run. |
| pipeline_id | Id of the pipeline to which the run belongs to. |
```python
pipeline_run = torch_client.get_pipeline_run(pipeline_run_id=pipeline_run_id)
pipeline = torch_client.get_pipeline(pipeline_id=pipeline_id)
pipeline_run = torch_client.get_pipeline_run(continuation_id=continuation_id, pipeline_id=pipeline.id)
pipeline_run = pipeline.get_run(continuation_id=continuation_id)
```
Get Pipeline Details for a Particular Pipeline Run ID
The Acceldata SDK can obtain pipeline details for a specific pipeline run.
```python
pipeline_details = pipeline_run.get_details()
```
Get All Spans for a Particular Pipeline Run ID
Acceldata SDK can retrieve all spans for a specific pipeline run id.
```python
pipeline_run_spans = pipeline_run.get_spans()
```
Get Pipeline Runs for a Pipeline
All pipeline runs are accessible via the Acceldata SDK.
The parameters required include:
| Parameter | Description |
|---|---|
| pipeline_id | The ID of the pipeline |
```python
runs = torch_client.get_pipeline_runs(pipeline_id)
runs = pipeline.get_runs()
```
Get All Pipelines
All pipelines are accessible via the Acceldata SDK.
```python
# Assumed listing call; verify the exact method name against your acceldata-sdk version.
pipelines = torch_client.get_pipelines()
```
Delete a Pipeline
Acceldata SDK can delete a pipeline.
```python
delete_response = pipeline.delete()
```
Using Continuation Id to Continue the Same Pipeline Across Different ETL Scripts Example
ETL1 - In this script, a new pipeline run is created using a continuation id, but the pipeline run is not closed.
```python
from acceldata_sdk.torch_client import TorchClient
from acceldata_sdk.models.pipeline import CreatePipeline, PipelineMetadata, PipelineRunResult, PipelineRunStatus

# Create pipeline
pipeline_uid = 'monthly_reporting_pipeline'
pipeline = CreatePipeline(
    uid=pipeline_uid,
    name='Monthly reporting Pipeline',
    description='Pipeline to create monthly reporting tables',
    meta=PipelineMetadata('Vaishvik', 'acceldata_sdk_code', '...'),
    context={'key1': 'value1'}
)
torch_client = TorchClient(url="https://torch.acceldata.local", access_key="*******",
                           secret_key="******************************", do_version_check=False)
pipeline_response = torch_client.create_pipeline(pipeline=pipeline)

# A new continuation id should be generated on every run. The same continuation id cannot be reused.
cont_id = "continuationid_demo_1"
pipeline_run = pipeline_response.create_pipeline_run(continuation_id=cont_id)

# Make sure pipeline_run is not ended using the update_pipeline_run call so that the same run can be used in the next ETL script
```
ETL2 - This script will carry on the pipeline run that was started by ETL1.
```python
from acceldata_sdk.torch_client import TorchClient
from acceldata_sdk.models.pipeline import PipelineRunResult, PipelineRunStatus

torch_client = TorchClient(url="https://torch.acceldata.local", access_key="*******",
                           secret_key="******************************", do_version_check=False)

pipeline_uid = 'monthly_reporting_pipeline'
# First get the same pipeline using the previously used UID. Then get the previously started pipeline_run using the continuation_id
pipeline = torch_client.get_pipeline(pipeline_uid)

# continuation_id should be the same ID used in the ETL1 script so that the same pipeline_run is continued in the pipeline.
cont_id = "continuationid_demo_1"
pipeline_run = pipeline.get_run(continuation_id=cont_id)

# Use this pipeline run to create spans and jobs
# At the end of this script, close the pipeline run using update_pipeline_run if we do not want to continue the same pipeline_run further
updatePipelineRunRes = pipeline_run.update_pipeline_run(
    context_data={'key1': 'value2', 'name': 'backend'},
    result=PipelineRunResult.SUCCESS,
    status=PipelineRunStatus.COMPLETED)
```
Datasource APIs
Acceldata SDK also has full access to catalog APIs.
Datasource APIs
ADOC can crawl more than fifteen different data sources.
```python
import acceldata_sdk.constants as const

# Get datasource
ds_res = torch_client.get_datasource('snowflake_ds_local')
ds_res = torch_client.get_datasource(5, properties=True)

# Get datasources based on the type
datasources = torch_client.get_datasources(const.AssetSourceType.SNOWFLAKE)
```
Crawler Operations
You can start the crawler and check the crawler's status.
```python
# Start a crawler
datasource.start_crawler()
torch_client.start_crawler('datasource_name')

# Get running crawler status
datasource.get_crawler_status()
torch_client.get_crawler_status('datasource_name')
```
Assets APIs
Get Asset Details Using an Asset Identifier like Asset ID/UID
Acceldata SDK includes methods for retrieving assets from a given data source.
```python
from acceldata_sdk.models.create_asset import AssetMetadata

# Get asset by id/uid
asset = torch_client.get_asset(1)
asset = torch_client.get_asset('Feature_bag_datasource.feature_1')
```
Get All Asset Types in ADOC
```python
asset_types = torch_client.get_all_asset_types()
```
Asset's Tags, Labels, Metadata, and Sample Data
Using the SDK, you can add tags, labels, and custom metadata, as well as obtain sample data for the asset. Tags and labels can be used to quickly filter out assets.
```python
# asset metadata
from acceldata_sdk.models.tags import AssetLabel, CustomAssetMetadata

asset = torch_client.get_asset(asset_id)

# Get metadata of an asset
asset.get_metadata()

# Get all tags
tags = asset.get_tags()

# Add a tag to the asset
tag_add = asset.add_tag(tag='asset_tag')

# Add asset labels
labels = asset.add_labels(labels=[AssetLabel('test1', 'demo1'), AssetLabel('test2', 'demo2')])

# Get asset labels
labels = asset.get_labels()

# Add custom metadata
asset.add_custom_metadata(custom_metadata=[CustomAssetMetadata('testcm1', 'democm1'), CustomAssetMetadata('testcm2', 'democm2')])

# sample data
sample_data = asset.sample_data()
```
Executing Asset Profiling, Cancelling Profiles, and Retrieving Latest Profiling Status
Crawled assets can be profiled and sampled using Spark jobs that run on Livy. Created policies, including Reconciliation and Data Quality policies, can also be triggered.
```python
# profile an asset, get profile request details, cancel the profile
profile_res = asset.start_profile(profiling_type=ProfilingType.FULL)
profile_req_details = profile_res.get_status()
cancel_profile_res = profile_res.cancel()
profile_res = asset.get_latest_profile_status()
profile_req_details_by_req_id = torch_client.get_profile_status(asset_id=profile_req_details.assetId,
                                                                req_id=profile_req_details.id)
```
Trigger Profiling
The StartProfilingRequest class is used to initialize and handle requests for starting profiling operations. This class encapsulates the type of profiling to be executed and the configuration for markers.
| Parameter | Type | Description | Default Value |
|---|---|---|---|
| profilingType | ExecutionType | The type of profiling to be executed (SELECTIVE, FULL, INCREMENTAL). This parameter is mandatory. | N/A |
| markerConfig | MarkerConfig | The configuration for the markers. This parameter is optional and applicable during SELECTIVE profiling. Currently supports BoundsIdMarkerConfig, BoundsFileEventMarkerConfig, and BoundsDateTimeMarkerConfig. | None |
Trigger Full Profiling
Executes profiling across the entire dataset to provide a comprehensive analysis of all data attributes and characteristics.
```python
from acceldata_sdk.models.common_types import ExecutionType
from acceldata_sdk.models.profile import StartProfilingRequest
from acceldata_sdk.torch_client import TorchClient

asset = torch_client.get_asset(identifier=test_const.table_asset_name)
profiling_execution_request = StartProfilingRequest(profilingType=ExecutionType.FULL)
asset.start_profile(start_profiling_request=profiling_execution_request)
```
Trigger Incremental Profiling
Executes profiling based on the configured incremental strategy within the asset, focusing on new or modified data to capture changes incrementally.
```python
from acceldata_sdk.models.common_types import ExecutionType
from acceldata_sdk.models.profile import StartProfilingRequest
from acceldata_sdk.torch_client import TorchClient

asset = torch_client.get_asset(identifier=test_const.table_asset_name)
profiling_execution_request = StartProfilingRequest(profilingType=ExecutionType.INCREMENTAL)
asset.start_profile(start_profiling_request=profiling_execution_request)
```
Trigger Selective Profiling
Executes profiling over a subset of data, constrained by the selected incremental strategy, targeting specific data attributes or segments for detailed analysis.
- ID based selective profiling: Uses a monotonically increasing column value to select data bounds. For example, it profiles new rows added after the last profiled row. BoundsIdMarkerConfig is used to selectively profile with this approach.
```python
from acceldata_sdk.models.common_types import ExecutionType, BoundsIdMarkerConfig
from acceldata_sdk.models.profile import StartProfilingRequest
from acceldata_sdk.torch_client import TorchClient

markerConfig = BoundsIdMarkerConfig(idColumnName="ID", fromId=0, toId=1000)
profiling_execution_request = StartProfilingRequest(profilingType=ExecutionType.SELECTIVE, markerConfig=markerConfig)
asset.start_profile(start_profiling_request=profiling_execution_request)
```
- DateTime based selective profiling: Profiles data using an increasing date column to select data bounds. BoundsDateTimeMarkerConfig is used to selectively profile with this approach.
```python
from acceldata_sdk.models.common_types import ExecutionType, BoundsDateTimeMarkerConfig
from acceldata_sdk.models.profile import StartProfilingRequest
from acceldata_sdk.torch_client import TorchClient

markerConfig = BoundsDateTimeMarkerConfig(dateColumnName="TO_DATE", format="yyyy-MM-dd",
                                          fromDate="2023-07-01 00:00:00.000", toDate="2024-07-14 23:59:59.999",
                                          timeZoneId="Asia/Calcutta")
profiling_execution_request = StartProfilingRequest(profilingType=ExecutionType.SELECTIVE, markerConfig=markerConfig)
asset.start_profile(start_profiling_request=profiling_execution_request)
```
- File event based selective profiling: Profiles data based on file events. BoundsFileEventMarkerConfig is used to selectively profile with this approach.
```python
from acceldata_sdk.models.common_types import ExecutionType, BoundsFileEventMarkerConfig
from acceldata_sdk.models.profile import StartProfilingRequest
from acceldata_sdk.torch_client import TorchClient

markerConfig = BoundsFileEventMarkerConfig(fromDate="2019-04-01 00:00:00.000",
                                           toDate="2024-07-16 23:59:59.999",
                                           timeZoneId="Asia/Calcutta")
profiling_execution_request = StartProfilingRequest(profilingType=ExecutionType.SELECTIVE, markerConfig=markerConfig)
asset.start_profile(start_profiling_request=profiling_execution_request)
```
Policy APIs
View All Policies, Retrieve Specific Policy, and List Policy Executions
```python
import acceldata_sdk.constants as const
from acceldata_sdk.models.ruleExecutionResult import RuleType, PolicyFilter

# Get policy
rule = torch_client.get_policy(const.PolicyType.RECONCILIATION, "auth001_reconciliation")

# List all executions
# List executions by policy id
dq_rule_executions = torch_client.policy_executions(1114, RuleType.DATA_QUALITY)
# List executions by policy name
dq_rule_executions = torch_client.policy_executions('dq-scala', RuleType.DATA_QUALITY)
# List executions using the policy object
recon_rule_executions = rule.get_executions()

# List all rules
filter = PolicyFilter(policyType=RuleType.RECONCILIATION, enable=True)
recon_rules = torch_client.list_all_policies(filter=filter)
```
Execute Policies Synchronously and Asynchronously
Acceldata SDK includes the utility function execute_policy, which can be used to execute policies both synchronously and asynchronously. This will return an object on which get_result and get_status can be called to obtain the execution's result and status.
Parameters for Executing Synchronous Policies
The required parameters include:
| Parameter | Description |
|---|---|
| sync | This is a Boolean parameter that determines whether the policy should be executed synchronously or asynchronously. It is a required parameter. If set to 'True', it will return only after the execution has completed. If 'False', it will return immediately after the execution begins. |
| policy_type | The policy type is specified using an enum parameter. It is a required parameter. It is an enum that will accept constant values as PolicyType.DATA_QUALITY or PolicyType.RECONCILIATION. |
| policy_id | The policy id to be executed is specified as a string parameter. It is a required parameter. |
| incremental | This is a Boolean parameter that specifies whether policy execution should be incremental or full. The default value is False. |
| pipeline_run_id | The run id of the pipeline run where the policy is being executed is specified by a long parameter. This can be used to link the execution of the policy with a specific pipeline run. |
| failure_strategy | The enum parameter is used to determine the behavior in the event of a failure. The default value is DoNotFail. |
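For illustration, here is a minimal sketch combining these parameters in a single execute_policy call, following the positional style used in the examples below; the policy id 46 and the pipeline_run variable are placeholders, and the incremental and pipeline_run_id keyword names are taken from the table above, so verify them against your SDK version:

```python
import acceldata_sdk.constants as const
from acceldata_sdk.constants import FailureStrategy
from acceldata_sdk.torch_client import TorchClient
from acceldata_airflow_sdk.initialiser import torch_credentials

torch_client = TorchClient(**torch_credentials)

# Placeholder policy id (46) and pipeline_run created earlier in your pipeline code.
# Runs an incremental execution linked to an existing pipeline run and waits for completion (sync=True).
executor = torch_client.execute_policy(
    const.PolicyType.DATA_QUALITY,   # policy_type
    46,                              # policy_id
    sync=True,
    incremental=True,
    pipeline_run_id=pipeline_run.id,
    failure_strategy=FailureStrategy.DoNotFail
)
execution_result = executor.get_result()
execution_status = executor.get_status()
```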
To get the execution result, you can call get_policy_execution_result on torch_client or call get_result on the execution object which will return a result object.
Parameters to Retrieve Policy Execution Results
The required parameters include:
| Parameter | Description |
|---|---|
| policy_type | The policy type is specified using an enum parameter. It is a required parameter. It is an enum that will accept constant values as PolicyType.DATA_QUALITY or PolicyType.RECONCILIATION. |
| execution_id | The execution id to be queried for the result is specified as a string parameter. It is a required parameter. |
| failure_strategy | The enum parameter is used to determine the behavior in the event of a failure. The default value is DoNotFail. |
For more information on hard linked and soft linked policies, see Hard Linked and Soft Linked Policies.
To get the current status, you can call get_policy_status on torch_client or call get_status on the execution object which will get the current resultStatus of the execution.
The required parameters include:
| Parameter | Description |
|---|---|
| policy_type | The policy type is specified using an enum parameter. It is a required parameter. It is an enum that will accept constant values as PolicyType.DATA_QUALITY or PolicyType.RECONCILIATION. |
| execution_id | The execution id to be queried for the result is specified as a string parameter. It is a required parameter. |
get_status does not take any parameters.
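A brief sketch of both retrieval calls, assuming the parameter names listed in the tables above; execution_id is a placeholder obtained from a previous execution, and torch_client is the client created earlier:

```python
import acceldata_sdk.constants as const
from acceldata_sdk.constants import FailureStrategy

# Retrieve the final result of a policy execution by its execution id
execution_result = torch_client.get_policy_execution_result(
    policy_type=const.PolicyType.DATA_QUALITY,
    execution_id=execution_id,
    failure_strategy=FailureStrategy.DoNotFail
)

# Retrieve the current status of the same execution
execution_status = torch_client.get_policy_status(
    policy_type=const.PolicyType.DATA_QUALITY,
    execution_id=execution_id
)
```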
Asynchronous Execution Example
```python
from acceldata_sdk.torch_client import TorchClient
from acceldata_airflow_sdk.initialiser import torch_credentials
import acceldata_sdk.constants as const
from acceldata_sdk.constants import FailureStrategy

torch_client = TorchClient(**torch_credentials)

# policy_execution_request is an optional PolicyExecutionRequest (see the Trigger Policies section)
async_executor = torch_client.execute_policy(const.PolicyType.DATA_QUALITY, 46, sync=False,
                                             failure_strategy=FailureStrategy.DoNotFail,
                                             policy_execution_request=policy_execution_request)

# Wait for execution to get the final result
execution_result = async_executor.get_result(failure_strategy=FailureStrategy.DoNotFail)

# Get the current status
execution_status = async_executor.get_status()
```
Synchronous Execution Example
```python
from acceldata_sdk.torch_client import TorchClient
from acceldata_airflow_sdk.initialiser import torch_credentials
import acceldata_sdk.constants as const
from acceldata_sdk.constants import FailureStrategy

torch_client = TorchClient(**torch_credentials)

# This will wait for the execution to get the final result
sync_executor = torch_client.execute_policy(const.PolicyType.DATA_QUALITY, 46, sync=True,
                                            failure_strategy=FailureStrategy.DoNotFail)

# Wait for execution to get the final result
execution_result = sync_executor.get_result(failure_strategy=const.FailureStrategy.DoNotFail)

# Get the current status
execution_status = sync_executor.get_status()
```
Cancel Execution Example
```python
execution_result = sync_executor.cancel()
```
Trigger Policies
The PolicyExecutionRequest class is used to initialize and handle requests for policy execution. This class encapsulates various configurations and parameters required for executing a policy, including execution type, marker configurations, rule item selections, and Spark-specific settings.
| Parameter | Type | Description | Default Value |
|---|---|---|---|
| executionType | ExecutionType | The type of execution (SELECTIVE, FULL, INCREMENTAL). This parameter is mandatory and defines how the policy execution should be carried out. | N/A |
| markerConfigs | Optional[List[AssetMarkerConfig]] | A list of marker configurations. This parameter is optional and useful during selective execution. Currently supports BoundsIdMarkerConfig, BoundsFileEventMarkerConfig, BoundsDateTimeMarkerConfig, and TimestampBasedMarkerConfig. | None |
| ruleItemSelections | Optional[List[int]] | A list of rule item selections by their identifiers. This parameter is optional. If not passed, all the rule item definitions are executed. | None |
| includeInQualityScore | bool | A flag indicating whether to include the execution in the quality score. This parameter is optional. | True |
| pipelineRunId | Optional[int] | The ID of the pipeline run to which the policy execution is attached. This parameter is optional. | None |
| sparkSQLDynamicFilterVariableMapping | Optional[List[RuleSparkSQLDynamicFilterVariableMapping]] | A list of Spark SQL dynamic filter variable mappings, applicable to a Data Quality policy. This parameter is optional. | None |
| sparkResourceConfig | Optional[SparkResourceConfig] | The configuration for Spark resources. This parameter is optional. | None |
Data Quality Policy Execution Examples
Trigger Full Data Quality Policy
To execute a full Data Quality policy across the entire dataset:
```python
from acceldata_sdk.models.common_types import ExecutionType, PolicyExecutionRequest
from acceldata_sdk.torch_client import TorchClient

dq_policy = torch_client.get_policy(identifier=dq_policy_name)
policy_execution_request = PolicyExecutionRequest(executionType=ExecutionType.FULL)
policy_execution_result = torch_client.execute_dq_rule(rule_id=dq_policy.id,
                                                        policy_execution_request=policy_execution_request)
```
Trigger Incremental Data Quality Policy
To execute an incremental Data Quality policy based on a configured incremental strategy:
```python
from acceldata_sdk.models.common_types import ExecutionType, PolicyExecutionRequest
from acceldata_sdk.torch_client import TorchClient

asset = torch_client.get_asset(identifier=test_const.table_asset_name)
policy_execution_request = PolicyExecutionRequest(executionType=ExecutionType.INCREMENTAL)
policy_execution_result = torch_client.execute_dq_rule(rule_id=dq_policy.id,
                                                        policy_execution_request=policy_execution_request)
```
Trigger Selective Data Quality Policy
To execute a selective Data Quality policy over a subset of data, as determined by the chosen incremental strategy, you can use sparkSQLDynamicFilterVariableMapping and sparkResourceConfig, as demonstrated in this optional example. Note that sparkSQLDynamicFilterVariableMapping is only relevant if the Data Quality policy includes SQL filters.
- ID-Based Selective Policy Execution: Uses a monotonically increasing column value to define data boundaries for policy execution, implemented with BoundsIdMarkerConfig.
```python
from acceldata_sdk.models.common_types import ExecutionType, AssetMarkerConfig, YunikornConfig, SparkResourceConfig, \
    RuleSparkSQLDynamicFilterVariableMapping, Mapping, BoundsIdMarkerConfig, PolicyExecutionRequest
from acceldata_sdk.torch_client import TorchClient

dq_policy = torch_client.get_policy(identifier=test_const.spark_sql_policy_name)
markerConfig = BoundsIdMarkerConfig(idColumnName="ID", fromId=0, toId=1000)

# assetId in the marker configuration refers to the unique identifier of the underlying asset on which the Data Quality (DQ) policy is established.
assetMarkerConfig = AssetMarkerConfig(assetId=9667404, markerConfig=markerConfig)
markerConfigs = [assetMarkerConfig]

yunikornConfig = YunikornConfig(minExecutors=1, maxExecutors=2, executorCores=2, executorMemory="2g",
                                driverCores=2, driverMemory="2g")
sparkResourceConfig = SparkResourceConfig(yunikorn=yunikornConfig, additionalConfiguration={})

ruleItemSelections = []
mapping = Mapping(key="column_name", isColumnVariable=True, value="100")
sparkSQLDynamicFilterVariableMapping = [RuleSparkSQLDynamicFilterVariableMapping(
    ruleName="SelectiveDQPolicysparkSQLDynamicFilterVariable",
    mapping=[mapping])]

policy_execution_request = PolicyExecutionRequest(
    markerConfigs=markerConfigs,
    executionType=ExecutionType.SELECTIVE,
    sparkResourceConfig=sparkResourceConfig,
    sparkSQLDynamicFilterVariableMapping=sparkSQLDynamicFilterVariableMapping,
    ruleItemSelections=ruleItemSelections)
torch_client.execute_dq_rule(rule_id=dq_policy.id, policy_execution_request=policy_execution_request)
```
- DateTime-Based Selective Policy Execution: Utilizes an increasing date column to define data boundaries for policy execution, implemented with BoundsDateTimeMarkerConfig.
```python
from acceldata_sdk.models.common_types import ExecutionType, BoundsDateTimeMarkerConfig, AssetMarkerConfig, PolicyExecutionRequest
from acceldata_sdk.torch_client import TorchClient

dq_policy = torch_client.get_policy(identifier=dq_policy_name)
markerConfig = BoundsDateTimeMarkerConfig(dateColumnName="TO_DATE", format="yyyy-MM-dd",
                                          fromDate="2023-07-01 00:00:00.000", toDate="2024-07-14 23:59:59.999",
                                          timeZoneId="Asia/Calcutta")

# assetId in the marker configuration refers to the unique identifier of the underlying asset on which the Data Quality (DQ) policy is established.
assetMarkerConfig = AssetMarkerConfig(assetId=9667404, markerConfig=markerConfig)
markerConfigs = [assetMarkerConfig]

policy_execution_request = PolicyExecutionRequest(markerConfigs=markerConfigs, executionType=ExecutionType.SELECTIVE)
policy_execution_result = torch_client.execute_dq_rule(rule_id=dq_policy.id,
                                                        policy_execution_request=policy_execution_request)
```
- File Event-Based Selective Policy Execution: Uses file events to establish data boundaries for policy execution, implemented with BoundsFileEventMarkerConfig.
```python
from acceldata_sdk.models.common_types import ExecutionType, AssetMarkerConfig, BoundsFileEventMarkerConfig, PolicyExecutionRequest
from acceldata_sdk.torch_client import TorchClient

dq_policy = torch_client.get_policy(identifier=dq_policy_name)
markerConfig = BoundsFileEventMarkerConfig(fromDate="2024-07-01 00:00:00.000",
                                           toDate="2024-07-01 23:59:59.999",
                                           timeZoneId="Asia/Calcutta")

# assetId in the marker configuration refers to the unique identifier of the underlying asset on which the Data Quality (DQ) policy is established.
assetMarkerConfig = AssetMarkerConfig(assetId=1202688, markerConfig=markerConfig)
markerConfigs = [assetMarkerConfig]

policy_execution_request = PolicyExecutionRequest(markerConfigs=markerConfigs, executionType=ExecutionType.SELECTIVE)
policy_execution_result = torch_client.execute_dq_rule(rule_id=dq_policy.id,
                                                        policy_execution_request=policy_execution_request)
```
- Kafka Timestamp-Based Selective Policy Execution: Uses offsets associated with specified timestamps to set data boundaries for policy execution, implemented with TimestampBasedMarkerConfig.
```python
from acceldata_sdk.models.common_types import ExecutionType, AssetMarkerConfig, TimestampBasedMarkerConfig, PolicyExecutionRequest
from acceldata_sdk.torch_client import TorchClient

dq_policy = torch_client.get_policy(identifier="kafka_dq_policy")
markerConfig = TimestampBasedMarkerConfig(format="yyyy-mm-dd", initialOffset="2023-06-01", timeZoneId="Asia/Calcutta")

# assetId in the marker configuration refers to the unique identifier of the underlying asset on which the Data Quality (DQ) policy is established.
assetMarkerConfig = AssetMarkerConfig(assetId=5241961, markerConfig=markerConfig)
markerConfigs = [assetMarkerConfig]

policy_execution_request = PolicyExecutionRequest(markerConfigs=markerConfigs, executionType=ExecutionType.SELECTIVE)
policy_execution_result = torch_client.execute_dq_rule(rule_id=dq_policy.id,
                                                        policy_execution_request=policy_execution_request)
```
Reconciliation Policy Execution Examples
Trigger Full Reconciliation Policy
To trigger a full reconciliation policy across the entire dataset:
```python
from acceldata_sdk.models.common_types import ExecutionType, PolicyExecutionRequest
from acceldata_sdk.torch_client import TorchClient

dq_policy = torch_client.get_policy(identifier=dq_policy_name)
policy_execution_request = PolicyExecutionRequest(executionType=ExecutionType.FULL)
policy_execution_result = torch_client.execute_reconciliation_rule(rule_id=dq_policy.id,
                                                                   policy_execution_request=policy_execution_request)
```
Trigger Incremental Reconciliation Policy
To trigger an incremental Reconciliation policy based on a configured incremental strategy:
```python
from acceldata_sdk.models.common_types import ExecutionType, PolicyExecutionRequest
from acceldata_sdk.torch_client import TorchClient

asset = torch_client.get_asset(identifier=test_const.table_asset_name)
policy_execution_request = PolicyExecutionRequest(executionType=ExecutionType.INCREMENTAL)
policy_execution_result = torch_client.execute_reconciliation_rule(rule_id=dq_policy.id,
                                                                   policy_execution_request=policy_execution_request)
```
Trigger Selective Reconciliation Policy
To trigger a selective Reconciliation policy over a subset of data, constrained by the selected incremental strategy:
- ID-Based Selective Policy Execution: Uses a monotonically increasing column value to define data boundaries for policy execution, implemented with BoundsIdMarkerConfig.
```python
from acceldata_sdk.models.common_types import ExecutionType, AssetMarkerConfig, YunikornConfig, SparkResourceConfig, \
    Mapping, PolicyExecutionRequest, BoundsIdMarkerConfig
from acceldata_sdk.torch_client import TorchClient

recon_policy = torch_client.get_policy(identifier=test_const.recon_policy_name)
markerConfig = BoundsIdMarkerConfig(idColumnName="ID", fromId=0, toId=1000)

# assetId in the marker configuration denotes the unique identifier of the underlying asset on which the Reconciliation policy is based. This could represent either the left or right asset ID.
assetMarkerConfig = AssetMarkerConfig(assetId=9667404, markerConfig=markerConfig)
markerConfigs = [assetMarkerConfig]

yunikornConfig = YunikornConfig(minExecutors=1, maxExecutors=2, executorCores=2, executorMemory="2g",
                                driverCores=2, driverMemory="2g")
sparkResourceConfig = SparkResourceConfig(yunikorn=yunikornConfig, additionalConfiguration={})
ruleItemSelections = []

policy_execution_request = PolicyExecutionRequest(
    markerConfigs=markerConfigs,
    executionType=ExecutionType.SELECTIVE,
    sparkResourceConfig=sparkResourceConfig,
    ruleItemSelections=ruleItemSelections)
torch_client.execute_reconciliation_rule(rule_id=recon_policy.id, policy_execution_request=policy_execution_request)
```
- DateTime-Based Selective Policy Execution: Utilizes an increasing date column to define data boundaries for policy execution, implemented with BoundsDateTimeMarkerConfig.
```python
from acceldata_sdk.models.common_types import ExecutionType, BoundsDateTimeMarkerConfig, AssetMarkerConfig, PolicyExecutionRequest
from acceldata_sdk.torch_client import TorchClient

recon_policy = torch_client.get_policy(identifier=recon_policy_name)
markerConfig = BoundsDateTimeMarkerConfig(dateColumnName="TO_DATE", format="yyyy-MM-dd",
                                          fromDate="2023-07-01 00:00:00.000", toDate="2024-07-14 23:59:59.999",
                                          timeZoneId="Asia/Calcutta")

# assetId in the marker configuration denotes the unique identifier of the underlying asset on which the Reconciliation policy is based. This could represent either the left or right asset ID.
assetMarkerConfig = AssetMarkerConfig(assetId=9667404, markerConfig=markerConfig)
markerConfigs = [assetMarkerConfig]

policy_execution_request = PolicyExecutionRequest(markerConfigs=markerConfigs, executionType=ExecutionType.SELECTIVE)
policy_execution_result = torch_client.execute_reconciliation_rule(rule_id=recon_policy.id,
                                                                   policy_execution_request=policy_execution_request)
```
- File Event-Based Selective Policy Execution: Uses file events to establish data boundaries for policy execution, implemented with BoundsFileEventMarkerConfig.
```python
from acceldata_sdk.models.common_types import ExecutionType, AssetMarkerConfig, BoundsFileEventMarkerConfig, PolicyExecutionRequest
from acceldata_sdk.torch_client import TorchClient

recon_policy = torch_client.get_policy(identifier=dq_policy_name)
markerConfig = BoundsFileEventMarkerConfig(fromDate="2024-07-01 00:00:00.000",
                                           toDate="2024-07-01 23:59:59.999",
                                           timeZoneId="Asia/Calcutta")

# assetId in the marker configuration denotes the unique identifier of the underlying asset on which the Reconciliation policy is based. This could represent either the left or right asset ID.
assetMarkerConfig = AssetMarkerConfig(assetId=1202688, markerConfig=markerConfig)
markerConfigs = [assetMarkerConfig]

policy_execution_request = PolicyExecutionRequest(markerConfigs=markerConfigs, executionType=ExecutionType.SELECTIVE)
policy_execution_result = torch_client.execute_reconciliation_rule(rule_id=recon_policy.id,
                                                                   policy_execution_request=policy_execution_request)
```
- Kafka Timestamp-Based Selective Policy Execution: Uses offsets associated with specified timestamps to set data boundaries for policy execution, implemented with TimestampBasedMarkerConfig.
```python
from acceldata_sdk.models.common_types import ExecutionType, AssetMarkerConfig, TimestampBasedMarkerConfig, PolicyExecutionRequest
from acceldata_sdk.torch_client import TorchClient

recon_policy = torch_client.get_policy(identifier="kafka_recon_policy")
markerConfig = TimestampBasedMarkerConfig(format="yyyy-mm-dd", initialOffset="2023-06-01", timeZoneId="Asia/Calcutta")

# assetId in the marker configuration denotes the unique identifier of the underlying asset on which the Reconciliation policy is based. This could represent either the left or right asset ID.
assetMarkerConfig = AssetMarkerConfig(assetId=5241961, markerConfig=markerConfig)
markerConfigs = [assetMarkerConfig]

policy_execution_request = PolicyExecutionRequest(markerConfigs=markerConfigs, executionType=ExecutionType.SELECTIVE)
policy_execution_result = torch_client.execute_reconciliation_rule(rule_id=recon_policy.id,
                                                                   policy_execution_request=policy_execution_request)
```
External Integrations
In scenarios where direct pipeline instrumentation encounters challenges, our updated capabilities offer a flexible approach. While real-time tracking traditionally assumes instant span creation, this may not align with cases involving pipeline execution details sourced from databases or APIs.
To address this, we have enhanced the API and SDK to load pipeline monitoring metadata efficiently, independently of the platform's ongoing activity.
Prerequisites
Supported from acceldata-sdk version 2.12.0 onwards.
No Real-Time Alerts: Real-time alerts are triggered only for current activities. Historical loads will not trigger the following real-time alerts.
Pipeline Alerts
- Pipeline Duration: Set alerts based on user-defined thresholds.
- Pipeline Start Time: Configure alerts based on user-defined thresholds.
- Pipeline End Time: Establish alerts based on user-defined thresholds.
Job Alerts
- Job Duration: Set alerts based on user-defined thresholds.
- Job Start Time: Configure alerts based on user-defined thresholds.
- Job End Time: Establish alerts based on user-defined thresholds.
Span Alerts
- Span Duration: Set alerts based on user-defined thresholds.
- Span Start Time: Configure alerts based on user-defined thresholds.
- Span End Time: Establish alerts based on user-defined thresholds.
Event Based Alerts: Evaluated as soon as the span events are received.
Post-Processing Alerts: Avoid configuring post-processing alerts for historical loads; allocate them for upcoming data flows. The following are the post-processing alerts:
Pipeline Alerts
- Pipeline Duration: Set alerts based on previous executions.
- Pipeline Start Time: Configure alerts based on previous executions.
- Pipeline End Time: Establish alerts based on previous executions.
Job Alerts
- Job Duration: Set alerts based on previous executions.
- Job Start Time: Configure alerts based on previous executions.
- Job End Time : Establish alerts based on previous executions.
Span Alerts
- Span Duration: Set alerts based on previous executions.
- Span Start Time: Configure alerts based on previous executions.
- Span End Time: Establish alerts based on previous executions.
Event Based Alerts: Evaluated as soon as the span events are received, making them applicable to historical processing.
Creating a Historical Pipeline
- Creating a pipeline with explicit times: While creating a pipeline for a historical load, the createdAt field specifying the pipeline creation time needs to be passed.
```python
# Add necessary imports here
explicit_pipeline_createdAt = datetime(2023, 11, 14, 12, 30, 0)
meta = PipelineMetadata(owner='sdk/pipeline-user', team='ADOC', codeLocation='...')
pipeline_name_ = pipeline_name
pipeline = CreatePipeline(
    uid=pipeline_uid,
    name=pipeline_name_,
    description=f'The pipeline {pipeline_name_} has been created from acceldata-sdk using External integration',
    meta=meta,
    context={'pipeline_uid': pipeline_uid, 'pipeline_name': pipeline_name_},
    createdAt=explicit_pipeline_createdAt
)
pipeline_res = torch_client.create_pipeline(pipeline=pipeline)
```
- Updating a pipeline with explicit times: When updating any details of the pipeline for a historical load, the updatedAt field needs to be passed.
```python
# Add necessary imports here
explicit_pipeline_updatedAt = datetime(2023, 11, 14, 12, 50, 0)
meta = PipelineMetadata(owner='sdk/pipeline-user', team='ADOC', codeLocation='...')
pipeline_name_ = pipeline_name
pipeline = CreatePipeline(
    uid=pipeline_uid,
    name=pipeline_name_,
    description=f'The pipeline {pipeline_name_} has been updated from acceldata-sdk using External integration',
    meta=meta,
    context={'pipeline_uid': pipeline_uid, 'pipeline_name': pipeline_name_},
    updatedAt=explicit_pipeline_updatedAt
)
pipeline_res = torch_client.create_pipeline(pipeline=pipeline)
```
- Creating a pipeline run with explicit times: The startedAt parameter needs to be set with the historical pipeline run creation time.
```python
# Add necessary imports here
# Consuming the pipeline_res object from step 1 above
explicit_pipeline_run_startedAt = datetime(2023, 11, 14, 13, 30, 0)
pipeline_run = pipeline_res.create_pipeline_run(startedAt=explicit_pipeline_run_startedAt)
```
- Creating spans to support sending span events with explicit times: To enable span start events to consume the historical time, the with_explicit_time parameter needs to be set to True during span creation. If this parameter is not set, spans are created and the span start event is sent with the current time.
```python
# Consuming the pipeline_run object created in step 3 above
span_name_ = f'{pipeline_uid}.root.span'
parent_span_context = pipeline_run.create_span(uid=span_name_, with_explicit_time=True)
```
- Sending span events with explicit times: The historical span event start/end/failed/abort times can be passed using the created_at parameter.
```python
# Sending span start event
# Consuming the parent_span_context created in step 4 above
root_span_created_at = datetime(2023, 11, 14, 13, 30, 1)
parent_span_context.start(created_at=root_span_created_at)
```
- Creating jobs bound by a span with explicit times: When jobs bound by a span are created, ensure that the bounded span supports explicit times. To enable the span start event corresponding to the span bound to the job, the with_explicit_time parameter needs to be set; otherwise, the span is bound and started with the current time.
```python
# Creating a job bounded by a span
job_uid = "customers.data-generation"
inputs = []
outputs = [Node(job_uid='customers.s3-upload')]
job_span_uid = f'{job_uid} Span'
job = CreateJob(
    uid=job_uid,
    name=f'{job_uid} Job',
    pipeline_run_id=pipeline_run.id,
    description=f'{job_uid} created using torch job decorator',
    inputs=inputs,
    outputs=outputs,
    bounded_by_span=True,
    span_uid=job_span_uid,
    with_explicit_time=True
)
created_job = pipeline_run.create_job(job)
job_span_context = pipeline_run.get_span(job_span_uid)
job_span_context.start(created_at=datetime(2023, 11, 14, 13, 40, 1))
job_span_context.send_event(GenericEvent(
    context_data={'Size': 100, 'total_file': 1, 'schema': 'name,address,dept_id'},
    event_uid="customers.data.generation.metadata",
    created_at=datetime(2023, 11, 14, 13, 45, 1)))
job_span_context.end(created_at=datetime(2023, 11, 14, 13, 48, 1))
```
- Updating the pipeline run with explicit times: To end the pipeline run with a historical time, the finishedAt parameter needs to be set; otherwise, the run ends with the current time.
```python
# Updating the pipeline_run object created in step 3
# Ending the root span for the pipeline run
root_span_finishedAt = datetime(2023, 11, 14, 13, 50, 1)
parent_span_context.end(created_at=root_span_finishedAt)

explicit_pipeline_run_finishedAt = datetime(2023, 11, 14, 13, 50, 1)

# Ending the pipeline run
pipeline_run.update_pipeline_run(
    context_data={'status': 'success'},
    result=PipelineRunResult.SUCCESS,
    status=PipelineRunStatus.COMPLETED,
    finishedAt=explicit_pipeline_run_finishedAt
)
```
Here is a snapshot of the pipeline reconstructed back in time using the above code: