Pipeline APIs
The Acceldata SDK provides robust support for a wide range of pipeline APIs, covering everything from pipeline creation to job and span addition, as well as the initiation of pipeline runs. With capabilities extending across the full span lifecycle, the SDK exercises complete control over the pipeline infrastructure, generating various events as needed.
Each pipeline encapsulates the ETL (Extract, Transform, Load) process, constituting Asset nodes and associated Jobs. The collective definition of the pipeline shapes the lineage graph, providing a comprehensive view of data assets across the system.
Within this framework, a Job Node, or Process Node, acts as a pivotal component, executing specific tasks within the ETL workflow. These jobs receive inputs from various assets or other jobs and produce outputs that seamlessly integrate into additional assets or jobs. ADOC harnesses this network of job definitions within the workflow to construct lineage and meticulously monitors version changes for the pipeline, ensuring a dynamic and efficient data processing environment.
Retrieve All Pipelines
All pipelines can be accessed through the Acceldata SDK.
List<PipelineInfo> pipelineInfos = adocClient.getPipelines();
Creating a Pipeline
Creating a pipeline in the Acceldata SDK is a fundamental task in managing data workflows. To accomplish this, you construct a CreatePipeline object that encapsulates the essential information required for pipeline creation. Two mandatory parameters must be defined when creating a pipeline: the Uid (Unique Identifier) and the Name.
By specifying the Uid and Name, you provide unique identification and a recognizable label for the new pipeline. Once the CreatePipeline object is properly configured, it can be passed as input to initiate the pipeline creation process, allowing you to efficiently set up and manage your data workflows within Acceldata.
//Create Pipeline
CreatePipeline createPipeline = CreatePipeline.builder()
        .uid("adoc_etl_pipeline")
        .name("ADOC ETL Pipeline")
        .description("ADOC ETL Pipeline")
        .build();
Pipeline pipeline = adocClient.createPipeline(createPipeline);
Retrieve a Pipeline
To access a specific pipeline, you can provide either the pipeline UID (Unique Identifier) or the pipeline ID as a parameter for retrieval. This flexibility allows you to easily obtain the details of a desired pipeline.
//Get Pipeline
Pipeline pipeline = adocClient.getPipeline("adoc_etl_pipeline");
Deleting a Pipeline
You can initiate the deletion process by using the code snippet below:
String identity = "Pass pipeline id or pipeline uid here";
Pipeline pipeline = adocClient.getPipeline(identity);
adocClient.deletePipeline(pipeline.getId());
Retrieve Pipeline Runs for a Pipeline
You can access all the pipeline runs associated with a specific pipeline. There are two ways to accomplish this:
- Using the pipeline object: pipeline.getRuns()
- Using the AdocClient: adocClient.getPipelineRuns(pipeline.getId())
Both methods allow you to retrieve information about pipeline runs.
//Using pipeline object
pipeline.getRuns();
//Using the AdocClient
List<PipelineRun> runs = adocClient.getPipelineRuns(pipeline.getId());
Initiating a Pipeline Run
A pipeline run signifies the execution of a pipeline. It's important to note that a single pipeline can be executed multiple times, with each execution, referred to as a "run," resulting in a new snapshot version. This process captures the evolving state of the data within the pipeline and allows for effective monitoring and tracking of changes over time.
To initiate a pipeline run, you can use the Pipeline object. When creating a run, there are two optional parameters that you can include:
- optionalContextData: Additional context data that can be passed with the run.
- optionalContinuationId: A continuation identifier used to link multiple ETL pipelines together as a single pipeline in the ADOC UI.
Here's an example of how to create a pipeline run using the Acceldata SDK:
Optional<Map<String, Object>> optionalContextData = Optional.empty();
Optional<String> optionalContinuationId = Optional.empty();
PipelineRun pipelineRun = pipeline.createPipelineRun(optionalContextData, optionalContinuationId);
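The example above passes empty optionals. As a minimal sketch, a run with populated values might look like the following; the context keys and the continuation ID "daily_etl_chain" are illustrative, not values required by the SDK:
// Context data attached to the run (keys are illustrative)
Map<String, Object> runContext = new HashMap<>();
runContext.put("triggered_by", "scheduler");
// Continuation ID linking this run with related ETL pipelines in the ADOC UI (illustrative value)
PipelineRun chainedRun = pipeline.createPipelineRun(Optional.of(runContext), Optional.of("daily_etl_chain"));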
Getting the Latest Run of the Pipeline
You can obtain the latest run of a pipeline using either the Pipeline object or the AdocClient from the adoc-java-sdk. Here are two ways to achieve this:
//Using pipeline object
PipelineRun latestRun = pipeline.getLatestPipelineRun();
//Using the AdocClient
PipelineRun latestRun = adocClient.getLatestPipelineRun(pipeline.getId());
Getting a Particular Run of the Pipeline
You can retrieve a particular run of a pipeline by passing the unique pipeline run ID as a parameter.
//Using the Pipeline object
PipelineRun pipelineRun = pipeline.getRun(25231L);
//Using the AdocClient
PipelineRun pipelineRun = adocClient.getPipelineRun(25231L);
Updating a Pipeline Run
The updatePipelineRun method is responsible for updating the attributes of a pipeline run, such as its result, status, and optional context data. It accepts the following parameters:
- result: A PipelineRunResult indicating the updated result of the pipeline run.
- status: A PipelineRunStatus indicating the updated status of the pipeline run.
- optionalContextData: An optional map that can be used to pass additional context data for the update.
PipelineRunResult result = PipelineRunResult.SUCCESS;
PipelineRunStatus status = PipelineRunStatus.COMPLETED;
Optional<Map<String, Object>> optionalContextData = Optional.empty();
pipelineRun.updatePipelineRun(result, status, optionalContextData);
Create Job Using Pipeline
When creating a job using a Pipeline object, there are mandatory parameters that must be provided: uid, name, and pipelineRunId. The adoc-java-sdk library offers the CreateJob class, which serves as the parameter for job creation. The required parameters are as follows:
- uid: A unique identifier for the job.
- name: The name of the job.
- pipelineRunId: The ID of the associated pipeline run.
Several other parameters are available to define and configure the job. Below is a description of these parameters:
Parameter | Description |
---|---|
uid | Unique ID of the job. It is a mandatory parameter. |
name | Name of the job. It is a mandatory parameter. |
pipelineRunId | The pipeline run's ID is required when you want to add a job. This is mandatory if a job is made using a pipeline object. However, it's optional if the job is created using a pipeline run. |
description | Description of the job. It is an optional parameter. |
inputs | Job input can take various forms. It might be the uid of an asset, specified using the asset_uid and source parameters in the Node object. Alternatively, it could be the uid of another job, specified using the jobUid parameter in the Node object. |
outputs | Job output comes in different forms. It may be the uid of an asset, specified through the asset_uid and source parameters in the Node object. Alternatively, it could be the uid of another job, specified through the jobUid parameter in the Node object. |
meta | It is the metadata of the job and is an optional parameter. |
context | It is the context of the job and is an optional parameter. |
boundedBySpan | This parameter is a boolean value. If the job needs to be bound by a span, this must be set to true. The default value is false. It is an optional parameter. |
spanUid | The new span's uid, which is a string. If boundedBySpan is set to true, this parameter is mandatory. |
import acceldata.adoc.client.models.job.Node;
// Fetching the pipeline created using the UID "adoc_etl_pipeline"
Pipeline pipeline = adocClient.getPipeline("adoc_etl_pipeline");
// Creating a run for the pipeline to be consumed during job creation
PipelineRun pipelineRun = pipeline.createPipelineRun(Optional.empty(), Optional.empty());
// Creating a Job
// 1. Constructing job inputs
List<Node> inputs = Arrays.asList(Node.builder().asset_uid("datasource-name.database.schema.table_1").source("MySql").build());
// 2. Constructing job outputs
List<Node> outputs = Arrays.asList(Node.builder().jobUid("transform_data").build());
// 3. Constructing job metadata
JobMetadata jobMetadata = JobMetadata.builder().owner("Jake")
.team("Backend").codeLocation("https://github.com/").build();
// 4. Constructing a context map
Map<String, Object> contextMap = new HashMap<>();
contextMap.put("key21", "value21");
// 5. Getting the latest run of the pipeline to be passed to the CreateJob object
PipelineRun latestRun = pipeline.getLatestPipelineRun();
// 6. Setting up CreateJob parameters
CreateJob createJob = CreateJob.builder()
.uid("source_data")
.name("Source Data")
.description("Source Data Job")
.inputs(inputs)
.outputs(outputs)
.meta(jobMetadata)
.context(contextMap)
.boundedBySpan(true)
.spanUid("source_data.span")
.pipelineRunId(latestRun.getId())
.build();
// 7. Creating a Job
acceldata.adoc.client.models.node.Node jobNode = pipeline.createJob(createJob);
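The outputs above reference a job with uid transform_data. As a follow-up, here is a minimal sketch of creating that downstream job so the two jobs link up in the lineage graph; the target asset UID and job names are illustrative:
// The downstream job consumes the output of "source_data" and writes to a target asset
List<Node> transformInputs = Arrays.asList(Node.builder().jobUid("source_data").build());
List<Node> transformOutputs = Arrays.asList(Node.builder()
        .asset_uid("datasource-name.database.schema.table_2").source("MySql").build());
CreateJob transformJob = CreateJob.builder()
        .uid("transform_data")
        .name("Transform Data")
        .description("Transform Data Job")
        .inputs(transformInputs)
        .outputs(transformOutputs)
        .pipelineRunId(latestRun.getId())
        .build();
pipeline.createJob(transformJob);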
Spans
Hierarchical spans are a way to organize and track tasks within pipeline runs for observability. Each pipeline run has its own set of spans. Spans are like containers for measuring how tasks are performed. They can be very detailed, allowing us to keep a close eye on specific jobs and processes in the pipeline.
Spans can also be connected to jobs, which help us monitor when jobs start, finish, or encounter issues. Spans keep track of when a task begins and ends, giving us insight into how long each task takes.
The Acceldata SDK makes it easy to create new pipeline runs and add spans. It also helps us generate custom events and metrics for observing the performance of pipeline runs. In simple terms, hierarchical spans and the Acceldata SDK give us the tools to closely monitor and analyze the execution of tasks in our pipelines.
Creating Span for a Pipeline Run
The createSpan function, used within the context of a pipeline run, offers the following parameters:
Parameter | Description |
---|---|
uid | This parameter represents the unique identifier (UID) for the newly created span. It is a mandatory field, and each span should have a distinct UID. |
associatedJobUids | This parameter accepts a list of job UIDs that should be associated with the span. In the case of creating a span for a pipeline run, an empty list can be provided since spans are typically associated with jobs. |
optionalContextData | This parameter is a map that allows you to provide custom key-value pairs containing context information relevant to the span. It can be used to add additional details or metadata associated with the span. |
// Define associated job UIDs
List<String> associatedJobUids = Arrays.asList("jobUid1");
// Create a context map
Map<String, Object> contextMap = new HashMap<>();
contextMap.put("key21", "value21");
// Wrap the context map in an Optional
Optional<Map<String, Object>> optionalContextData = Optional.of(contextMap);
// Create a new span context within the pipeline run
SpanContext spanContext = pipelineRun.createSpan("adoc_etl_pipeline_root_span", optionalContextData, associatedJobUids);
Obtaining the Root Span for a Pipeline Run
In this code snippet, the getRootSpan method retrieves the root span associated with a specific pipeline run. The root span represents the top-level span within the execution hierarchy of the pipeline run and provides essential context and observability data for the entire run. The resulting SpanContext object, stored in the rootSpanContext variable, represents the root span and can be used for further analysis and observability purposes.
SpanContext rootSpanContext = pipelineRun.getRootSpan();
Checking if the Current Span is a Root Span
A root span is the top-level span that encompasses the entire task or process hierarchy.
The isRoot() method is used to make this determination. If the span is a root span, the isRootSpan variable is set to true, indicating its top-level status within the span hierarchy. This check provides insight into the context of the current span, which can be valuable for decision-making or monitoring in complex workflows.
boolean isRootSpan = spanContext.isRoot();
Creating Child Span
The createChildSpan function within SpanContext is used to create a hierarchical structure of spans by placing a span under another span. It offers the following parameters:
Parameter | Description |
---|---|
uid | This parameter represents the unique identifier (UID) for the newly created child span. It is a mandatory field, and each child span should have a distinct UID. |
optionalContextData | This parameter is a map that allows you to provide custom key-value pairs containing context information relevant to the child span. It can be used to add additional details or metadata associated with the span. |
associatedJobUids | This parameter accepts a list of job UIDs that should be associated with the child span. This association allows you to link the child span with specific jobs. |
SpanContext spanContext = pipelineRun.getRootSpan();
Optional<Map<String, Object>> optionalContextData = Optional.empty();
List<String> associatedJobUids = Arrays.asList("source_data");
spanContext.createChildSpan("source_data.span", optionalContextData, associatedJobUids);
Retrieving Spans for a Given Pipeline Run
This code snippet demonstrates how to retrieve all the spans associated with a specific pipeline run. It begins by obtaining a reference to a PipelineRun object using the getRun method on a Pipeline object, providing the pipeline run's ID as a parameter. It then calls the getSpans method on the pipelineRun object, which returns a list of Span objects related to the specified pipeline run. This functionality is valuable for accessing and analyzing span data to monitor the execution of the pipeline run.
//Get all the spans for a given pipeline run
PipelineRun pipelineRun = pipeline.getRun(25232L);
List<Span> spans = pipelineRun.getSpans();
Sending Span Events
The SDK has the capability to send both custom and standard span events at different stages of a span's lifecycle. These events play a crucial role in gathering metrics related to pipeline run observability.
// Step 1: Prepare context data for the event
Map<String, Object> contextData = new HashMap<>();
contextData.put("client_time", DateTime.now()); // Capture client time
contextData.put("row_count", 100); // Include the row count
// Step 2: Send a generic event for the span
// Create a generic event with the event name "source_data.span_start" and the context data
GenericEvent genericEvent = GenericEvent.createEvent("source_data.span_start", contextData);
// Send the created event to the child span (assumed to be represented by childSpanContext)
childSpanContext.sendEvent(genericEvent);
This code snippet demonstrates the usage of the abort and failed methods within a SpanContext to send events for failed or aborted spans. It begins by preparing context data with information such as client time and row count. Including context data in these events provides essential information for tracking and handling span failures and aborts, aiding observability and diagnostics.
// Step 1: Prepare context data for the event
Map<String, Object> contextData = new HashMap<>();
contextData.put("client_time", DateTime.now()); // Capture client time
contextData.put("row_count", 100); // Include the row count
// Step 2: Fail Span
// Send a failed event for the span and include the context data
spanContext.failed(Optional.of(contextData));
// or
// Step 2: Abort Span
// Send an abort event for the span and include the context data
spanContext.abort(Optional.of(contextData));
Ending the Span
You can end a span by calling the end method on the SpanContext object and passing the optionalContextData.
//Context data
Map<String, Object> contextData = new HashMap<>();
contextData.put("client_time", DateTime.now());
contextData.put("row_count", 100);
spanContext.end(Optional.of(contextData));
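Putting the span operations together, here is a minimal sketch of a typical span lifecycle within a pipeline run; the UIDs, event name, and context keys are illustrative:
// Obtain the root span of the current pipeline run
SpanContext rootSpan = pipelineRun.getRootSpan();
// Create a child span bound to the "source_data" job (UID is illustrative)
SpanContext childSpan = rootSpan.createChildSpan("source_data.span", Optional.empty(), Arrays.asList("source_data"));
// Emit a custom event while the task is running (event name and keys are illustrative)
Map<String, Object> eventData = new HashMap<>();
eventData.put("rows_processed", 100);
childSpan.sendEvent(GenericEvent.createEvent("source_data.progress", eventData));
// End the child span, then the root span, attaching final context data
childSpan.end(Optional.of(eventData));
rootSpan.end(Optional.empty());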
In conclusion, the Acceldata SDK equips users with powerful tools to manage and monitor data pipelines effectively. It enables seamless pipeline creation, job management, and hierarchical span organization, enhancing observability and control over data workflows. With the ability to generate custom events, the SDK offers a comprehensive solution for optimizing pipeline operations and ensuring data integrity.