Transform Pipelines in Data Prep Kit

In our previous blog post (https://thealliance.ai/blog/architecture-of-data-prep-kit-framework), we provided an overview of the Data Prep Kit (DPK) architecture. This post demonstrates how Kubeflow Pipelines (KFP) enables the automation of DPK transform executions on Kubernetes. We will begin by detailing the infrastructure components that underpin the DPK implementation.
Infrastructure for running DPK pipelines
To run DPK pipelines, you need a Kubernetes cluster with the following components installed:
- KFP pipelines – The runtime and UI for executing pipelines. Currently, there are two versions of KFP – KFP v1 and KFP v2. DPK supports both.
- KubeRay operator – An operator for managing Ray clusters on Kubernetes.
- KubeRay API server – An API server that interacts with the KubeRay operator and Ray clusters. It provides simple REST APIs for creating and managing Ray clusters and submitting remote jobs to these clusters.
The overall infrastructure is presented in Figure 1.
Figure 1: Infrastructure for running DPK pipelines
This infrastructure can be implemented using various Kubernetes clusters, ranging from a local Kind cluster to a multi-node Kubernetes cluster deployed in the cloud. The DPK documentation provides detailed instructions on creating such a cluster.
To simplify working with this complex and highly distributed system, DPK introduces a set of libraries and shared KFP components designed to facilitate the writing of KFP pipelines.
Libraries supporting KFP pipelines
DPK provides three libraries that support the implementation of KFP pipelines:
Shared workflow support
This library is independent of the KFP version and provides various utility functions for Ray remote job operations and KFP.
The implementation of the Ray remote job operations is based on the Python client for KubeRay’s API server. Since this client is not available as a PyPI package, we have incorporated it directly into the library. It provides a set of higher-level operations, including:
- create Ray cluster - based on the cluster name, namespace, and parameters for head and worker Ray nodes
- delete Ray cluster - based on the cluster name and namespace
- submit Ray job - based on the cluster name, namespace, and the remote job definition
- follow remote job execution - based on the cluster name, namespace, and job submission ID. This method waits for Ray remote job completion, periodically printing intermediate execution results to the job execution log.
These operations significantly simplify programming against the Ray cluster.
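To give a feel for these operations, here is a minimal sketch of the create / submit / follow / delete flow. The wrapper object, its import path, and the parameter shapes shown here are illustrative assumptions rather than the exact DPK API:

```python
# Hypothetical sketch of the higher-level Ray operations listed above.
# The wrapper object ("ops") and its method signatures are assumptions;
# the real DPK shared workflow support library may differ in detail.

def run_transform_remotely(ops, transform_cmd: str) -> None:
    """Create a Ray cluster, run one remote job on it, and tear the cluster down."""
    name, namespace = "noop-cluster", "kubeflow"  # assumed cluster name/namespace

    # 1. Create the Ray cluster from head/worker node specifications.
    ops.create_ray_cluster(
        name=name,
        namespace=namespace,
        head_node={"cpu": 2, "memory": 8, "image": "rayproject/ray:2.9.3"},
        worker_nodes=[{"cpu": 4, "memory": 16, "replicas": 2,
                       "image": "rayproject/ray:2.9.3"}],
    )
    try:
        # 2. Submit the transform as a Ray remote job.
        submission_id = ops.submit_job(
            name=name,
            namespace=namespace,
            request={"entrypoint": transform_cmd},
        )
        # 3. Block until the job completes, periodically printing its logs.
        ops.follow_execution(name=name, namespace=namespace,
                             submission_id=submission_id)
    finally:
        # 4. Always clean up the cluster, even if the job failed.
        ops.delete_ray_cluster(name=name, namespace=namespace)
```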
KFP utils provides commonly used utility functions for KFP development. One of the more interesting ones, compute_execution_params, computes the number of Ray actors required for a DPK transform’s execution based on the cluster resources and the actors’ resource requirements.
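The idea behind such a computation can be sketched as a simple resource calculation; the function below is an illustrative approximation (the parameter names and utilization margin are assumptions), not DPK’s actual implementation:

```python
# Illustrative approximation of a compute_execution_params-style calculation:
# how many transform actors fit into the Ray workers' aggregate resources?

def estimate_number_of_actors(
    worker_cpu: float,          # CPUs per Ray worker node
    worker_memory_gb: float,    # memory (GB) per Ray worker node
    n_workers: int,             # number of worker nodes in the cluster
    actor_cpu: float,           # CPUs requested by each transform actor
    actor_memory_gb: float,     # memory (GB) requested by each transform actor
    utilization: float = 0.85,  # headroom for Ray system processes (assumed)
) -> int:
    cluster_cpu = worker_cpu * n_workers * utilization
    cluster_memory = worker_memory_gb * n_workers * utilization
    # The actor count is bounded by whichever resource runs out first.
    n_actors = int(min(cluster_cpu / actor_cpu, cluster_memory / actor_memory_gb))
    return max(n_actors, 1)

# Example: 2 workers with 4 CPUs / 16 GB each and actors needing 1 CPU / 2 GB
# gives min(6.8, 13.6) -> 6 actors.
```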
KFP v1 and v2 workflow support
As KFP v1 and v2 offer slightly different APIs, DPK provides two libraries with identical functionality, each tailored to a specific KFP API version. These libraries assist in creating and configuring KFP components and provide helper functions for managing KFP pipelines, such as “upload_pipeline”, “delete_pipeline”, “get_pipeline_by_name”, and others. Additionally, they include test support libraries for running a complete pipeline test.
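As an illustration, these pipeline-management helpers can be combined into a small redeployment routine. Only the helper names come from the DPK libraries; the object holding them, its parameter names, and the returned values are assumptions:

```python
# Hedged sketch of using the pipeline-management helpers named above.
# "utils" stands for whichever object exposes them; attribute and parameter
# names here are assumptions, not the exact DPK signatures.

def refresh_pipeline(utils, package_path: str, name: str):
    """Replace a deployed pipeline with a newly compiled version."""
    existing = utils.get_pipeline_by_name(name=name)
    if existing is not None:
        utils.delete_pipeline(pipeline_id=existing.id)  # assumed attribute/parameter
    return utils.upload_pipeline(pipeline_package_path=package_path,
                                 pipeline_name=name)
```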
Both libraries offer reusable components that handle the core operations required for building DPK KFP pipelines.
DPK KFP components
DPK includes three main shared KFP components: createRayClusterComponent, executeRayJobComponent, and deleteRayClusterComponent. Their names are self-explanatory. These components facilitate the execution of transforms, which can be represented as a simple pipeline, as shown in Figure 2:
Figure 2: Simple KFP pipeline
The only transform-dependent component is typically implemented as a lightweight Python component. It receives the input parameters needed for transform execution and returns a dictionary containing the provided and default input parameters, including the number of actors to be used for the execution (computed with the shared-library implementation described above). The rest of the execution is handled by reusing the same shared components.
When a transform needs to access multiple S3 buckets with different access credentials, for example to retrieve config files, models, etc., executeRayJobComponent_multi_s3 should be used instead of executeRayJobComponent. The last shared KFP component, executeSubWorkflowComponent, is used to execute simple pipelines as KFP v1 sub-workflows, as explained in the super pipelines section below.
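Putting these pieces together, a simple pipeline is mostly wiring: a small transform-specific parameter component followed by the shared components. The sketch below shows this wiring in KFP v1 style; the component file paths, parameter names, and the placeholder parameter logic are assumptions rather than DPK’s actual component interfaces:

```python
# Sketch of a 'simple' DPK pipeline in KFP v1 style. Component file locations,
# parameter names, and values are assumptions made for illustration.

import kfp.components as comp
import kfp.dsl as dsl

# Shared, transform-independent components (paths are assumed).
create_ray_op = comp.load_component_from_file("createRayClusterComponent.yaml")
execute_job_op = comp.load_component_from_file("executeRayJobComponent.yaml")
delete_ray_op = comp.load_component_from_file("deleteRayClusterComponent.yaml")


def compute_exec_params(worker_options: str, actor_options: str) -> str:
    """Transform-dependent lightweight component: assemble the execution
    parameters, including the number of actors (a fixed placeholder here)."""
    import json
    return json.dumps({"num_actors": 4,
                       "worker_options": worker_options,
                       "actor_options": actor_options})


compute_exec_params_op = comp.create_component_from_func(compute_exec_params)


@dsl.pipeline(name="noop", description="Sketch of a simple DPK pipeline")
def noop_pipeline(ray_name: str = "noop-kfp-ray", ray_namespace: str = "kubeflow"):
    # Ensure the Ray cluster is deleted even if the job fails.
    clean_up = delete_ray_op(ray_name=ray_name, ray_namespace=ray_namespace)
    with dsl.ExitHandler(clean_up):
        params = compute_exec_params_op(worker_options="{}", actor_options="{}")
        cluster = create_ray_op(ray_name=ray_name, ray_namespace=ray_namespace)
        job = execute_job_op(ray_name=ray_name, ray_namespace=ray_namespace,
                             exec_params=params.output)
        job.after(cluster)
```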
DPK pipeline generator
As mentioned above, single-transform pipelines (‘simple’ pipelines) are very similar. Therefore, DPK provides a pipeline generator that creates a KFP Python pipeline from a given YAML file with input parameters. This YAML file contains three sections:
- pipeline_parameters – the pipeline name, description, the script that implements the transform, and other parameters
- pipeline_common_input_parameters_values – parameters common to all transforms, such as input and output folders, S3 access secret name, and others
- pipeline_transform_input_parameters – transform-specific parameters
For example, consider a pipeline definition for the “Noop” transform.
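A minimal sketch of such a definition, following the three sections described above, might look like this (the individual keys and values are illustrative assumptions, not the exact DPK schema):

```yaml
# Illustrative only - the key names inside each section are assumptions.
pipeline_parameters:
  name: "noop"
  description: "Pipeline for the Noop transform"
  script_name: "noop_transform_ray.py"   # script that implements the transform (assumed)

pipeline_common_input_parameters_values:
  input_folder: "test-data/noop/input/"  # assumed folders
  output_folder: "test-data/noop/output/"
  s3_access_secret: "s3-secret"          # name of the S3 access secret (assumed)

pipeline_transform_input_parameters:
  noop_sleep_sec: 10                     # transform-specific parameter (assumed)
```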
DPK pipeline testing
A transform Kubeflow pipeline can be tested by deploying and executing it on a Kind cluster (setup details). You can start the test manually by executing the “make workflow-test” command in the transform directory. This command automates the setup of a Kind cluster, including KFP deployment (KFP v1 is used by default; to use KFP v2, set the `KFPv2` environment variable to 1). The command also compiles the pipeline and deploys it on the KFP server within the cluster. Finally, data processing tests are executed. See the Installation steps section of the DPK KFP documentation for more information about setting up a Kind cluster.
The DPK repo runs such KFP tests automatically using GitHub Actions workflows. (In a fork, you can manually trigger the GitHub workflow testing by selecting Actions -> Workflow Manual Run and running the workflow-manual-run workflow on the specified transform.)
Multiple Transforms in a DPK Pipeline
The most compelling use case is executing multiple DPK transforms within a single pipeline. To facilitate this, DPK introduces the concept of a 'super' KFP pipeline, which aggregates several 'simple' pipelines. This aggregation can be achieved by executing simple pre-deployed pipelines as sub-workflows in KFP v1 or as nested pipelines in KFP v2.
Figure 3 provides a high-level overview of an example super pipeline for code data preprocessing using KFP v1.
Figure 3: Super pipeline for code data processing with KFP v1
Each step in the figure above represents a separate execution of a “simple” pipeline for a specific transform and encapsulates that pipeline’s own steps: creating a Ray cluster, executing a Ray job, and deleting the cluster. Users can see these executions as separate Runs in the KFP dashboard.
When KFP v2 is used, the view is different.
Figure 4 illustrates a super pipeline containing two simple pipelines: Noop and Document Identification. The project documentation includes views of their nested pipelines.
Figure 4: Super pipeline with Noop and Document Identification running with KFP v2
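In code, a KFP v2 super pipeline is simply a pipeline that calls other pipelines. The sketch below mirrors Figure 4 with two trivial stand-in pipelines; in DPK the nested pipelines would be the generated Noop and Document Identification simple pipelines with their full parameter lists (all names below are placeholders):

```python
# Minimal sketch of a KFP v2 super pipeline using pipelines as components.
# The stand-in component and pipelines are placeholders for the real
# DPK simple pipelines; names and parameters are assumptions.

from kfp import dsl


@dsl.component(base_image="python:3.10")
def run_transform(transform_name: str, input_folder: str, output_folder: str):
    # Placeholder standing in for the create-cluster / execute-job /
    # delete-cluster steps of a real simple pipeline.
    print(f"running {transform_name}: {input_folder} -> {output_folder}")


@dsl.pipeline(name="noop")
def noop_pipeline(input_folder: str, output_folder: str):
    run_transform(transform_name="noop", input_folder=input_folder,
                  output_folder=output_folder)


@dsl.pipeline(name="doc-id")
def doc_id_pipeline(input_folder: str, output_folder: str):
    run_transform(transform_name="doc_id", input_folder=input_folder,
                  output_folder=output_folder)


@dsl.pipeline(name="super-pipeline", description="Noop followed by Document ID")
def super_pipeline(input_folder: str, intermediate_folder: str, output_folder: str):
    # In KFP v2, a pipeline can be invoked from another pipeline; each call
    # appears as a nested, collapsible pipeline in the KFP UI (as in Figure 4).
    noop = noop_pipeline(input_folder=input_folder,
                         output_folder=intermediate_folder)
    doc_id = doc_id_pipeline(input_folder=intermediate_folder,
                             output_folder=output_folder)
    doc_id.after(noop)
```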
Conclusion
While Data Prep Kit significantly simplifies transform implementations, it does not, on its own, address the challenges of running transforms at scale, scheduling them, or combining multiple transforms into a single pipeline for repeatable execution. This is where the KFP pipeline integration described in this blog post can help. The KFP support included in Data Prep Kit handles all the boilerplate operations, such as creating and destroying Kubernetes-based Ray clusters and submitting jobs to them. It also provides a range of utilities that simplify wrapping Ray-based transform implementations into KFP steps. Once this is done, a user can leverage all the great KFP capabilities for pipeline execution automation.
Although the current implementation only supports Ray-based transforms, it can be easily extended to support Spark.