SDK APIs

Introduction

ADOC is a complete solution for monitoring the quality of data in your data lake and warehouse. You can use ADOC to ensure that your business decisions are supported by high-quality data. ADOC provides tools for measuring the quality of data in a data catalog and ensuring that important data sources are never overlooked. All users, including analysts, data scientists, and developers, can rely on ADOC to monitor data flow in the warehouse or data lake and ensure that no data is lost.

The Acceldata SDK is used to invoke the ADOC catalog and pipeline APIs.

Features Provided by Acceldata SDK

Prerequisites

  1. Install the acceldata-sdk PyPI package in a Python environment: pip install acceldata-sdk
  2. Create an ADOC client: ADOC clients are used to send data to ADOC servers and include several methods for communicating with the server. They have access to the data reliability and pipeline APIs. An ADOC URL and API keys are required to create an ADOC client. To generate the API keys, go to the settings in the ADOC UI and generate keys for the client.
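A minimal sketch of gathering the URL and API keys needed for the client, assuming they are supplied through environment variables. The variable names (ADOC_URL, ADOC_ACCESS_KEY, ADOC_SECRET_KEY) and the helper load_client_settings are hypothetical. The TorchClient call in the closing comment is an assumption about the SDK's constructor and should be checked against the installed version.

```python
import os
from typing import Dict, Mapping

# Hypothetical environment variable names; use whatever your deployment defines.
REQUIRED_VARS = ("ADOC_URL", "ADOC_ACCESS_KEY", "ADOC_SECRET_KEY")


def load_client_settings(env: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Collect the ADOC URL and API keys, failing early if any is missing."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise ValueError(f"Missing ADOC settings: {', '.join(missing)}")
    return {
        "url": env["ADOC_URL"].rstrip("/"),
        "access_key": env["ADOC_ACCESS_KEY"],
        "secret_key": env["ADOC_SECRET_KEY"],
    }


# Assumed usage (not verified here): pass the settings to the SDK client, e.g.
#   from acceldata_sdk.torch_client import TorchClient
#   torch_client = TorchClient(**load_client_settings())
```

Reading the keys from the environment keeps them out of source control. Any other secret store would work equally well.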

In the SDK, mentions of Torch refer to ADOC.

Pipeline APIs

Acceldata SDK supports a number of pipeline APIs, such as creating a pipeline, adding jobs and spans, and initiating pipeline runs. During the span life cycle, Acceldata SDK can send various events. As a result, Acceldata SDK has complete control over the pipelines.

Create Pipeline and Job

A pipeline represents the entire ETL (Extract, Transform, Load) pipeline including Asset nodes and Job nodes. The Lineage graph for all data assets is formed by the complete pipeline definition.

A Job node or Process node is an entity that performs a task in the ETL workflow. In this representation, a job takes assets or other jobs as input and produces other assets or jobs as output. ADOC creates the Lineage from the set of job definitions in the workflow and tracks version changes for the Pipeline.

To begin creating pipelines and jobs, create a creation object with the necessary parameters. Then use the SDK-supported methods to perform the corresponding operations on the ADOC server side.

Acceldata SDK provides the CreateJob class, which must be passed to the create_job function as a parameter to create a job.

The parameters required include:

Parameter | Description
uid | Unique ID of the job. It is a mandatory parameter.
name | Name of the job. It is a mandatory parameter.
pipeline_run_id | ID of the pipeline run to which you want to add the job. This parameter is mandatory if the job is created using a pipeline. It is not required if the job is created using a pipeline run.
description | Description of the job.
inputs | Inputs for the job. Each input is either the uid of an asset, specified using the asset_uid parameter of the Node object, or the uid of another job, specified using the job_uid parameter of the Node object.
outputs | Outputs for the job. Each output is either the uid of an asset, specified using the asset_uid parameter of the Node object, or the uid of another job, specified using the job_uid parameter of the Node object.
meta | Metadata of the job.
context | Context of the job.
bounded_by_span | Boolean. Set this to True if the job needs to be bound by a span. The default value is False. It is an optional parameter.
span_uid | The uid of the new span, as a string. This parameter is mandatory if bounded_by_span is set to True.
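The parameter rules above can be sketched as a small validation model. This is only an illustration: NodeRef and JobSpec are hypothetical stand-ins, not the SDK's Node and CreateJob classes, and the "exactly one uid per node" check is an assumption drawn from the description of inputs and outputs.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class NodeRef:
    """Hypothetical stand-in for a job input/output: an asset or another job."""
    asset_uid: Optional[str] = None
    job_uid: Optional[str] = None

    def __post_init__(self) -> None:
        # Assumption: a node points at exactly one of an asset or a job.
        if (self.asset_uid is None) == (self.job_uid is None):
            raise ValueError("Set exactly one of asset_uid or job_uid")


@dataclass
class JobSpec:
    """Hypothetical stand-in mirroring the CreateJob parameter table."""
    uid: str
    name: str
    pipeline_run_id: Optional[int] = None
    description: Optional[str] = None
    inputs: List[NodeRef] = field(default_factory=list)
    outputs: List[NodeRef] = field(default_factory=list)
    meta: Optional[Dict[str, Any]] = None
    context: Optional[Dict[str, Any]] = None
    bounded_by_span: bool = False
    span_uid: Optional[str] = None

    def __post_init__(self) -> None:
        if not self.uid or not self.name:
            raise ValueError("uid and name are mandatory")
        if self.bounded_by_span and not self.span_uid:
            raise ValueError("span_uid is mandatory when bounded_by_span is True")
```

Validating these rules before calling the server surfaces a missing span_uid locally instead of as a failed request.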

Create Pipeline Run and Generate Spans and Send Span Events

A pipeline run denotes the pipeline's execution. The same pipeline can be run multiple times, with each execution (run) producing a new snapshot version. A hierarchical span group exists for each pipeline run. A span is a hierarchical way of grouping a number of metrics. It can be as specific as you want. The APIs will allow you to create a span object from a pipeline object, and then start hierarchical spans from parent spans.

A span is typically a process or a task that can be granular. This hierarchical system is capable of simulating highly complex pipeline observability flows. Optionally, a span can also be associated with a job. This way, the start, completion, and failure of the job can be tracked. For a span, start and stop are implicitly tracked.

Acceldata SDK also allows you to create new pipeline runs and add spans to them. SDK can send some custom and standard span events during the span life cycle to collect pipeline run metrics for observability.

The create_span function's parameters available under a pipeline run are:

Parameter | Description
uid | The uid of the span being created. This should be unique. This is a mandatory parameter.
associatedJobUids | List of job uids with which the span needs to be associated.
context_data | A dict of key-value pairs providing custom context information related to the span.

The create_child_span function, available under span_context, creates a hierarchy of spans by placing one span under another. Its parameters are:

Parameter | Description
uid | The uid of the span being created. This should be unique. This is a mandatory parameter.
context_data | A dict of key-value pairs providing custom context information related to the span.
associatedJobUids | List of job uids with which the span needs to be associated.
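To illustrate the span hierarchy described above, here is a small in-memory model. SpanSketch is a hypothetical class written for this illustration, not the SDK's span context. It only shows how child spans nest under a parent and how start and end times can be tracked implicitly.

```python
import time
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass(eq=False)
class SpanSketch:
    """Hypothetical in-memory span; not the SDK's span context class."""
    uid: str
    context_data: Dict[str, Any] = field(default_factory=dict)
    associated_job_uids: List[str] = field(default_factory=list)
    parent: Optional["SpanSketch"] = None
    children: List["SpanSketch"] = field(default_factory=list)
    # The start time is recorded implicitly when the span is created.
    started_at: float = field(default_factory=time.time)
    ended_at: Optional[float] = None

    def create_child_span(self, uid: str, **kwargs: Any) -> "SpanSketch":
        """Create a span nested under this one."""
        child = SpanSketch(uid=uid, parent=self, **kwargs)
        self.children.append(child)
        return child

    def end(self) -> None:
        """End this span, closing any still-open children first."""
        for child in self.children:
            if child.ended_at is None:
                child.end()
        self.ended_at = time.time()

    def path(self) -> str:
        """Return the chain of uids from the root span down to this span."""
        if self.parent is None:
            return self.uid
        return f"{self.parent.path()}/{self.uid}"
```

For example, a root span for the run can hold a "load" child span associated with a job, which in turn holds a finer-grained "check" span.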

Get Latest Pipeline Run

Acceldata SDK can obtain a pipeline run with a specific pipeline run id. The pipeline run instance allows you to continue the ETL pipeline and add spans, jobs, and events. As a result, Acceldata SDK has full access to the ADOC pipeline service.

The parameters required include:

Parameter | Description
pipeline_run_id | Run ID of the pipeline run.
continuation_id | Continuation ID of the pipeline run.
pipeline_id | ID of the pipeline to which the run belongs.

Get Pipeline Details for a Particular Pipeline Run ID

The Acceldata SDK can obtain pipeline details for a specific pipeline run.


Get All Spans for a Particular Pipeline Run ID

Acceldata SDK can retrieve all spans for a specific pipeline run id.


Get Pipeline Runs for a Pipeline

All pipeline runs are accessible via the Acceldata SDK.

The parameters required include:

Parameter | Description
pipeline_id | The ID of the pipeline.

Get All Pipelines

All pipelines are accessible via the Acceldata SDK.


Delete a Pipeline

Acceldata SDK can delete a pipeline.


Using Continuation Id to Continue the Same Pipeline Across Different ETL Scripts Example

ETL1 - This script creates a new pipeline run using a continuation id and leaves the pipeline run open.


ETL2 - This script will carry on the pipeline run that was started by ETL1.

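The hand-off between the two scripts can be sketched with an in-memory model. RunSketch, RunRegistry, etl1, and etl2 are hypothetical names for this illustration. The registry stands in for the ADOC server, and the only point shown is that both scripts reach the same run through a shared continuation id.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class RunSketch:
    """Hypothetical pipeline run record."""
    run_id: int
    continuation_id: str
    spans: List[str] = field(default_factory=list)
    is_open: bool = True


class RunRegistry:
    """Hypothetical stand-in for the server side that tracks pipeline runs."""

    def __init__(self) -> None:
        self._by_continuation: Dict[str, RunSketch] = {}
        self._next_id = 1

    def create_run(self, continuation_id: str) -> RunSketch:
        run = RunSketch(self._next_id, continuation_id)
        self._next_id += 1
        self._by_continuation[continuation_id] = run
        return run

    def get_run(self, continuation_id: str) -> RunSketch:
        return self._by_continuation[continuation_id]


def etl1(registry: RunRegistry, continuation_id: str) -> int:
    """First script: start the run, record a span, leave the run open."""
    run = registry.create_run(continuation_id)
    run.spans.append("etl1.extract")
    return run.run_id


def etl2(registry: RunRegistry, continuation_id: str) -> int:
    """Second script: look up the same run, add a span, then close it."""
    run = registry.get_run(continuation_id)
    run.spans.append("etl2.load")
    run.is_open = False
    return run.run_id
```

In practice the continuation id has to be a value both scripts can derive or receive independently, such as a scheduler run identifier.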

Datasource APIs

Acceldata SDK also has full access to catalog APIs. ADOC can also crawl more than fifteen different data sources.


Crawler Operations

You can start the crawler and check the crawler's status.


Assets APIs

Get Asset Details Using an Asset Identifier like Asset ID/UID

Acceldata SDK includes methods for retrieving assets from a given data source.


Get All Asset Types in ADOC


Asset's Tags, Labels, Metadata, and Sample Data

Using the SDK, you can add tags, labels, and custom metadata, as well as obtain sample data for the asset. Tags and labels can be used to quickly filter out assets.


Executing Asset Profiling, Cancelling Profiles, and Retrieving Latest Profiling Status

Crawled assets can be profiled and sampled using Spark jobs that run on Livy. Existing policies, including Reconciliation and Data Quality policies, can also be triggered.


Here is an example demonstrating how to use all of the pipeline APIs provided by the Python Acceldata SDK:

Pipeline APIs

Trigger Profiling

Note: This feature is supported for acceldata_sdk version >=3.8.0.

The StartProfilingRequest class is used to initialize and handle requests for starting profiling operations. This class encapsulates the type of profiling to be executed and the configuration for markers.

Parameter | Type | Description | Default Value
profilingType | ExecutionType | The type of profiling to be executed (SELECTIVE, FULL, or INCREMENTAL). This parameter is mandatory. | N/A
markerConfig | MarkerConfig | The configuration for the markers. This parameter is optional and applies during SELECTIVE profiling. Currently supports BoundsIdMarkerConfig, BoundsFileEventMarkerConfig, and BoundsDateTimeMarkerConfig. | None

Trigger Full Profiling

Executes profiling across the entire dataset to provide a comprehensive analysis of all data attributes and characteristics.


Trigger Incremental Profiling

Executes profiling based on the configured incremental strategy within the asset, focusing on new or modified data to capture changes incrementally.


Trigger Selective Profiling

Executes profiling over a subset of data, constrained by the selected incremental strategy, targeting specific data attributes or segments for detailed analysis.

  • ID-based selective profiling: Uses a monotonically increasing column value to select data bounds. For example, it profiles new rows added after the last profiled row. Use BoundsIdMarkerConfig for this approach.
  • DateTime-based selective profiling: Uses an increasing date column to select data bounds. Use BoundsDateTimeMarkerConfig for this approach.
  • File event-based selective profiling: Profiles data based on file events. Use BoundsFileEventMarkerConfig for this approach.
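The idea behind ID-based and DateTime-based bounds can be sketched in plain Python. The helper select_by_bounds is hypothetical and does not use the SDK's marker config classes. Treating the lower bound as exclusive and the upper bound as inclusive is an assumption, chosen to match "rows added after the last profiled row".

```python
from datetime import datetime
from typing import Any, Dict, Iterable, List


def select_by_bounds(
    rows: Iterable[Dict[str, Any]],
    column: str,
    lower: Any,
    upper: Any,
) -> List[Dict[str, Any]]:
    """Return rows whose `column` value lies in the range (lower, upper].

    Works for any increasing column whose values can be compared,
    such as integer ids or datetime values.
    """
    return [row for row in rows if lower < row[column] <= upper]


# Hypothetical sample rows with an increasing id and an increasing date column.
rows = [
    {"id": 1, "created": datetime(2024, 1, 1)},
    {"id": 2, "created": datetime(2024, 1, 2)},
    {"id": 3, "created": datetime(2024, 1, 3)},
    {"id": 4, "created": datetime(2024, 1, 4)},
]

# ID-based: rows added after id 2 was last profiled, up to id 4.
new_by_id = select_by_bounds(rows, "id", 2, 4)

# DateTime-based: rows created after Jan 1, up to and including Jan 3.
new_by_date = select_by_bounds(
    rows, "created", datetime(2024, 1, 1), datetime(2024, 1, 3)
)
```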

Policy APIs

View All Policies, Retrieve Specific Policy, and List Policy Executions


Execute Policies Synchronously and Asynchronously

Acceldata SDK includes the utility function execute_policy, which can be used to execute policies both synchronously and asynchronously. This will return an object on which get_result and get_status can be called to obtain the execution's result and status.

Parameters for Executing Synchronous Policies

The required parameters include:

Parameter | Description
sync | A Boolean parameter that determines whether the policy is executed synchronously or asynchronously. It is a required parameter. If set to True, execute_policy returns only after the execution has completed. If set to False, it returns immediately after the execution begins.
policy_type | An enum parameter that specifies the policy type. It is a required parameter. It accepts the constant values PolicyType.DATA_QUALITY or PolicyType.RECONCILIATION.
policy_id | A string parameter that specifies the id of the policy to be executed. It is a required parameter.
incremental | A Boolean parameter that specifies whether policy execution should be incremental or full. The default value is False.
pipeline_run_id | A long parameter that specifies the run id of the pipeline run in which the policy is being executed. This can be used to link the execution of the policy with a specific pipeline run.
failure_strategy | An enum parameter that determines the behavior in the event of a failure. The default value is DoNotFail.

  • failure_strategy takes an enum of type FailureStrategy, which can have three values: DoNotFail, FailOnError, and FailOnWarning.
  • DoNotFail never throws an exception. In the event of a failure, the error is logged.
  • FailOnError throws an exception only if there is an error. In the event of a warning, it returns without throwing.
  • FailOnWarning throws an exception on both warnings and errors.
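The three strategies can be sketched as a small decision function. Everything here is hypothetical and written for illustration: FailureStrategySketch mirrors the documented value names but is not the SDK's FailureStrategy enum, PolicyExecutionFailed is an invented exception, and the status labels SUCCESSFUL, WARNING, and ERRORED are assumed.

```python
import logging
from enum import Enum

logger = logging.getLogger("policy_sketch")


class FailureStrategySketch(Enum):
    """Hypothetical mirror of the three documented strategy values."""
    DoNotFail = "DoNotFail"
    FailOnError = "FailOnError"
    FailOnWarning = "FailOnWarning"


class PolicyExecutionFailed(Exception):
    """Hypothetical exception type used only in this sketch."""


def apply_failure_strategy(
    status: str,
    strategy: FailureStrategySketch = FailureStrategySketch.DoNotFail,
) -> str:
    """Raise or log for a finished execution. Status labels are assumed."""
    is_error = status == "ERRORED"
    is_warning = status == "WARNING"
    if is_error and strategy is not FailureStrategySketch.DoNotFail:
        raise PolicyExecutionFailed(f"Policy execution ended with {status}")
    if is_warning and strategy is FailureStrategySketch.FailOnWarning:
        raise PolicyExecutionFailed(f"Policy execution ended with {status}")
    if is_error or is_warning:
        # DoNotFail, or a warning under FailOnError: log and carry on.
        logger.warning("Policy execution ended with %s", status)
    return status
```

A pipeline that must stop on bad data would use a failing strategy, while a monitoring-only job can keep DoNotFail and inspect the result afterwards.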

To get the execution result, you can call get_policy_execution_result on torch_client or call get_result on the execution object which will return a result object.

Parameters to Retrieve Policy Execution Results

The required parameters include:

Parameter | Description
policy_type | An enum parameter that specifies the policy type. It is a required parameter. It accepts the constant values PolicyType.DATA_QUALITY or PolicyType.RECONCILIATION.
execution_id | A string parameter that specifies the execution id to be queried for the result. It is a required parameter.
failure_strategy | An enum parameter that determines the behavior in the event of a failure. The default value is DoNotFail.

For more information on hard linked and soft linked policies, see Hard Linked and Soft Linked Policies.

To get the current status, you can call get_policy_status on torch_client or call get_status on the execution object which will get the current resultStatus of the execution.

The required parameters include:

Parameter | Description
policy_type | An enum parameter that specifies the policy type. It is a required parameter. It accepts the constant values PolicyType.DATA_QUALITY or PolicyType.RECONCILIATION.
execution_id | A string parameter that specifies the execution id to be queried for the status. It is a required parameter.

get_status does not take any parameter.

Asynchronous Execution Example

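After an asynchronous execution starts, the caller typically polls for status until the execution finishes. The sketch below shows that polling loop in isolation. wait_for_completion and the status labels are hypothetical, and get_status is any zero-argument callable, which in real use could wrap the get_status method of the execution object.

```python
import time
from typing import Callable

# Assumed terminal status labels; check the values your SDK version returns.
TERMINAL_STATUSES = {"SUCCESSFUL", "WARNING", "ERRORED"}


def wait_for_completion(
    get_status: Callable[[], str],
    poll_seconds: float = 5.0,
    timeout_seconds: float = 600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_status until a terminal status is seen or the timeout passes."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Execution still {status} after {timeout_seconds}s")
        sleep(poll_seconds)
```

The sleep function is a parameter so the loop can be exercised without real delays. A fixed interval is used for simplicity, and a backoff could be substituted for long-running policies.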

Synchronous Execution Example


Cancel Execution Example


Trigger Policies

Note: This feature is supported for acceldata_sdk version >=3.8.0.

The PolicyExecutionRequest class is used to initialize and handle requests for policy execution. This class encapsulates various configurations and parameters required for executing a policy, including execution type, marker configurations, rule item selections, and Spark-specific settings.

Parameter | Type | Description | Default Value
executionType | ExecutionType | The type of execution (SELECTIVE, FULL, or INCREMENTAL). This parameter is mandatory and defines how the policy execution is carried out. | N/A
markerConfigs | Optional[List[AssetMarkerConfig]] | A list of marker configurations. This parameter is optional and is useful during selective execution. Currently supports BoundsIdMarkerConfig, BoundsFileEventMarkerConfig, BoundsDateTimeMarkerConfig, and TimestampBasedMarkerConfig. | None
ruleItemSelections | Optional[List[int]] | A list of rule item selections by their identifiers. This parameter is optional. If it is not passed, all the rule item definitions are executed. | None
includeInQualityScore | bool | A flag indicating whether to include the execution in the quality score. This parameter is optional. | True
pipelineRunId | Optional[int] | The ID of the pipeline run to which the policy execution is attached. This parameter is optional. | None
sparkSQLDynamicFilterVariableMapping | Optional[List[RuleSparkSQLDynamicFilterVariableMapping]] | A list of Spark SQL dynamic filter variable mappings applicable to a Data Quality policy. This parameter is optional. | None
sparkResourceConfig | Optional[SparkResourceConfig] | The configuration for Spark resources. This parameter is optional. | None
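The defaults in the table can be sketched as a plain request builder. This is not the SDK's PolicyExecutionRequest class: build_policy_request and ExecutionTypeSketch are hypothetical, and the dictionary shape is an assumption used only to show which fields are mandatory and which fall back to defaults.

```python
from enum import Enum
from typing import Any, Dict, List, Optional


class ExecutionTypeSketch(Enum):
    """Hypothetical mirror of the three documented execution types."""
    SELECTIVE = "SELECTIVE"
    FULL = "FULL"
    INCREMENTAL = "INCREMENTAL"


def build_policy_request(
    execution_type: ExecutionTypeSketch,
    marker_configs: Optional[List[Dict[str, Any]]] = None,
    rule_item_selections: Optional[List[int]] = None,
    include_in_quality_score: bool = True,
    pipeline_run_id: Optional[int] = None,
) -> Dict[str, Any]:
    """Assemble a request dict, leaving out optional fields that are unset."""
    if not isinstance(execution_type, ExecutionTypeSketch):
        raise TypeError("execution_type must be an ExecutionTypeSketch member")
    request: Dict[str, Any] = {
        "executionType": execution_type.name,
        "includeInQualityScore": include_in_quality_score,
    }
    optional = {
        "markerConfigs": marker_configs,
        "ruleItemSelections": rule_item_selections,
        "pipelineRunId": pipeline_run_id,
    }
    # Unset fields are omitted so downstream defaults apply, e.g. all
    # rule items run when no selection is passed.
    request.update({key: value for key, value in optional.items() if value is not None})
    return request
```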

Data Quality Policy Execution Examples

Trigger Full Data Quality Policy

To execute a full Data Quality policy across the entire dataset:


Trigger Incremental Data Quality Policy

To execute an incremental Data Quality policy based on a configured incremental strategy:


Trigger Selective Data Quality Policy

To execute a selective Data Quality policy over a subset of data, as determined by the chosen incremental strategy, use one of the marker configurations listed here. Optionally, you can also pass sparkSQLDynamicFilterVariableMapping and sparkResourceConfig. Note that sparkSQLDynamicFilterVariableMapping is only relevant if the Data Quality policy includes SQL filters.

  • ID-Based Selective Policy Execution: Uses a monotonically increasing column value to define data boundaries for policy execution, implemented with BoundsIdMarkerConfig.
  • DateTime-Based Selective Policy Execution: Uses an increasing date column to define data boundaries for policy execution, implemented with BoundsDateTimeMarkerConfig.
  • File Event-Based Selective Policy Execution: Uses file events to establish data boundaries for policy execution, implemented with BoundsFileEventMarkerConfig.
  • Kafka Timestamp-Based Selective Policy Execution: Uses offsets associated with specified timestamps to set data boundaries for policy execution, implemented with TimestampBasedMarkerConfig.
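The idea of mapping timestamps to offsets can be sketched independently of the SDK. offset_for_timestamp is a hypothetical helper over an in-memory list of records. It assumes the usual Kafka convention of picking the earliest offset whose timestamp is at or after the requested time, and how ADOC resolves offsets internally is not stated here.

```python
from bisect import bisect_left
from typing import List, Optional, Tuple

# Hypothetical (offset, timestamp_ms) records, sorted by timestamp.
Record = Tuple[int, int]


def offset_for_timestamp(records: List[Record], timestamp_ms: int) -> Optional[int]:
    """Return the earliest offset whose timestamp is >= timestamp_ms, or None."""
    timestamps = [ts for _, ts in records]
    index = bisect_left(timestamps, timestamp_ms)
    return records[index][0] if index < len(records) else None


def offsets_between(records: List[Record], start_ms: int, end_ms: int) -> List[int]:
    """Return the offsets of records with start_ms <= timestamp < end_ms."""
    timestamps = [ts for _, ts in records]
    lo = bisect_left(timestamps, start_ms)
    hi = bisect_left(timestamps, end_ms)
    return [offset for offset, _ in records[lo:hi]]
```

Binary search keeps the lookup logarithmic in the number of records, which matters when the timestamp index is large.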

Reconciliation Policy Execution Examples

Trigger Full Reconciliation Policy

To trigger a full reconciliation policy across the entire dataset:


Trigger Incremental Reconciliation Policy

To trigger an incremental Reconciliation policy based on a configured incremental strategy:


Trigger Selective Reconciliation Policy

To trigger a selective Reconciliation policy over a subset of data, constrained by the selected incremental strategy:

  • ID-Based Selective Policy Execution: Uses a monotonically increasing column value to define data boundaries for policy execution, implemented with BoundsIdMarkerConfig.
  • DateTime-Based Selective Policy Execution: Uses an increasing date column to define data boundaries for policy execution, implemented with BoundsDateTimeMarkerConfig.
  • File Event-Based Selective Policy Execution: Uses file events to establish data boundaries for policy execution, implemented with BoundsFileEventMarkerConfig.
  • Kafka Timestamp-Based Selective Policy Execution: Uses offsets associated with specified timestamps to set data boundaries for policy execution, implemented with TimestampBasedMarkerConfig.