Python

Installation

The following Python modules must be installed in the environment:

Install acceldata-sdk

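Install the SDK from PyPI, for example with pip:

Bash
pip install acceldata-sdk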

Prerequisites

To make calls to ADOC, API keys are required.

Creating an API Key

You can generate API keys from the Admin Central section of the ADOC UI.

Before using acceldata-sdk calls, make sure you have your API keys and ADOC URL handy.

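One common pattern is to keep the ADOC URL and API keys in environment variables rather than hard-coding them in scripts. The variable names below are illustrative, not required by the SDK:

Bash
export ADOC_URL="https://your-adoc-host"
export ADOC_ACCESS_KEY="<access key>"
export ADOC_SECRET_KEY="<secret key>"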

Features Provided by Acceldata SDK

  • Pipeline: Represents an execution of a data pipeline
  • Span: Logical collection of various tasks
  • Job: Logical representation of a task
  • Event: An event can contain arbitrary process or business data and is transmitted to the ADOC system for future tracking against a Pipeline execution

Minimum Instrumentation Required

Step 1. Create Pipeline and Pipeline Run

A pipeline should be created and a new pipeline run should be started before beginning the data processing code.

You must provide a pipeline_uid; ADOC uses it to track executions of the data pipeline.

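A minimal sketch of this step, assuming the TorchClient and CreatePipeline classes from acceldata_sdk (module paths and optional arguments may vary slightly between SDK versions; the URL, keys, and uid are placeholders):

Python
from acceldata_sdk.torch_client import TorchClient
from acceldata_sdk.models.pipeline import CreatePipeline

# Create the ADOC client with your URL and API keys
torch_client = TorchClient(url="https://your-adoc-host",
                           access_key="<access key>",
                           secret_key="<secret key>")

# Register (or update) the pipeline definition in ADOC
pipeline = torch_client.create_pipeline(pipeline=CreatePipeline(
    uid="monthly.reporting.pipeline",
    name="Monthly reporting pipeline"))

# Start a new run before the data processing code begins
pipeline_run = pipeline.create_pipeline_run()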

Tracking Each Task

You must add more instrumentation to the code to allow ADOC to provide a fine-grained view of the data pipeline, as described in the sections below.

Tracking Each Task Using Jobs

Before each function executes, create a job by passing a job_uid, inputs, outputs, and metadata so that the task is visible as a job in the ADOC pipeline. Describe the task's input assets in the inputs list and its output assets in the outputs list. A corresponding span can be created in addition to the job to populate the timeline and allow events to be associated with the task.

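A sketch of instrumenting one task as a job, assuming the CreateJob, JobMetadata, and Node classes from acceldata_sdk; the uids and metadata values are placeholders:

Python
from acceldata_sdk.models.job import CreateJob, JobMetadata, Node

job = pipeline_run.create_job(CreateJob(
    uid="monthly.sales.aggregate",
    name="Monthly sales aggregate",
    inputs=[Node(asset_uid="POSTGRES-DS.reporting.public.customer_orders")],
    outputs=[Node(asset_uid="POSTGRES-DS.reporting.public.monthly_sales")],
    meta=JobMetadata("data-eng", "reporting", "https://example.com/repo"),  # owner, team, code location
    context={"stage": "aggregation"}))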

Getting the UID of the Asset to be Used in the Input and Output List

To get the UID of an asset, open the asset in the ADOC UI. The asset's path is displayed under the asset name; the first part of the path is the data source name, and the remaining items, joined with a period as the field separator, form the asset name. In this example the data source name is ATHENA-DS and the asset name is AwsDataCatalog.sampledb.elb_logs.request_processing_time.

This asset can be used as an input with the following syntax: inputs=[Node(asset_uid='ATHENA-DS.AwsDataCatalog.sampledb.elb_logs.request_processing_time')],

Subdividing a Task into Multiple Spans

You can represent a single task that has multiple steps as multiple child spans with create_child_span and send events for those child spans. To create a child span, first get the parent span context, which returns the root span, and then call create_child_span on it; the result appears as a child span in the ADOC pipeline view.

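A sketch, reusing the pipeline_run from the earlier example; the root-span lookup and the GenericEvent import path are assumptions to verify against your SDK version, and the uids are placeholders:

Python
from acceldata_sdk.events.generic_event import GenericEvent

# Get the parent (root) span context of the current pipeline run
root_span = pipeline_run.get_root_span()

# Create a child span for one step of the task
load_span = root_span.create_child_span(uid="monthly.sales.aggregate.load")

# Attach an arbitrary event to the child span, then close it
load_span.send_event(GenericEvent(
    context_data={"rows_loaded": 42000},
    event_uid="monthly.sales.aggregate.load.result"))
load_span.end()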

Linking a Task with Another Task

In previous examples, each pipeline job takes an asset as input and produces another asset as output, which the next job will use as input. Acceldata-sdk uses these to connect jobs in the ADOC pipeline UI. However, there may be times when a task does not produce another asset as an output. In such cases, you can provide a job_uid as output instead of an asset to link the next job.

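A sketch of linking by job uid instead of by asset; the uids are placeholders:

Python
from acceldata_sdk.models.job import CreateJob, Node

# This task produces no asset, so it points at the next job's uid as its output
cleanup_job = pipeline_run.create_job(CreateJob(
    uid="temp.table.cleanup",
    name="Temp table cleanup",
    inputs=[Node(job_uid="monthly.sales.aggregate")],
    outputs=[Node(job_uid="report.publish")]))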

SDK APIs

ADOC is a complete solution for monitoring the quality of data in your data lake and warehouse. You can use ADOC to ensure that your business decisions are supported by high-quality data. ADOC provides tools for measuring the quality of data in a data catalog and ensuring that important data sources are never overlooked. All users, including analysts, data scientists, and developers, can rely on ADOC to monitor data flow in the warehouse or data lake and ensure that no data is lost.

The ADOC catalog and pipeline APIs are triggered by the Acceldata SDK.

Features Provided by Acceldata SDK

  • Pipeline APIs
  • Datasource APIs

Prerequisites

  1. Install the acceldata-sdk PyPI package in a Python environment: pip install acceldata-sdk
  2. Create ADOC client: ADOC clients are used to send data to ADOC servers and include several methods for communicating with the server, with access to both the data reliability and pipeline APIs. An ADOC URL and API keys are required to create an ADOC client. Go to the ADOC UI's settings and generate API keys for the client.
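
A minimal client-creation sketch; the URL and keys are placeholders:

Python
from acceldata_sdk.torch_client import TorchClient

torch_client = TorchClient(url="https://your-adoc-host",
                           access_key="<access key>",
                           secret_key="<secret key>")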

Note: any mention of Torch in the SDK (for example, in the TorchClient class name) refers to ADOC.

Pipeline APIs

Acceldata SDK supports a number of pipeline APIs, such as creating a pipeline, adding jobs and spans, and initiating pipeline runs. During the span life cycle, Acceldata SDK can send various events. As a result, Acceldata SDK has complete control over pipelines.

Create Pipeline and Job

A pipeline represents the entire ETL (Extract, Transform, Load) pipeline including Asset nodes and Job nodes. The Lineage graph for all data assets is formed by the complete pipeline definition.

A Job node or Process node is an entity that performs a task in the ETL workflow. In this representation, a job's inputs are assets or other jobs, and its outputs are other assets or jobs. ADOC creates the lineage using the set of job definitions in the workflow and tracks version changes for the pipeline.

To begin creating pipelines and jobs, create a creation object with the necessary parameters and then use the SDK-supported methods to perform the corresponding operations on the ADOC server side.

Acceldata SDK provides the CreateJob class, which is passed to the create_job function as a parameter to create a job.

The parameters required include:

  • uid: Unique ID of the job. This is a mandatory parameter.
  • name: Name of the job. This is a mandatory parameter.
  • pipeline_run_id: ID of the pipeline run to which the job should be added. Mandatory if the job is created using a pipeline; not required if the job is created using a pipeline run.
  • description: Description of the job.
  • inputs: Inputs for the job. Each entry is a Node object referencing either an asset (asset_uid) or another job (job_uid).
  • outputs: Outputs for the job. Each entry is a Node object referencing either an asset (asset_uid) or another job (job_uid).
  • meta: Metadata of the job.
  • context: Context of the job.
  • bounded_by_span: Boolean. Set to True if the job must be bound by a span; the default is False. This is an optional parameter.
  • span_uid: UID (string) of the new span. Mandatory if bounded_by_span is set to True.
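
A sketch of create_job using the parameters above, including a span-bounded job; the uids, metadata, and asset names are placeholders:

Python
from acceldata_sdk.models.job import CreateJob, JobMetadata, Node

job = pipeline.create_job(CreateJob(
    uid="customer.orders.join",
    name="Customer orders join",
    pipeline_run_id=pipeline_run.id,
    description="Joins customers with orders",
    inputs=[Node(asset_uid="POSTGRES-DS.reporting.public.customers"),
            Node(asset_uid="POSTGRES-DS.reporting.public.orders")],
    outputs=[Node(asset_uid="POSTGRES-DS.reporting.public.customer_orders")],
    meta=JobMetadata("data-eng", "reporting", "https://example.com/repo"),  # owner, team, code location
    context={"stage": "join"},
    bounded_by_span=True,
    span_uid="customer.orders.join.span"))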

Create Pipeline Run and Generate Spans and Send Span Events

A pipeline run denotes the pipeline's execution. The same pipeline can be run multiple times, with each execution (run) producing a new snapshot version. A hierarchical span group exists for each pipeline run. A span is a hierarchical way of grouping a number of metrics. It can be as specific as you want. The APIs will allow you to create a span object from a pipeline object, and then start hierarchical spans from parent spans.

A span is typically a process or a task that can be granular. This hierarchical system is capable of simulating highly complex pipeline observability flows. Optionally, a span can also be associated with a job. This way, we can track the start and completion of the job, as well as failure tracking. For a span, start and stop are implicitly tracked.

Acceldata SDK also allows you to create new pipeline runs and add spans to them. SDK can send some custom and standard span events during the span life cycle to collect pipeline run metrics for observability.

The create_span function's parameters available under a pipeline run are:

  • uid: UID of the span being created. This should be unique. This is a mandatory parameter.
  • associatedJobUids: List of job uids with which the span is associated.
  • context_data: Dict of key-value pairs providing custom context information related to the span.

The create_child_span function is available under span_context and is used to create a hierarchy of spans by placing one span under another. Its parameters are:

  • uid: UID of the span being created. This should be unique. This is a mandatory parameter.
  • context_data: Dict of key-value pairs providing custom context information related to the span.
  • associatedJobUids: List of job uids with which the span is associated.
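
A sketch that ties these together, reusing the pipeline object from the previous example; the uids are placeholders, and class and module names may vary slightly by SDK version:

Python
from acceldata_sdk.events.generic_event import GenericEvent
from acceldata_sdk.models.pipeline import PipelineRunResult, PipelineRunStatus

# Start a new run (execution) of the pipeline
pipeline_run = pipeline.create_pipeline_run()

# Create a span for the run and a child span beneath it
span_context = pipeline_run.create_span(uid="monthly.reporting.root.span")
generate_span = span_context.create_child_span(uid="monthly.reporting.generate.span")

# Send a custom event against the child span
generate_span.send_event(GenericEvent(
    context_data={"rows_written": 1200},
    event_uid="monthly.reporting.generate.result"))

# Close the spans and the run
generate_span.end()
span_context.end()
pipeline_run.update_pipeline_run(result=PipelineRunResult.SUCCESS,
                                 status=PipelineRunStatus.COMPLETED)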

Get Latest Pipeline Run

Acceldata SDK can obtain a pipeline run with a specific pipeline run id. The pipeline run instance allows you to continue the ETL pipeline and add spans, jobs, and events. As a result, Acceldata SDK has full access to the ADOC pipeline service.

The parameters required include:

  • pipeline_run_id: Run ID of the pipeline run.
  • continuation_id: Continuation ID of the pipeline run.
  • pipeline_id: ID of the pipeline to which the run belongs.
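
A sketch of the lookups; the run id, continuation id, and pipeline uid are placeholders, and method names should be checked against your SDK version:

Python
# By run id
pipeline_run = torch_client.get_pipeline_run(pipeline_run_id=2434)

# By continuation id within a pipeline
pipeline = torch_client.get_pipeline("monthly.reporting.pipeline")
pipeline_run = torch_client.get_pipeline_run(continuation_id="monthly-2024-05",
                                             pipeline_id=pipeline.id)

# Most recent run of a pipeline
latest_run = pipeline.get_latest_pipeline_run()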

Get Pipeline Details for a Particular Pipeline Run ID

The Acceldata SDK can obtain pipeline details for a specific pipeline run.

Python
Copy

Get All Spans for a Particular Pipeline Run ID

Acceldata SDK can retrieve all spans for a specific pipeline run id.

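A sketch, assuming the run object exposes a get_spans method (verify the name in your SDK version):

Python
pipeline_run = torch_client.get_pipeline_run(pipeline_run_id=2434)
spans = pipeline_run.get_spans()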

Get Pipeline Runs for a Pipeline

All pipeline runs are accessible via the Acceldata SDK.

The parameters required include:

  • pipeline_id: The ID of the pipeline.
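
A sketch, assuming a get_pipeline_runs method on the client; the id is a placeholder:

Python
runs = torch_client.get_pipeline_runs(pipeline_id=21)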

Get All Pipelines

All pipelines are accessible via the Acceldata SDK.

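A sketch, assuming a get_pipelines method on the client:

Python
pipelines = torch_client.get_pipelines()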

Delete a Pipeline

Acceldata SDK can delete a pipeline.

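A sketch that deletes via the pipeline object; depending on the SDK version a delete call may also exist directly on the client:

Python
pipeline = torch_client.get_pipeline("monthly.reporting.pipeline")
delete_response = pipeline.delete()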

Example: Using a Continuation Id to Continue the Same Pipeline Across Different ETL Scripts

In ETL1, a new pipeline run is created with a continuation id, and the run is deliberately left open.

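A sketch of ETL1, reusing the client from the earlier examples; the continuation id is a caller-chosen placeholder:

Python
from acceldata_sdk.models.job import CreateJob, Node

pipeline = torch_client.get_pipeline("monthly.reporting.pipeline")

# Start a run with a caller-chosen continuation id and leave it open
pipeline_run = pipeline.create_pipeline_run(continuation_id="monthly-2024-05")

pipeline_run.create_job(CreateJob(
    uid="extract.orders",
    name="Extract orders",
    outputs=[Node(asset_uid="S3-DS.staging.orders_raw")]))

# Note: the run is intentionally NOT closed here; ETL2 continues it.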

ETL2: this script continues the pipeline run started by ETL1.

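A sketch of ETL2, which looks up the open run by its continuation id and then closes it; class names are as in the earlier examples:

Python
from acceldata_sdk.models.job import CreateJob, Node
from acceldata_sdk.models.pipeline import PipelineRunResult, PipelineRunStatus

pipeline = torch_client.get_pipeline("monthly.reporting.pipeline")
pipeline_run = torch_client.get_pipeline_run(continuation_id="monthly-2024-05",
                                             pipeline_id=pipeline.id)

pipeline_run.create_job(CreateJob(
    uid="load.orders",
    name="Load orders",
    inputs=[Node(asset_uid="S3-DS.staging.orders_raw")],
    outputs=[Node(asset_uid="SNOWFLAKE-DS.analytics.public.orders")]))

# Close the run that ETL1 started
pipeline_run.update_pipeline_run(result=PipelineRunResult.SUCCESS,
                                 status=PipelineRunStatus.COMPLETED)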

Datasource APIs

Acceldata SDK also has full access to catalog APIs.

Datasource APIs

ADOC can crawl more than fifteen different data sources.

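A sketch of the data source lookups; the data source name is a placeholder:

Python
# Fetch a single data source by name
datasource = torch_client.get_datasource("ATHENA-DS")

# List all data sources registered in ADOC
datasources = torch_client.get_datasources()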

Crawler Operations

You can start the crawler and check the crawler's status.

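A sketch, assuming the data source object exposes start_crawler and get_crawler_status methods:

Python
datasource = torch_client.get_datasource("ATHENA-DS")

# Trigger a crawl of the data source and poll its status
datasource.start_crawler()
crawler_status = datasource.get_crawler_status()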

Assets APIs

Get Asset Details Using an Asset Identifier like Asset ID/UID

Acceldata SDK includes methods for retrieving assets from a given data source.

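A sketch, assuming get_asset accepts either the numeric asset id or the dotted asset UID:

Python
# By numeric id
asset = torch_client.get_asset(1868)

# By UID (data source name followed by the dotted asset path)
asset = torch_client.get_asset("ATHENA-DS.AwsDataCatalog.sampledb.elb_logs")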

Get All Asset Types in ADOC

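A sketch, assuming a get_all_asset_types method on the client:

Python
asset_types = torch_client.get_all_asset_types()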

Asset's Tags, Labels, Metadata, and Sample Data

Using the SDK, you can add tags, labels, and custom metadata, as well as obtain sample data for the asset. Tags and labels can be used to quickly filter out assets.

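A sketch of the asset-level helpers; the AssetLabel and CustomAssetMetadata import path and the exact method names should be verified against your SDK version:

Python
from acceldata_sdk.models.tags import AssetLabel, CustomAssetMetadata  # assumed import path

asset = torch_client.get_asset("ATHENA-DS.AwsDataCatalog.sampledb.elb_logs")

# Tags and labels make it easy to filter assets later
tags = asset.get_tags()
asset.add_tag(tag="elb")
labels = asset.get_labels()
asset.add_labels(labels=[AssetLabel("team", "observability")])

# Custom metadata and sample data
metadata = asset.get_metadata()
asset.add_custom_metadata(custom_metadata=[CustomAssetMetadata("owner", "data-eng")])
sample = asset.sample_data()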

Executing Asset Profiling, Cancelling Profiles, and Retrieving Latest Profiling Status

Crawled assets can be profiled and sampled using Spark jobs that run on Livy. Policies that have been created, including Reconciliation and Data Quality policies, can also be triggered.

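A sketch of the profiling helpers; the ProfilingType import path and the exact method names are assumptions to verify against your SDK version:

Python
from acceldata_sdk.constants import ProfilingType  # assumed import path

asset = torch_client.get_asset("ATHENA-DS.AwsDataCatalog.sampledb.elb_logs")

# Start a full profile and poll the request
profile_request = asset.start_profile(ProfilingType.FULL)
status = profile_request.get_status()

# Cancel a running profile, or check the most recent profiling status of the asset
profile_request.cancel()
latest_status = asset.get_latest_profile_status()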

Here is an example demonstrating how to use all of the pipeline APIs provided by the Python Acceldata SDK:

Pipeline APIs

Trigger Profiling

Note: This feature is supported from acceldata-sdk version 3.8.0 onwards.

The StartProfilingRequest class is used to initialize and handle requests for starting profiling operations. This class encapsulates the type of profiling to be executed and the configuration for markers.

  • profilingType (ExecutionType): The type of profiling to be executed (SELECTIVE, FULL, or INCREMENTAL). This parameter is mandatory. Default: N/A.
  • markerConfig (MarkerConfig): The configuration for the markers. This parameter is optional and applicable during SELECTIVE profiling. Currently supports BoundsIdMarkerConfig, BoundsFileEventMarkerConfig, and BoundsDateTimeMarkerConfig. Default: None.

Trigger Full Profiling

Executes profiling across the entire dataset to provide a comprehensive analysis of all data attributes and characteristics.

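A sketch, assuming the asset's profiling entry point accepts a StartProfilingRequest; the import paths are assumptions and may differ by SDK version:

Python
from acceldata_sdk.models.profile import StartProfilingRequest  # assumed import path
from acceldata_sdk.constants import ExecutionType               # assumed import path

asset = torch_client.get_asset("ATHENA-DS.AwsDataCatalog.sampledb.elb_logs")

# Profile the entire dataset
request = StartProfilingRequest(profilingType=ExecutionType.FULL)
profile_request = asset.start_profile(request)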

Trigger Incremental Profiling

Executes profiling based on the configured incremental strategy within the asset, focusing on new or modified data to capture changes incrementally.

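The same sketch with ExecutionType.INCREMENTAL (imports and assumptions as above):

Python
request = StartProfilingRequest(profilingType=ExecutionType.INCREMENTAL)
profile_request = asset.start_profile(request)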

Trigger Selective Profiling

Executes profiling over a subset of data, constrained by the selected incremental strategy, targeting specific data attributes or segments for detailed analysis.

  • ID-based selective profiling: Uses a monotonically increasing column value to select data bounds; for example, it profiles new rows added after the last profiled row. Use BoundsIdMarkerConfig for this approach (a sketch follows this list).
  • DateTime-based selective profiling: Profiles data using an increasing date column to select data bounds. Use BoundsDateTimeMarkerConfig for this approach.
  • File event-based selective profiling: Profiles data based on file events. Use BoundsFileEventMarkerConfig for this approach.
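
A sketch of ID-based selective profiling. The BoundsIdMarkerConfig constructor fields shown here are illustrative assumptions; the other marker configurations are passed the same way through markerConfig:

Python
from acceldata_sdk.models.profile import StartProfilingRequest, BoundsIdMarkerConfig  # assumed import path
from acceldata_sdk.constants import ExecutionType                                     # assumed import path

# Bound the profile by a monotonically increasing id column (field names illustrative)
request = StartProfilingRequest(
    profilingType=ExecutionType.SELECTIVE,
    markerConfig=BoundsIdMarkerConfig(column="request_id", lowerBound=10000, upperBound=20000))

profile_request = asset.start_profile(request)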

Policy APIs

View All Policies, Retrieve Specific Policy, and List Policy Executions

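A sketch of retrieving a specific policy; listing all policies and their executions is done through companion client methods whose exact names should be taken from the SDK reference:

Python
from acceldata_sdk.constants import PolicyType

# Retrieve a specific policy by type and by id or name
policy = torch_client.get_policy(PolicyType.DATA_QUALITY, "order-quality-policy")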

Execute Policies Synchronously and Asynchronously

Acceldata SDK includes the utility function execute_policy, which can be used to execute policies both synchronously and asynchronously. This will return an object on which get_result and get_status can be called to obtain the execution's result and status.

Parameters for Executing Synchronous Policies

The required parameters include:

  • sync: Boolean. Determines whether the policy is executed synchronously or asynchronously. This is a required parameter. If set to True, the call returns only after the execution has completed; if False, it returns immediately after the execution begins.
  • policy_type: Enum. This is a required parameter. Accepts PolicyType.DATA_QUALITY or PolicyType.RECONCILIATION.
  • policy_id: String. The id of the policy to be executed. This is a required parameter.
  • incremental: Boolean. Specifies whether the policy execution should be incremental or full. The default value is False.
  • pipeline_run_id: Long. The run id of the pipeline run in which the policy is executed; use it to link the policy execution with a specific pipeline run.
  • failure_strategy: Enum. Determines the behavior in the event of a failure. The default value is DoNotFail.

  • failure_strategy takes an enum of type FailureStrategy, which has three values: DoNotFail, FailOnError, and FailOnWarning.
  • DoNotFail never throws an exception; in the event of a failure, the error is logged.
  • FailOnError throws an exception only if there is an error; in the event of a warning, it returns without raising.
  • FailOnWarning throws an exception on both warnings and errors.

To get the execution result, you can call get_policy_execution_result on torch_client or call get_result on the execution object which will return a result object.

Parameters to Retrieve Policy Execution Results

The required parameters include:

  • policy_type: Enum. This is a required parameter. Accepts PolicyType.DATA_QUALITY or PolicyType.RECONCILIATION.
  • execution_id: String. The execution id to be queried for the result. This is a required parameter.
  • failure_strategy: Enum. Determines the behavior in the event of a failure. The default value is DoNotFail.

For more information on hard linked and soft linked policies, see Hard Linked and Soft Linked Policies.

To get the current status, you can call get_policy_status on torch_client or call get_status on the execution object which will get the current resultStatus of the execution.

The required parameters include:

  • policy_type: Enum. This is a required parameter. Accepts PolicyType.DATA_QUALITY or PolicyType.RECONCILIATION.
  • execution_id: String. The execution id to be queried for the status. This is a required parameter.

get_status does not take any parameter.

Asynchronous Execution Example

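A sketch of asynchronous execution using the parameters described above; the policy id is a placeholder:

Python
from acceldata_sdk.constants import PolicyType, FailureStrategy

# Returns immediately after the execution begins
async_executor = torch_client.execute_policy(
    policy_type=PolicyType.DATA_QUALITY,
    policy_id=46,
    sync=False,
    failure_strategy=FailureStrategy.DoNotFail)

# Poll the current status, then block for the final result
execution_status = async_executor.get_status()
execution_result = async_executor.get_result(failure_strategy=FailureStrategy.DoNotFail)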

Synchronous Execution Example

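A sketch of synchronous execution; the call returns only after the policy run completes:

Python
from acceldata_sdk.constants import PolicyType, FailureStrategy

sync_executor = torch_client.execute_policy(
    policy_type=PolicyType.RECONCILIATION,
    policy_id=46,
    sync=True,
    incremental=False,
    failure_strategy=FailureStrategy.FailOnError)

execution_result = sync_executor.get_result()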

Cancel Execution Example

Python
Copy

Trigger Policies

Note: This feature is supported from acceldata-sdk version 3.8.0 onwards.

The PolicyExecutionRequest class is used to initialize and handle requests for policy execution. This class encapsulates various configurations and parameters required for executing a policy, including execution type, marker configurations, rule item selections, and Spark-specific settings.

  • executionType (ExecutionType): The type of execution (SELECTIVE, FULL, or INCREMENTAL). This parameter is mandatory and defines how the policy execution should be carried out. Default: N/A.
  • markerConfigs (Optional[List[AssetMarkerConfig]]): A list of marker configurations. This parameter is optional and useful during selective execution. Currently supports BoundsIdMarkerConfig, BoundsFileEventMarkerConfig, BoundsDateTimeMarkerConfig, and TimestampBasedMarkerConfig. Default: None.
  • ruleItemSelections (Optional[List[int]]): A list of rule item selections by their identifiers. This parameter is optional; if not passed, all rule item definitions are executed. Default: None.
  • includeInQualityScore (bool): A flag indicating whether to include the execution in the quality score. This parameter is optional. Default: True.
  • pipelineRunId (Optional[int]): The ID of the pipeline run to which the policy execution is attached. This parameter is optional. Default: None.
  • sparkSQLDynamicFilterVariableMapping (Optional[List[RuleSparkSQLDynamicFilterVariableMapping]]): A list of Spark SQL dynamic filter variable mappings applicable to a Data Quality policy. This parameter is optional. Default: None.
  • sparkResourceConfig (Optional[SparkResourceConfig]): The configuration for Spark resources. This parameter is optional. Default: None.

Data Quality Policy Execution Examples

Trigger Full Data Quality Policy

To execute a full Data Quality policy across the entire dataset:

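A sketch, assuming the PolicyExecutionRequest is handed to execute_policy through a request-style parameter; the import paths and that parameter name are assumptions to verify against your SDK version:

Python
from acceldata_sdk.constants import PolicyType
from acceldata_sdk.constants import ExecutionType               # assumed import path
from acceldata_sdk.models.policy import PolicyExecutionRequest  # assumed import path

request = PolicyExecutionRequest(executionType=ExecutionType.FULL)

execution = torch_client.execute_policy(
    policy_type=PolicyType.DATA_QUALITY,
    policy_id=46,
    sync=True,
    policy_execution_request=request)  # parameter name assumed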

Trigger Incremental Data Quality Policy

To execute an incremental Data Quality policy based on a configured incremental strategy:

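The same sketch with ExecutionType.INCREMENTAL (imports and assumptions as above):

Python
request = PolicyExecutionRequest(executionType=ExecutionType.INCREMENTAL)

execution = torch_client.execute_policy(
    policy_type=PolicyType.DATA_QUALITY,
    policy_id=46,
    sync=True,
    policy_execution_request=request)  # parameter name assumed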

Trigger Selective Data Quality Policy

To execute a selective Data Quality policy over a subset of data, as determined by the chosen incremental strategy, you can optionally use sparkSQLDynamicFilterVariableMapping and sparkResourceConfig. Note that sparkSQLDynamicFilterVariableMapping is only relevant if the Data Quality policy includes SQL filters.

  • ID-Based Selective Policy Execution: Uses a monotonically increasing column value to define data boundaries for policy execution, implemented with BoundsIdMarkerConfig (a sketch follows this list).
  • DateTime-Based Selective Policy Execution: Uses an increasing date column to define data boundaries for policy execution, implemented with BoundsDateTimeMarkerConfig.
  • File Event-Based Selective Policy Execution: Uses file events to establish data boundaries for policy execution, implemented with BoundsFileEventMarkerConfig.
  • Kafka Timestamp-Based Selective Policy Execution: Uses offsets associated with specified timestamps to set data boundaries for policy execution, implemented with TimestampBasedMarkerConfig.
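
A sketch of ID-based selective execution. The BoundsIdMarkerConfig fields and the way the request is passed are assumptions to verify against your SDK version; the other marker configurations are supplied the same way through markerConfigs:

Python
from acceldata_sdk.constants import PolicyType
from acceldata_sdk.constants import ExecutionType                                      # assumed import path
from acceldata_sdk.models.policy import PolicyExecutionRequest, BoundsIdMarkerConfig   # assumed import path

# Bound the execution by a monotonically increasing id column (field names illustrative)
request = PolicyExecutionRequest(
    executionType=ExecutionType.SELECTIVE,
    markerConfigs=[BoundsIdMarkerConfig(column="request_id", lowerBound=10000, upperBound=20000)],
    includeInQualityScore=True)

execution = torch_client.execute_policy(
    policy_type=PolicyType.DATA_QUALITY,
    policy_id=46,
    sync=True,
    policy_execution_request=request)  # parameter name assumed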

Reconciliation Policy Execution Examples

Trigger Full Reconciliation Policy

To trigger a full reconciliation policy across the entire dataset:

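The Reconciliation sketch mirrors the Data Quality one, switching the policy type (assumptions as above):

Python
request = PolicyExecutionRequest(executionType=ExecutionType.FULL)

execution = torch_client.execute_policy(
    policy_type=PolicyType.RECONCILIATION,
    policy_id=72,
    sync=True,
    policy_execution_request=request)  # parameter name assumed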

Trigger Incremental Reconciliation Policy

To trigger an incremental Reconciliation policy based on a configured incremental strategy:

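The same sketch with ExecutionType.INCREMENTAL (assumptions as above):

Python
request = PolicyExecutionRequest(executionType=ExecutionType.INCREMENTAL)

execution = torch_client.execute_policy(
    policy_type=PolicyType.RECONCILIATION,
    policy_id=72,
    sync=True,
    policy_execution_request=request)  # parameter name assumed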

Trigger Selective Reconciliation Policy

To trigger a selective Reconciliation policy over a subset of data, constrained by the selected incremental strategy:

  • ID-Based Selective Policy Execution: Uses a monotonically increasing column value to define data boundaries for policy execution, implemented with BoundsIdMarkerConfig. The marker configurations are passed the same way as in the Data Quality examples above.
  • DateTime-Based Selective Policy Execution: Uses an increasing date column to define data boundaries for policy execution, implemented with BoundsDateTimeMarkerConfig.
  • File Event-Based Selective Policy Execution: Uses file events to establish data boundaries for policy execution, implemented with BoundsFileEventMarkerConfig.
  • Kafka Timestamp-Based Selective Policy Execution: Uses offsets associated with specified timestamps to set data boundaries for policy execution, implemented with TimestampBasedMarkerConfig.

External Integrations

In scenarios where direct pipeline instrumentation is difficult, the SDK offers a more flexible approach. Real-time tracking assumes spans are created as the work happens, which does not fit cases where pipeline execution details are sourced after the fact from databases or APIs.

To address this, the API and SDK can load pipeline monitoring metadata independently of the platform's ongoing activity.

Prerequisites

This capability is supported from acceldata-sdk version 2.12.0 onwards.

Note The Control Plane must also be on version >= 2.12.0.

  • No Real-Time Alerts: Real-time alerts are triggered only for current activities. Historical loads will not trigger the following real-time alerts.

    • Pipeline Alerts

      • Pipeline Duration: Set alerts based on user-defined thresholds.
      • Pipeline Start Time: Configure alerts based on user-defined thresholds.
      • Pipeline End Time: Establish alerts based on user-defined thresholds.
    • Job Alerts

      • Job Duration: Set alerts based on user-defined thresholds.
      • Job Start Time: Configure alerts based on user-defined thresholds.
      • Job End Time: Establish alerts based on user-defined thresholds.
    • Span Alerts

      • Span Duration: Set alerts based on user-defined thresholds.
      • Span Start Time: Configure alerts based on user-defined thresholds.
      • Span End Time: Establish alerts based on user-defined thresholds.
    • Event Based Alerts: Evaluated as soon as the span events are received.

  • Post-Processing Alerts: Avoid configuring post-processing alerts for historical loads; reserve them for upcoming data flows. The following are the post-processing alerts:

    • Pipeline Alerts

      • Pipeline Duration: Set alerts based on previous executions.
      • Pipeline Start Time: Configure alerts based on previous executions.
      • Pipeline End Time: Establish alerts based on previous executions.
    • Job Alerts

      • Job Duration: Set alerts based on previous executions.
      • Job Start Time: Configure alerts based on previous executions.
      • Job End Time: Establish alerts based on previous executions.
    • Span Alerts

      • Span Duration: Set alerts based on previous executions.
      • Span Start Time: Configure alerts based on previous executions.
      • Span End Time: Establish alerts based on previous executions.
    • Event Based Alerts: Evaluated as soon as the span events are received, making them applicable to historical processing.

Creating a Historical Pipeline

  1. Creating a pipeline with explicit times: While creating a pipeline for a historical load, pass the createdAt field specifying the pipeline creation time. (A combined sketch covering all of these steps follows this list.)
  2. Updating a pipeline with explicit times: When updating any details of the pipeline for a historical load, pass the updatedAt field.
  3. Creating a pipeline run with explicit times: Set the startedAt parameter to the historical pipeline run creation time.
  4. Creating spans that support sending span events with explicit times: To allow span start events to use the historical time, set the with_explicit_time parameter to True when creating the span. If this parameter is not set, spans are created and the span start event is sent with the current time.
  5. Sending span events with explicit times: Pass the historical span event start/end/failed/abort times using the created_at parameter.
  6. Creating jobs bound by a span with explicit times: When creating jobs bound by a span, ensure that the bounded span supports explicit times. Set the with_explicit_time parameter so that the span start event for the bounded span uses the historical time; otherwise the span is bound and starts with the current time.
  7. Updating the pipeline run with explicit times: To end the pipeline run with a historical time, set the finishedAt parameter; otherwise the run ends with the current time.
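
A combined sketch of a historical load using the explicit-time parameters described above. The exact casing and placement of these parameters, and the span lookup call, are assumptions to verify against your SDK version; uids and timestamps are placeholders:

Python
from datetime import datetime, timedelta, timezone

from acceldata_sdk.torch_client import TorchClient
from acceldata_sdk.models.pipeline import CreatePipeline, PipelineRunResult, PipelineRunStatus
from acceldata_sdk.models.job import CreateJob, Node

torch_client = TorchClient(url="https://your-adoc-host",
                           access_key="<access key>", secret_key="<secret key>")

# Historical times for a run that actually happened yesterday
run_start = datetime.now(timezone.utc) - timedelta(days=1)
run_end = run_start + timedelta(hours=2)

# 1. Create the pipeline with an explicit creation time
pipeline = torch_client.create_pipeline(pipeline=CreatePipeline(
    uid="nightly.load.pipeline", name="Nightly load pipeline", createdAt=run_start))

# 3. Start the run with an explicit start time
pipeline_run = pipeline.create_pipeline_run(startedAt=run_start)

# 4./6. Create a span-bounded job whose span accepts explicit times
pipeline_run.create_job(CreateJob(
    uid="load.orders", name="Load orders",
    outputs=[Node(asset_uid="SNOWFLAKE-DS.analytics.public.orders")],
    bounded_by_span=True, span_uid="load.orders.span",
    with_explicit_time=True))

# 5. Send span events with explicit (historical) times
span_context = pipeline_run.get_span(span_uid="load.orders.span")  # lookup call assumed
span_context.end(created_at=run_end)

# 7. Close the run with an explicit finish time
pipeline_run.update_pipeline_run(result=PipelineRunResult.SUCCESS,
                                 status=PipelineRunStatus.COMPLETED,
                                 finishedAt=run_end)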

The pipeline reconstructed back in time using the above approach can then be viewed in the ADOC UI.
