Spark Connector User Guide

Overview

The Koverse Spark Connector is built on the Spark 3 DataSourceV2 framework and uses Scala 2.12. It works just like built-in data sources such as the JSON and JDBC data sources.

It’s an intuitive and powerful tool for using Koverse to store your datasets before and after Spark transformations.
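
For instance, reading through the connector follows the same read, format, option, and load pattern as a built-in source. The snippet below is only a sketch: the placeholder IDs and the people.json path are illustrative, and the connector options are covered in detail in the Examples section.

// Built-in JSON data source ("people.json" is an illustrative path).
Dataset<Row> people = spark.read().format("json").load("people.json");

// Koverse Spark Connector, following the same pattern.
// <workspace_id> and <dataset_id> are placeholders for your own values.
Dataset<Row> records = spark.read().format("com.koverse.spark.read")
    .option("workspaceId", "<workspace_id>")
    .option("datasetId", "<dataset_id>")
    .load();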

Installation

The first step is to add the Spark Connector as a dependency to your Spark application. It can be found at the Maven Central Repository.

Add the Connector to your project configuration file:

Maven:

<dependency>
  <groupId>com.koverse</groupId>
  <artifactId>kdp-spark-connector_2.12</artifactId>
  <version>4.1.2</version>
</dependency>

SBT:

libraryDependencies += "com.koverse" % "kdp-spark-connector_2.12" % "4.0.2"

Gradle:

implementation group: 'com.koverse', name: 'kdp-spark-connector_2.12', version: '4.0.2'

Examples

Using the Koverse Spark Connector in Your Spark Application

The example code below shows how to add the Koverse Spark Connector to your Spark application for reading from and writing to Koverse. The Spark Connector requires you to save your connection information as environment variables before running your application. It authenticates with Koverse or Keycloak before performing any operations.

Koverse Credentials:

export KDP_EMAIL=<kdp_email>
export KDP_PSWD=<kdp_password>

Keycloak Credentials:

export AUTHENTICATE_WITH_KEYCLOAK=true
export KEYCLOAK_BASE_URL=<your_kdp_url>
export KEYCLOAK_REALM=<your_keycloak_realm>
export KEYCLOAK_CLIENT_ID=<your_keycloak_client_id>
export KEYCLOAK_CLIENT_SECRET=<your_keycloak_client_secret>
export KEYCLOAK_USERNAME=<your_keycloak_username>
export KEYCLOAK_PASSWORD=<your_keycloak_password>
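
If authentication fails, one quick check is to confirm that these variables are actually visible to the driver JVM. The snippet below is a plain Java sanity check, not part of the connector API:

// Optional sanity check (plain Java, not connector-specific):
// confirm the credential variables are present in the environment.
System.out.println("KDP_EMAIL set: " + (System.getenv("KDP_EMAIL") != null));
System.out.println("KDP_PSWD set: " + (System.getenv("KDP_PSWD") != null));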

The KdpUrl defaults to https://api.app.koverse.com, although it can also be passed in as a data source option (kdpUrl), as in the sketch below.
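
For example, a read against a different KDP deployment might pass the URL explicitly. The URL below is a placeholder, and workspaceId and datasetId are the values introduced in the following steps:

// Overriding the default KDP URL via the kdpUrl option.
// "https://api.example.koverse.com" is a placeholder, not a real endpoint.
Dataset<Row> df = spark.read().format("com.koverse.spark.read")
    .option("kdpUrl", "https://api.example.koverse.com")
    .option("workspaceId", workspaceId)
    .option("datasetId", datasetId)
    .load();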

For all actions you will need to pass in the workspaceId for the datasets you are reading from or writing to.

String workspaceId = "myworkspace";

To read from Koverse, the datasetId of the dataset you want to read is required.

String datasetId = "bca220e3-67fc-4504-9ae8-bedee9d68e6c";

You call the Koverse Spark Connector by passing its format name to format() and including the additional required options:

  • workspaceId

  • datasetId

Dataset<Row> randomDF = spark.read().format("com.koverse.spark.read")
    .option("workspaceId", workspaceId)
    .option("datasetId", datasetId)
    .load();
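
Once loaded, the result is an ordinary Spark DataFrame, so any standard Spark API call works on it. For example, a quick inspection might look like this (standard Spark, nothing connector-specific):

// Inspect the data returned by the connector (standard Spark API).
randomDF.printSchema();
randomDF.show(10);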

To write to Koverse, the Connector creates a new dataset to write to. Pass in the name for your new dataset as well as the workspaceId.

String datasetName = "my-kdp-spark-test";

counts.write().format("com.koverse.spark.write")
    .option("workspaceId", workspaceId)
    .option("datasetName", datasetName)
    .mode("Append").save();

To configure your dataset for attribute-based access control (ABAC), you will need to include the required options labelParser, labeledFields, handlingPolicy, datasetName, and workspaceId. The replacementString option is also required if you choose a handlingPolicy of REPLACE. See ABAC for more details.

String parser = "simple-parser";
String abacDatasetName = "my-kdp-spark-abac-test";

labeledData.write().format("com.koverse.spark.write")
    .option("labelParser", parser)
    .option("labeledFields", "label")
    .option("handlingPolicy", "REPLACE")
    .option("replacementString", "NOPE")
    .option("kdpUrl", kdpUrl)
    .option("workspaceId", workspaceId)
    .option("datasetName", abacDatasetName)
    .mode("Append").save();

For the full example go to our examples repository on GitHub and see the Spark Connector - KdpCount.
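
As a rough sketch of how the read and write pieces above fit together (this is an illustrative outline using the options documented in this section, not the full KdpCount example; "someColumn" and the other names are placeholders):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Illustrative end-to-end sketch: read a dataset, transform it, and write the
// result back to Koverse as a new dataset. All IDs and names are placeholders.
SparkSession spark = SparkSession.builder().appName("kdp-spark-example").getOrCreate();

Dataset<Row> input = spark.read().format("com.koverse.spark.read")
    .option("workspaceId", workspaceId)
    .option("datasetId", datasetId)
    .load();

// Any standard Spark transformation works here; "someColumn" is a placeholder column name.
Dataset<Row> counts = input.groupBy("someColumn").count();

counts.write().format("com.koverse.spark.write")
    .option("workspaceId", workspaceId)
    .option("datasetName", datasetName)
    .mode("Append").save();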

Using Koverse Spark Connector in Databricks

Note: Please ensure you set up an account and configure a cluster in Databricks prior to starting these instructions.

  1. In Databricks, select Compute on the left and click on the cluster that will be used to run Koverse jobs.

  2. In the Cluster Configuration select Advanced Options.

  3. Select the Spark tab.

  4. Add the following Environment Variables:

    Koverse or Keycloak Credentials: see the Using the Koverse Spark Connector in Your Spark Application section above.
    PYSPARK_PYTHON=/databricks/python3/bin/python3
    JNAME=zulu11-ca-amd64
  5. The spark-connector_2.12-<version>.jar will need to be uploaded to the Databricks cluster that Koverse jobs will be run on.

    1. Select the Libraries tab.

    2. Click Install New.

    3. Drag and drop the jar into the Install library dialog.

    4. Click Install.

    5. Restart the cluster.

  6. Now the cluster is ready to run Koverse jobs.

  7. Create a new notebook.

    1. Here is an example of a notebook that reads a dataset called actorfilms, transforms it, and writes it back to Koverse as a new dataset:

      NOTE: actorfilms.csv can be found here

          import com.koverse.spark.read._
      import com.koverse.spark.write._
      import org.apache.spark.sql.functions._
      import org.apache.spark.sql.expressions.Window

      val actorfilms = spark.read.format("com.koverse.spark.read").option("workspaceId", "<kdp_workspace_id>").option("datasetId","<kdp_dataset_id>").option("kdpUrl","https://api.dev.koverse.com").load()
      val top = actorfilms.filter($"Rating" >= 5).filter($"Year" >= 2000).toDF
      val ranked = top.drop(col("Actor")).drop(col("ActorID")).dropDuplicates.withColumn("Rank", rank().over(Window.orderBy(desc("Rating"), desc("Votes"), desc("Year")))).toDF
      val rounded = ranked.withColumn("Rounded", round($"Rating"))

      display(rounded)

      ranked.write.format("com.koverse.spark.write").option("kdpUrl", "https://api.dev.koverse.com").option("workspaceId", "emily").option("datasetName", "top_movies").mode("Append").save()
    2. After running the above Spark job you can create a visualization of the data by clicking New Visualization in the results table.

    3. Once the notebook is created, it can be run from the notebook cell.

    4. This notebook can also be run by creating a new job pointing to the notebook in Workflows.

      1. Select Workflows on the left.

      2. Click Create Job.

      3. Name the task.

      4. Select an existing cluster or use shared_job_cluster.

      5. If you select shared_job_cluster, make sure you configure it according to steps 1 through 5.iv.