Spark Connector User Guide

Overview

The Koverse Data Platform (KDP) Spark Connector is built on the Spark 3 DataSourceV2 framework and uses Scala 2.12. It works just like built-in data sources such as the JSON and JDBC data sources.

It provides an intuitive and powerful way to read datasets from KDP into Spark and to store the results of your Spark transformations back in KDP.
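
For example, reading from KDP follows the same format/option/load pattern as any other Spark data source. The snippet below is only a sketch; the workspaceId and datasetId values are placeholders that are explained in the Examples section.

// Built-in JSON source...
Dataset<Row> fromJson = spark.read().format("json").load("/path/to/data.json");

// ...and the KDP Spark Connector, following the same pattern (placeholder IDs).
Dataset<Row> fromKdp = spark.read().format("com.koverse.spark.read")
    .option("workspaceId", "<workspace_id>")
    .option("datasetId", "<dataset_id>")
    .load();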

Installation

The first step is to add the Spark Connector as a dependency to your Spark application. It can be found at the Maven Central Repository.

Add the Connector to your project configuration file:

Maven:

<dependency>
    <groupId>com.koverse</groupId>
    <artifactId>kdp-spark-connector_2.12</artifactId>
    <version>4.0.2</version>
</dependency>

SBT:

libraryDependencies += "com.koverse" % "kdp-spark-connector_2.12" % "4.0.2"

Gradle:

implementation group: 'com.koverse', name: 'kdp-spark-connector_2.12', version: '4.0.2'

Examples

Using the KDP Spark Connector in Your Spark Application

The example code below shows how to use the KDP Spark Connector in your Spark application to read from and write to KDP. The Spark Connector requires you to save your KDP email and password in the environment variables KDP_EMAIL and KDP_PSWD before running your application; the connector authenticates with KDP before performing any operations.
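
Before building the SparkSession, you may want to fail fast if those variables are missing. The check below is only an illustrative sketch and not part of the connector's API; the connector reads KDP_EMAIL and KDP_PSWD itself.

import org.apache.spark.sql.SparkSession;

// Illustrative pre-flight check: the connector itself reads KDP_EMAIL and KDP_PSWD.
if (System.getenv("KDP_EMAIL") == null || System.getenv("KDP_PSWD") == null) {
    throw new IllegalStateException("Set KDP_EMAIL and KDP_PSWD before running this application");
}

SparkSession spark = SparkSession.builder()
    .appName("kdp-spark-example") // hypothetical application name
    .getOrCreate();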

The kdpUrl defaults to https://api.app.koverse.com, but it can also be passed in as a data source option.
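
For example, to point a read at a specific KDP instance, pass the kdpUrl option explicitly. This is only a sketch; the URL shown is the default, and workspaceId and datasetId are the variables defined below.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Only needed when your KDP instance is not at the default URL.
Dataset<Row> df = spark.read().format("com.koverse.spark.read")
    .option("kdpUrl", "https://api.app.koverse.com")
    .option("workspaceId", workspaceId)
    .option("datasetId", datasetId)
    .load();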

For all actions you will need to pass in the workspaceId of the workspace that contains the datasets you are reading from or writing to.

String workspaceId = "myworkspace";

To read from KDP, the datasetId of the dataset you want to read is required.

String datasetId = "bca220e3-67fc-4504-9ae8-bedee9d68e6c";

You call the KDP Spark Connector by specifying it as the read format and including the additional required options:

  • workspaceId

  • datasetId

Dataset<Row> randomDF = spark.read().format("com.koverse.spark.read")
    .option("workspaceId", workspaceId)
    .option("datasetId", datasetId)
    .load();

When writing, the Connector creates a new dataset to hold your data. Pass in the name for your new dataset as well as the workspaceId.

String datasetName = "my-kdp-spark-test";

// counts is the Dataset<Row> you want to write (for example, the output of your transformations).
counts.write().format("com.koverse.spark.write")
    .option("workspaceId", workspaceId)
    .option("datasetName", datasetName)
    .mode("Append").save();

To configure your dataset for attribute-based access control (ABAC), you will need to include the required options labelParser, labeledFields, handlingPolicy, datasetName, and workspaceId. The replacementString option is required if you choose the handlingPolicy of REPLACE. See ABAC for more details.

String parser = "simple-parser";
String abacDatasetName = "my-kdp-spark-abac-test";

// labeledData is the Dataset<Row> you are writing; kdpUrl is a String holding your KDP URL.
labeledData.write().format("com.koverse.spark.write")
    .option("labelParser", parser)
    .option("labeledFields", "label")
    .option("handlingPolicy", "REPLACE")
    .option("replacementString", "NOPE")
    .option("kdpUrl", kdpUrl)
    .option("workspaceId", workspaceId)
    .option("datasetName", abacDatasetName)
    .mode("Append").save();

For the full example, go to our examples repository on GitHub and see the Spark Connector - KdpCount example.
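
In the meantime, here is a minimal end-to-end sketch of how the read and write halves above fit together. This is only an illustration, not the KdpCount example itself; the "text" column and the word-count logic are assumptions, and spark, workspaceId, datasetId, and datasetName are the values defined earlier.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.*;

// Read the source dataset from KDP.
Dataset<Row> source = spark.read().format("com.koverse.spark.read")
    .option("workspaceId", workspaceId)
    .option("datasetId", datasetId)
    .load();

// Hypothetical transformation: count words in a "text" column (assumed to exist in the dataset).
Dataset<Row> counts = source
    .select(explode(split(col("text"), "\\s+")).alias("word"))
    .groupBy("word")
    .count();

// Write the result back to KDP as a new dataset named datasetName.
counts.write().format("com.koverse.spark.write")
    .option("workspaceId", workspaceId)
    .option("datasetName", datasetName)
    .mode("Append").save();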

Using KDP Spark Connector in Databricks

Note: Please ensure you set up an account and configure a cluster in Databricks prior to starting these instructions.

  1. In Databricks, select Compute on the left and click on the cluster that will be used to run KDP jobs.

  2. In the Cluster Configuration select Advanced Options.

  3. Select the Spark tab.

  4. Add the following Environment Variables:

         KDP_EMAIL=<kdp_workspace_email>
    KDP_PSWD=<kdp_workspace_pw>
    PYSPARK_PYTHON=/databricks/python3/bin/python3
    JNAME=zulu11-ca-amd64

  5. The spark-connector_2.12-<version>.jar will need to be uploaded to the Databricks cluster that KDP jobs will be run on.

    1. Select the Libraries tab.

    2. Click Install New.

    3. Drag and drop the jar into the Install library dialog.

    4. Click Install.

    5. Restart the cluster.

  6. Now the cluster is ready to run KDP jobs.

  7. Create a new notebook.

    1. Here is an example of a notebook that reads a dataset called actorfilms, then transforms it and writes the result back to KDP as a new dataset:

      NOTE: actorfilms.csv can be found here

          import com.koverse.spark.read._
      import com.koverse.spark.write._
      import org.apache.spark.sql.functions._
      import org.apache.spark.sql.expressions.Window

      val actorfilms = spark.read.format("com.koverse.spark.read")
        .option("workspaceId", "<kdp_workspace_id>")
        .option("datasetId", "<kdp_dataset_id>")
        .option("kdpUrl", "https://api.dev.koverse.com")
        .load()

      val top = actorfilms.filter($"Rating" >= 5).filter($"Year" >= 2000).toDF
      val ranked = top.drop(col("Actor")).drop(col("ActorID"))
        .dropDuplicates
        .withColumn("Rank", rank().over(Window.orderBy(desc("Rating"), desc("Votes"), desc("Year"))))
        .toDF
      val rounded = ranked.withColumn("Rounded", round($"Rating"))

      display(rounded)

      ranked.write.format("com.koverse.spark.write")
        .option("kdpUrl", "https://api.dev.koverse.com")
        .option("workspaceId", "emily")
        .option("datasetName", "top_movies")
        .mode("Append")
        .save()
    2. After running the above Spark job you can create a visualization of the data by clicking New Visualization in the results table.

    3. Once the notebook is created, it can be run from the notebook cell.

    4. This notebook can also be run by creating a new job pointing to the notebook in Workflows.

      1. Select Workflows on the left.

      2. Click Create Job.

      3. Name the task.

      4. Select an existing cluster or use shared_job_cluster.

      5. If you select shared_job_cluster, make sure you configure it according to steps 1 through 5.iv.