Developer Documentation

Introduction

Organizations are likely to require some custom software development to address their own unique data analytics requirements. These custom features often draw on their own sources and methods that provide strategic insight and competitive advantage.

Each organization may have some combination of unique data sets, mission-specific data processing requirements used to analyze and transform those data sets, and custom interactive user interfaces.

With this in mind, Koverse provides this developer documentation describing how to programmatically extend Koverse with additional functionality, and how to deploy that functionality into an operational Koverse instance.

Tip

The Javadocs for the Koverse SDK are available from the Koverse UI. From the UI, just navigate to the Info panel by clicking on the Info panel icon at bottom left of the screen. The Info panel will provide links to the Javadocs.

Customization Types

Developers can customize and extend Koverse in several ways, such as:

  • Koverse Apps - Web Applications hosted by Koverse that leverage the JavaScript SDK to support interaction with Koverse for a large number of users. Apps may also include custom Transforms to help get Data Sets into a structure that the App expects.
  • AddOns - these are packages that extend Koverse with custom Sources, Transforms, and Sinks.
  • Koverse Clients - these are processes that interact with Koverse via an API and that can be embedded in other services etc.

Koverse Core Concepts

The following sections provide an introduction to the abstract concepts that give a developer a foundation for working with the Koverse API.

Data Model

The Koverse data model has two main conceptual components: Records, and Data Sets. Logically, each Data Set contains a set of Records.

For those familiar with relational database management systems such as Oracle or MySQL, the analogy is that a Data Set is similar to a Table, a Record is similar to a Row, and the fields of a Record are similar to Columns. However, unlike traditional relational databases, Records in a single Data Set in Koverse do not all have to have the same fields, and fields can contain complex values, like lists and mappings of fields to values.

Records

The Koverse canonical unit of data is a Record. A Record is a map of keys/values, or fields, similar to a JSON document. Like a JSON document, a Record can have embedded lists or nested maps.

A Record belongs to a single Data Set. Different Records within the same Data Set do not have to have the same fields or structure. The values in a Record can be of many different types, including Strings, Doubles, geospatial points, and Dates. A Record also has an optional security label which can be used to provide Record-level access control.

Some key points to remember about Records are:

  • Each record is present in one and only one Data Set.

  • Records are maps of key/value pairs, similar to JSON
    • Example: {key1: valueA, key2: valueB}
  • Value types may vary across records with matching keys
    • Example Record A: { key1: stringValue}
    • Example Record B: { key1: 234 }
  • Records do not have a user-designated ID field. It is up to the application to designate and populate an identifier field. The application can submit queries to look up records by any field, including a field to which it has assigned unique identifiers.

  • The optional security label on a record is set programmatically through the Java API and affects how the record is stored and retrieved.

  • Records can contain nested value objects:
    • Example: { name: parent, children: [ { name: child1} ] }

  • Records can contain the following native value types:

Native Value Type - Examples and supported string formats

  • String - "A string of text characters"
  • Integer - 15
  • Long - 10000000000L
  • Float - 44.26
  • Double - 200.05
  • Date - Unix Timestamp: 1371277293 UTC (GMT); Epoch Timestamp: 1371277293; DTG: 271545ZFEB13. Other supported date formats:
    • yyyyMMdd hh:mm:ss
    • EEE MMM d HH:mm:ss Z yyyy
    • EEE MMM d HH:mm:ss zzz yyyy
    • yyyy-MM-dd
    • yyyy/MM/dd
    • yyyy-MM
    • yyyy/MM/dd HH:mm:ss
    • yyyy-MM-dd HH:mm:ss
    • yyyy/MM/dd HH:mm:ss.SSS
    • yyyy-MM-dd HH:mm:ss.SSS
    • MM/dd/yyyy HH:mm
    • MM-dd-yyyy HH:mm
    • ddHHmm'Z' MMM yy
  • KoverseGeoPoint - Well Known Text string format: Point 1.23 60.423, or comma-separated decimal lat,long: 1.23,60.423
  • Inet4Address - 192.168.1.1
  • URL - http://www.koverse.com
  • Boolean - true
  • byte[] - An array of binary bytes, such as the original bytes of a file
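
To illustrate the logical structure described above, here is a small, self-contained Java sketch that builds a record-shaped map with a nested list of child maps. It uses only plain Java collections and does not reference the Koverse SDK Record classes; it simply shows the shape of the data.

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class RecordShapeExample {
  public static void main(String[] args) {
    // A Record is logically a map of fields to values of varying types.
    Map<String, Object> child = new HashMap<>();
    child.put("name", "child1");

    Map<String, Object> parent = new HashMap<>();
    parent.put("name", "parent");                 // String value
    parent.put("count", 234);                     // Integer value
    parent.put("children", Arrays.asList(child)); // nested list of maps

    System.out.println(parent);
  }
}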

Data Set

Data Sets are the basic container for data in Koverse. You can think of them like tables - but every record in a Data Set can be completely unique in structure.

A Koverse Data Set is a named set of Records. A Data Set has:

  • Configurable indexes to enable queries to quickly and efficiently find Records.
  • Permissions to control access to Records in the Data Set.
  • Automatically discovered statistics and samples to provide insight into the Records contained in the Data Set.

Data Sources

A data source is simply the source of the data. It can be a file, a particular database on a DBMS, or even a live data feed. The data might be located on the same computer as the Koverse application, or on another computer somewhere on a network.

Koverse establishes the connection to these data sources and provides the ability to import data into Koverse, breaking the data into records according to the external format of the data (e.g., JSON, XML, CSV, relational records).

Custom sources are only necessary when talking to a new type of server, often using a new protocol. For example, Koverse ships with an FTP source and an IMAP source. New sources are not necessary simply for new file types, and certainly not for specific uses of known physical formats such as a particular type of XML file.

Transforms

In Koverse, Transforms are reusable, configurable, multi-stage MapReduce jobs that manipulate the Records of one or more Data Sets. They are highly scalable and customizable analytics that can be reused across all of your data.

Built-In Example Transforms

The following list is an example of built-in Koverse transforms:

  • Close Graph
  • Extract Time Series
  • Faceting
  • Entity Extraction (using Apache OpenNLP)
  • Geo Discovery
  • Geo Location (IP Addresses, Airports, Postal Codes (Canada))
  • Summarize Relationships
  • Sentiment Analysis
  • Copy Records
  • Nearest Neighbors
  • Text Cleanup
  • Summarize Field Values

Import-time Transforms

Import-time Transforms are one-stage transforms that operate like a single map() phase and are applied to Records as they are imported from a Source. Import-time Transforms can be chained together during a particular Import job.

Export-time Transforms

Export-time Transforms are one-stage transforms that operate like a single map() phase and are applied to Records as they are exported to a Sink. Export-time Transforms can be chained together during a particular Export job.

Export File Formats

Export File Formats define how Records are written to file-based Sinks such as FTP and HDFS Sinks.

Sinks

Sinks represent external destinations to which Records from Data Sets may be sent. For example, one can write out Records as JSON objects to a remote file system.

Queries

Whether you are developing a Koverse App or building a custom source, Koverse queries conform to a specific format. There are two types of syntax supported: a Lucene-like syntax and a more Object-based structure.

Lucene-like Query Syntax

These queries are represented as strings and passed as such into query methods. The Lucene query syntax is described in the Apache Lucene documentation.

Object-based Queries

Search Criteria - Query Syntax

  • Searching 'any' field for a value: {$any: fmv}
  • Searching a specific field for a value: {field.name: fmv}
  • Search AND: {$and: [{$any: fmv}, {$any: blue}]}
  • Search OR: {$or: [{$any: fmv}, {$any: blue}]}

Range Queries

Search Criteria - Query Syntax

  • Any value greater than or equal to 160: {$any: {$gte: 160}}
  • Date field less than a specific date: {date_created: {$lt: "1980-01-01T00:00:00.000Z"}}
  • Geo range (bounding box): {fieldName: {$box: [[sw-lat, sw-long],[ne-lat, ne-long]]}}
    • Example: {fieldName: {$box: [[39.5, -104.9],[40, -104.5]]}}

Note that queries that combine a range with any other criteria, and queries that combine multiple ranges, require Composite Indexes on the fields involved. See the Composite Indexes section for information on building these.

Aggregations

Aggregations allow you to easily maintain near real-time statistics on the Records in a Data Set. Aggregations run incrementally on new Records to maintain pre-computed, up-to-date results so that they can always be queried with sub-second latency.

Quick Start Java Project

Koverse ships with a koverse-sdk-project-<version>.zip file that contains an example Maven-based Java project. This project defines some simple custom sources, sinks, transforms, and apps. The Maven pom.xml file in this project builds an Addon that can be uploaded. Simply alter the Java and HTML/JS code in this project, then build and deploy the Addon to Koverse.

GitHub Koverse SDK Project

Visit Koverse SDK Project to fork or download the latest koverse-sdk-project for your version of Koverse.

Koverse SDK Project Maven Archetype

A Maven Archetype project is available for easy deployment. Modify the version number (KOVERSE-VERSION-HERE) in the command below to configure and create a new instance of a Koverse project:

mvn archetype:generate  \
  -DarchetypeRepository=http://nexus.koverse.com/nexus/content/groups/public/  \
  -DarchetypeGroupId=com.koverse.sdk.project \
  -DarchetypeArtifactId=koverse-sdk-project-archetype \
  -DarchetypeVersion=KOVERSE-VERSION-HERE \
  -DkoverseVersion=KOVERSE-VERSION-HERE

Building the Koverse SDK Project

The koverse-sdk-project is a standard Apache Maven (https://maven.apache.org) project that produces a shaded JAR, which means it collapses all of its runtime dependencies into a single JAR file. This is necessary for running jobs in Koverse.

Use the following command from the root directory of the unzipped koverse-sdk-project:

mvn clean package

After a successful build, the resulting Addon JAR file is in the koverse-sdk-project/target/ directory. By default it is named koverse-sdk-project-<version>.jar

Modifying the Koverse SDK Project

You should modify the koverse-sdk-project to fit your needs. Here are some good starting points.

  1. Change the <groupId> and <artifactId> values in the pom.xml file to match your organization and project.
  2. Change the Java package name from com.koverse.foo to your organization and project names.
  3. Modify the Java classes to create your own custom sources, transforms, sinks, and application definitions.
  4. Delete any unused Java classes.
  5. Modify the /src/main/resources/classesToInspect.example file to match your Java classes and rename the file to classesToInspect.
  6. Modify the /src/main/resources/apps/ contents for your custom application.
  7. Modify the LICENSE and README files.

Deploying the Addon to a Koverse Server

Addons can be deployed via a Maven command, or via the Koverse web interface.

Maven Addon Deployment

  1. Login to your Koverse server

  2. Navigate to the “System Administration” application

  3. Click the “API” tab

  4. Click “Add API Token” button

  5. Add a name such as “developer”

  6. Click “Administrators” button

  7. Click “Create Token” button

  8. Note the API Token that was created.

  9. Add the following settings to your ~/.m2/settings.xml profile:

    <properties>
            <koverse.apitoken>API-TOKEN-HERE</koverse.apitoken>
            <koverse.serverurl>KOVERSE-URL-HERE (ex: http://koversevm/Koverse)</koverse.serverurl>
    </properties>
    
  10. Use this single command to build and deploy the plugin for testing:

    mvn clean package org.apache.maven.plugins:koverse-maven-plugin:deploy
    

Web interface Addon Deployment

  1. Navigate to the “System Administration App”
  2. Click the “Addons” tab
  3. Click “Browse” or “Choose File”, and select the Addon file from the <basedir>/target directory of your Maven project.
  4. Click Upload.

Addons

Any custom code, whether it be one or more applications, transforms, or custom sources or sinks, can be packaged up into a simple JAR - referred to in Koverse as an Addon. Addons are uploaded to Koverse, via the System Administration app, for deployment.

Koverse reads the contents of the JAR file and extracts necessary metadata from any classes extending Koverse known types, such as Application, Transform, Source, and Sink.

Creating an Addon

Addons are simply JAR files with some specific files and a well-formed directory structure. The koverse-sdk-project provides a complete example Maven project that builds an appropriately constructed Addon JAR. You may use any assembly framework you like to produce a JAR file with the following attributes:

  • Java binary .class files in the normal Java package directory structure.
  • Koverse Application HTML and JavaScript should be placed in the /apps/<applicationId> folder - where applicationId matches the string your CustomApplication.getApplicationId() method returns.
  • A file named classesToInspect can optionally be placed at the root level of the JAR. This file is a line separated list of all Applications, Transforms, Sources, and Sink Classes. Including this file causes Koverse to inspect only the classes listed in this file. This is useful when your Addon includes classes whose dependencies are not present in the JAR.

Example Addon JAR directory structure:

MyCustomAddon.jar
|
| -- classesToInspect
| -- com
           | -- mycompany
                 | -- myproject
                        | -- MyCustomTransform.class
                        | -- MyCustomApplication.class
|-- some
        | -- other
                | -- dependency
                        | -- OtherDependency.class


| -- apps
        | -- myApplicationId
                | -- index.html
                | -- css
                        |-- index.css
                | -- javascript
                        |-- index.js
        | -- mySecondApplicationId
                | -- index.html
                | -- someFolder
                        | -- some.file

Uploading an Addon to Koverse

See the Addons section.

Applications may be auto-deployed and immediately ready for use, if so defined by the developer of the application. Sources, Transforms, and Sinks are also ready for immediate use.

Managing Versions for Custom Components

The applicationId, sourceTypeId, transformTypeId, and sinkTypeId properties of Applications, Sources, Transforms, and Sinks are used by Koverse to identify these custom components across their versions. This means that, except in extreme cases, all versions of a custom component should share a single typeId string. This allows Koverse to identify when a newly installed custom component should override an existing custom component.

Here is an example life cycle of a single Addon containing a single custom source.

  1. An administrator or developer user uploads a MyCustomAddon-1.0.0.jar Addon into a Koverse installation. This JAR contains a MyCustomSource with a sourceTypeId of myCustomSource.
  2. The source is used by many other end users over time to import data from various systems.
  3. A developer releases a new updated version of the Source. This source is now named My New Custom Source, has a sourceTypeId of myCustomSource, and is in an Addon named MyNewCustomAddon-2.0.0.jar.
  4. An administrator or developer uploads this new Addon JAR file.
  5. Koverse inspects the MyNewCustomAddon at install time, and discovers that the MyNewCustomSource has the same sourceTypeId as the existing MyCustomSource.
  6. Koverse automatically disables the old MyCustomSource. All instances of this source now execute the MyNewCustomSource code. This means end users may need to consider the changes in parameters or behavior.
  7. When all of the components of an Addon have been disabled, either manually or via uploading of new overlapping components, the old addon itself is disabled - and is therefore removed from the administration interface. In this case, MyCustomAddon-1.0.0.jar is disabled.
  8. Koverse does not discard the logging or reference to the old Addon. These items remain for auditing and provenance purposes.

The Version Property

The version properties of these custom components are simply used to identify the active installed version for troubleshooting and verification purposes. Koverse uses a last-installed methodology when selecting the implementation version for custom Applications, Sources, Transforms, and Sinks. This means that the end user can simply upload any version of an Addon and be assured they are using the most recently installed version. The version string itself has no effect on which version is executed.

Change Control Across Versions

Developers should consider that customers upgrading from one version to the next, or downgrading, may have already established Source, Transform, or Sink instances that have existing parameter values. This means the developer may need to handle outdated parameter configurations. The most appropriate way to handle changing parameter sets across versions is to inform the user that new configuration is needed when simple error checking of parameters fails.

Defining Custom Apps in Addons

Addons enable developers to deliver custom "Apps" that are managed and deployed in Koverse installations. When a system administrator uploads an Addon JAR file, it is inspected for custom Application definitions. The custom application contents are included in the JAR so that they can be delivered to the end user.

Application Definition

See the koverse-sdk-project/src/main/com/koverse/foo/MyCustomApplication.java file for an example of defining a custom application. That file defines the presence of a custom application type.

HTML/JS code in Addons

See the Creating an Addon section for the structure of an HTML/JS app inside an Addon. The top-level directory name of the app's HTML/JS code should match the output of the getApplicationId() method.

Sources API

Koverse Sources are designed to read data from a specific type of data source, such as a relational database or a remote file system.

Koverse uses MapReduce to import from sources when there are multiple items to be read and when processing those items on multiple machines will speed up the overall import. Examples include having many map workers read and process one file each from a directory on a remote file system.

Other sources are read from a single thread running on the same server on which Koverse is installed. These are referred to as inline sources.

Once a connection to a source is established, the next job of the source is to create a RecordStream that produces a set of Java objects, representing raw records or files obtained from the source.

One example is a CSV file. When a source is processing a CSV file, it simply breaks the file into distinct lines by tokenizing on the newline character, and provides those as Java Strings to the next phase.

Finally, Sources employ RecordFactory classes to convert Java Objects into Koverse Records. Often RecordFactories can be shared across sources, such as a factory used to convert lines from a CSV file to a Koverse Record. There are many types of sources that may provide CSV files: NFS, local file systems, remote HDFS instances.

To use the RecordFactory classes as well as others, be sure to include the following dependency in your pom.xml:

<dependency>
        <groupId>com.koverse.addon</groupId>
        <artifactId>koverse-addon-file-source-deps</artifactId>
        <version>${project.parent.version}</version>
</dependency>

Sources are configured through defining parameters that are presented to users via the User Interface. This way the source can obtain necessary information such as the hostname and port of the server containing the source data, or a username and password.

See the Quick Start SDK Project section for details about a ready made project for creating custom sources.

Source Types

SimpleSource.java

The SimpleSource class should be extended when users would like the ability to import one or more records or files from a single external server. The Koverse SDK Project contains an example MyCustomSource that extends SimpleSource.

The methods to implement are the following:

/* Using the end-user supplied parameter values, establish a connection to the outside source */
public abstract void connect();

/* Retrieve the next record from the outside source. Return null when no more records exist */
public abstract Map<String, Object> getNext();


// Cleanup the connection to the outside source
public abstract void disconnect();

/* Return a list of Parameter objects that describe the
 * end-user supplied parameters necessary for establishing
 * a connection and producing records from this source. */
public abstract List<Parameter> listParameters();
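
As a sketch of how these methods fit together, the following hypothetical source produces a fixed number of records. The SDK import paths are omitted and the four-argument Parameter constructor is an assumption taken from the parameter example later in this documentation; confirm both against the koverse-sdk-project and the Javadocs.

// SDK imports for SimpleSource and Parameter are omitted here; the package paths
// are documented in the Koverse SDK Javadocs and the koverse-sdk-project example.
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class CountingSource extends SimpleSource {

  private static final String COUNT_PARAMETER = "recordCount";
  private int remaining;

  @Override
  public List<Parameter> listParameters() {
    // Describe the end-user supplied configuration for this source.
    return Arrays.asList(
        new Parameter(COUNT_PARAMETER, "Number of records", Parameter.TYPE_STRING, "10"));
  }

  @Override
  public void connect() {
    // A real source would open a connection here using the parameter values.
    remaining = 10;
  }

  @Override
  public Map<String, Object> getNext() {
    // Return null when no more records exist.
    if (remaining <= 0) {
      return null;
    }
    Map<String, Object> record = new HashMap<>();
    record.put("value", remaining--);
    return record;
  }

  @Override
  public void disconnect() {
    // Clean up any connection state here.
  }
}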

Transforms API

Koverse Transforms can operate over one or more data collections to perform advanced algorithmic processing or create analytic summaries. Koverse tracks all transform relationships between input and output Data Sets, so the provenance of any given Data Set is traceable back to the Data Sets or Import Sources from which it was derived.

Koverse uses Apache Hadoop MapReduce to execute Transforms over data collections and handles all the details of scheduling, running, stopping, and monitoring the individual Hadoop jobs. To transform a data set, developers implement a simplified MapReduce API that reads records from one or more input Data Sets, potentially filtered according to user authorizations, and writes output to a new Data Set, applying security labels appropriately.

Using the Koverse Transform API has several advantages over using Hadoop directly:

  • Developers can focus on the details of their algorithm, rather than worrying about the details of handling many different input and output data formats and managing multiple jobs.
  • Transforms are parameterized so that, once a Transform is written, it can be configured and run by non-developers on the Data Sets they are authorized to read.
  • The details of how a transformed result data set is stored and labeled are handled by the Transform framework. This ensures that result sets will be automatically queryable and that access control policies are maintained.

Koverse transforms build on the MapReduce processing functions map(), combine(), and reduce().

See the Koverse SDK Project section for details about a ready made project for creating custom transforms.

Transform Stages

Many algorithms involve more than just one map() and one reduce() function. Koverse Transforms are organized into stages which are run one after the other, the output of the previous stage becoming the input to the following stage.

Transforms are specified by the developer defining the individual stages, and then specifying the order in which the stages should be run. The Transform framework handles the details of scheduling map, combine, and reduce stages into jobs to submit to Hadoop.

For example, if the first stage of a Transform is a reduce stage, the framework knows to set up an identity mapper in the first Hadoop job created to pass records directly to the reducer.

The only restriction on the order in which stages are run is that Combine stages must be followed by a Reduce stage.

Another item to note is that the first Map stage of a transform receives Record objects from the input Koverse Data Sets. Subsequent stages receive whatever objects are emitted by previous stages.

If a stage fails, the errors are reported to the User Interface and subsequent stages are cancelled.

Stages are defined by subclassing one of the Stage types described below.

RecordMapStage

This type of stage operates on Records from the input Data Sets specified when the Transform was configured.

public void map(Record record)

KVMapStage

This type of stage is used when mapping over the output of a previous stage.

public void map(Object key, Object value)

CombineStage

A CombineStage is used to locally combine the output of a previous map stage before the keys and values are sent to a ReduceStage. A CombineStage must be followed by a ReduceStage.

public void combine(Object key, Iterable<Object> values)

ReduceStage

A ReduceStage takes a key and a set of values and emits one or more new key value pairs for consumption by a subsequent Stage, or writes Koverse Records to the output Collection in the data store.

public void reduce(Object key, Iterable<Object> values)

Emitter

The emitter is used to either send key value pairs to the next Stage or to write Records to the output collection. Usually all but the last Stage emit key value pairs and the last Stage writes Records.

Key/value pairs emitted by emit() are sent to HDFS, where they are read by a subsequent Stage and then deleted. Records emitted via writeRecord() are written to the output Collection of the Transform and are indexed and made searchable according to the configuration of the output Collection:

emit(Object key, Object value)


writeRecord(Record record)
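
To make the stage flow concrete, here is a hedged word-count style sketch pairing a RecordMapStage with a ReduceStage. It assumes the stage base classes expose the emit() and writeRecord() methods shown above as inherited helpers, and that Record values can be read and written with map-like get/put calls; verify both against the SDK Javadocs and the koverse-sdk-project examples.

// SDK imports for RecordMapStage, ReduceStage, and Record are omitted here;
// see the Koverse SDK Javadocs for the package paths.
class TermCountMapStage extends RecordMapStage {
  @Override
  public void map(Record record) {
    // Be defensive: fields may be missing or of unexpected types.
    Object text = record.get("text");   // map-like accessor is an assumption
    if (text == null) {
      return;
    }
    for (String token : text.toString().toLowerCase().split("\\s+")) {
      emit(token, 1);   // assumed inherited helper matching emit(Object key, Object value)
    }
  }
}

class TermCountReduceStage extends ReduceStage {
  @Override
  public void reduce(Object key, Iterable<Object> values) {
    long total = 0;
    for (Object value : values) {
      total += ((Number) value).longValue();
    }
    Record output = new Record();        // constructor form is an assumption
    output.put("term", key.toString());
    output.put("count", total);
    writeRecord(output);                 // assumed inherited helper; writes to the output Data Set
  }
}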

Transform Runner

The transform runner is responsible for assembling MapReduce jobs out of stages and incrementing a given transform job's current stage. The runner will peek at subsequent stages in an attempt to execute map, combine, and reduce stages as parts of a single job. After configuring a job, it will submit the job to the cluster.

Transform class

Stages are packaged up into a single Transform by defining a subclass of the Transform class.

Security

Koverse ensures that a Transform only reads records from collections the submitting user is authorized to read. In addition, any record-level security labels applied at import time are enforced so that individual records the user is not authorized to see are not delivered to the Transform for processing.

The output Records of each Transform are labeled by the framework so that access to them is controlled.

Tips and Tricks

  • When writing transform logic, keep in mind that Koverse Records may vary in structure. As such, one cannot assume that certain fields will be present, or that the content of fields will conform to any particular format. Code must be defensive against variation in fields and their values.

Import Transforms API

Koverse ImportTransforms allow Records to be transformed during an Import job.

ImportTransforms can be parameterized to allow users to configure the ImportTransform at runtime. Parameters can be accessed via the setup method thus:

public void setup(Map<String, Object> params) throws IOException

Developers can grab the values of Parameters and store them for use in the transform method.

The core of an ImportTransform is the transform method:

public Iterable<SimpleRecord> transform(SimpleRecord inputRecord)

The transform method takes one input SimpleRecord and returns zero or more SimpleRecords.
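
A minimal sketch of an Import-time Transform that adds a derived field is shown below. The hypothetical "label" parameter and the map-like accessors on SimpleRecord are assumptions, and the SDK import paths are omitted; verify all of these against the SDK Javadocs.

// SDK imports for ImportTransform and SimpleRecord are omitted here;
// see the Koverse SDK Javadocs for the package paths.
import java.io.IOException;
import java.util.Collections;
import java.util.Map;

class AddSourceLabelTransform extends ImportTransform {

  private String label = "unknown";

  @Override
  public void setup(Map<String, Object> params) throws IOException {
    // Grab parameter values here and store them for use in transform().
    // The "label" parameter name is hypothetical.
    Object value = params.get("label");
    if (value != null) {
      label = value.toString();
    }
  }

  @Override
  public Iterable<SimpleRecord> transform(SimpleRecord inputRecord) {
    // Return zero or more records; here the record passes through with one extra field.
    inputRecord.put("sourceLabel", label);   // map-like accessor is an assumption
    return Collections.singletonList(inputRecord);
  }
}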

Export Transforms API

Koverse ExportTransforms can be used to transform Records as they are being written to a Koverse Sink.

ExportTransforms can be parameterized to allow users to configure the ExportTransform at runtime. Parameters can be accessed via the setup method thus:

public void setup(Map<String, Object> params) throws IOException

Developers can grab the values of Parameters and store them for use in the transform method.

The core of an ExportTransform is the transform method:

public Iterable<SimpleRecord> transform(SimpleRecord inputRecord)

The transform method takes one input SimpleRecord and returns zero or more SimpleRecords.

Export File Formats API

Developers can extend ExportFileFormat to easily create new ways to export Koverse Records to file-based Sinks. ExportFileFormats are parameterized like other classes.

There are three primary methods to define when creating an ExportFileFormat:

public void startFile()

startFile is used to do initialization. The method getOutputStream() can be used to get a reference to the OutputStream to which SimpleRecords are written. Some ExportFileFormats wrap the OutputStream object with other objects to make it easier to output records.

This method can also be used to write out header information to the output file.

public void writeRecordToFormat(SimpleRecord record) throws IOException

The writeRecordToFormat method is used to output individual records to the output file. SimpleRecord objects can be converted into the bytes that the file format requires.

public void endFile()

The endFile function is used to write out any footer information required by the file format. It is not necessary to close the OutputStream as this is done automatically by the super class.
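
Putting these methods together, the following sketch writes one line of text per record. The ExportFileFormat and SimpleRecord class names and the getOutputStream() method come from this documentation, the SDK import paths are omitted, and the serialization (plain toString()) is illustrative only; a real format would emit proper JSON, CSV, or similar.

// SDK imports for ExportFileFormat and SimpleRecord are omitted here;
// see the Koverse SDK Javadocs for the package paths.
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class LinePerRecordExportFormat extends ExportFileFormat {

  private BufferedWriter writer;

  @Override
  public void startFile() {
    // Wrap the OutputStream provided by getOutputStream(); a header line could also be written here.
    writer = new BufferedWriter(
        new OutputStreamWriter(getOutputStream(), StandardCharsets.UTF_8));
  }

  @Override
  public void writeRecordToFormat(SimpleRecord record) throws IOException {
    // Convert the record into the text/bytes the format requires (toString() is a stand-in).
    writer.write(record.toString());
    writer.newLine();
  }

  @Override
  public void endFile() {
    // Write any footer here and flush; closing the OutputStream is handled by the super class.
    try {
      writer.flush();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}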

Parameters

Koverse Transforms, Sources, and Sinks are all configured via Parameters. Parameters are defined by the developer and allow specific instances of Transforms, Sources, and Sinks to be configured and deployed into varying environments by authorized non-developer users.

When creating a specific implementation of a Transform, Source, or Sink, developers provide a list of Parameters to present to the end-user via the User Interface.

Parameters are created with the following fields:

  • String parameterName (required) - uniquely identifies the parameter within the class.
  • String displayName (required) - the name of the parameter that is shown to the user.
  • String type (required) - one of the possible types defined in Parameter (see below).
  • String defaultValue (optional) - a value set as the default.
  • String referencedParameterNames (optional) - any parameterName that should be referenced. For example, for Parameters of the type TYPE_COLLECTION_FIELD, the possible values presented to the user in the UI are taken from the parameter defined in the referencedParameterName.
  • Boolean required (optional) - whether the parameter must be set by the user. The default is false
  • Boolean hideInput (optional) - whether the value of the parameter should be hidden in the UI. Used for sensitive parameters such as passwords.
  • String hint (optional) - a string of text to be shown to the user as an additional hint for applying a value to the parameter.

For example, a Source may define a parameter in its constructor as follows:

private static final String URL_PARAMETER = "url";

public NewsFeedSource() {
  inputParameters.add(
      new Parameter(
          URL_PARAMETER,
          "RSS Feed URL",
          Parameter.TYPE_STRING,
          "http://rssfeedurl.xml"));
}

Parameters can be of the following types:

  • TYPE_STRING - for passing in single line short strings such as a hostname or URL.
  • TYPE_TEXT - for passing in longer multi-line strings, such as an entire script.
  • TYPE_BOOLEAN - presents a checkbox to the user and is set to true or false.
  • TYPE_INTEGER - allows the user to specify an integer value.
  • TYPE_FILE - allows the user to choose a file from the local file system. The file is uploaded, and its contents are made available as a stream at execution time to the custom component.
  • TYPE_COLLECTION_FIELD - allows the user to select a single field from a collection. The referencedParameterName must be equal to the parameterName of a TYPE_INPUT_COLLECTION or TYPE_OUTPUT_COLLECTION parameter. This is useful for informing classes of a specific field to use.
  • TYPE_COLLECTION_MULTIPLE_FIELD - allows the user to choose a set of fields from a collection selected as an input or output collection parameter. This is useful for informing classes of a specific set of fields to use.

There are additional Parameter types used primarily by the system:

  • TYPE_INPUT_COLLECTION - an input collection parameter presents the user with a list of collections from which the user is authorized to read. The UI then fills in this parameter with the internal unique ID of the collection the user chose. This component generally allows the end-user to select multiple input collections. The contents of all input collections are read into transform and export jobs for example.
  • TYPE_OUTPUT_COLLECTION - an output collection parameter presents the user with a list of collections to which the user is authorized to write. The UI then fills in this parameter with the internal ID of the collection the user chose. This parameter generally only allows the user to select a single collection.
  • TYPE_SECURITY_LABEL_PARSER - presents the user with a list of Security Label parser options. Security label parsers are responsible for translating from a source security label to a Koverse record security label.

Transforms are pre-configured with parameters for input and output Data Sets. Sources and Sinks are pre-configured with output or input collections, respectively.

Sinks API

Koverse Sinks are designed to write Koverse Records to external data stores. For example, customers often want transformed data exported into HDFS for follow-on processing by downstream systems. Java developers can create custom Sinks to support specific destination data stores.

Sinks are executed as MapReduce jobs with only a map phase. The sinks API provides an interface that allows the developer to open a connection to an outside system, deliver records, and then close that connection.

See the Koverse SDK Project Maven Archetype section for details about a ready made project for creating custom sinks.

REST API

Note: Clients written in Javascript can use the Javascript SDK rather than interacting directly with the REST API.

Koverse provides an HTTP REST API that gives third-party tools and integrations access to the system. This documentation explains how to access the REST API and provide third-party integrations such as widgets and data management. All responses and HTTP request payloads are encoded in JSON.

See the REST API generated documentation for a complete list of methods and their signatures. The REST API documentation is hosted in Koverse itself; open https://<host:port>/docs/rest/ to view it.

Response Messages

All response messages from the REST API are encoded in JSON and include common attributes on the base response object. The most important attribute is the success boolean flag, which indicates whether the requested operation was successful. If the success value is false, there will be a failureMessage attribute that provides a plain English statement of the reason.

Example:

{"success": false, "failureMessage": "Something went wrong."}

Commonly used methods

Almost all applications will require the following functionality:

  • User Authentication and Authorization
  • Fetching Data Sets
  • Performing Queries

Additional Methods

  • User management
  • Collection management
  • Index management
  • Kicking off imports, transforms, exports
  • Many others

API Tokens

Koverse Administrators can create API Tokens, which are used by outside systems to authenticate. These are generally used outside the context of a direct user request, for example, by a server that periodically updates its own cache using a Koverse query.

All REST API methods can be called using an API token to authenticate. The API Token takes precedence over any other method of authentication. Here is an example of using an API token to authenticate:

http://<host:port>/api/system/status?apiToken=API-TOKEN-HERE
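
For example, a Java client could call the system status method with an API token using the standard JDK HTTP client (Java 11+). The endpoint and apiToken query parameter are taken from the example above; the host, port, and token value are placeholders.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ApiTokenExample {
  public static void main(String[] args) throws Exception {
    // Replace host, port, and token with values for your Koverse instance.
    String url = "http://localhost:8080/api/system/status?apiToken=API-TOKEN-HERE";

    HttpClient client = HttpClient.newHttpClient();
    HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
    HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());

    // All responses are JSON; check the "success" flag described above.
    System.out.println(response.body());
  }
}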

Example REST API Methods

Ping

http://<host:port>/api/ping

A ping request shows that the Koverse HTTP REST API is available and responsive. Use the ping method to monitor basic system availability.

Example Ping Request

The following URL shows a ping request, for a Koverse server running on localhost.

http://localhost:8080/api/ping

Example Ping Response

{"recordCountEstimate":0,"responseTime":0,"success":true,"recordsWritten":0,"bytesWritten":0,"importSampleReady":false}

Session Authentication (Login)

http://<host:port>/api/login

POST data:

{"email":"username@example.com","password":"password"}

Example login failure response:

{"success":false,"failureMessage":"Login denied. Check username and password"}

Example login success response:

{"success":true,"user":{"id":1,"emailAddress":"admin","groups":
[{"id":1,"name":"Administrators","staticPermissions":["manageUsersAndGroups","manageDataCollections"
,"manageSystemSettings","audit","manageSources","manageLockDown","manageMapReduceJobs"]}]}}

Before using other REST API methods, an HTTP session must be established. Below is the URL and an example for login. The HTTP response to the login will include a JSESSIONID cookie that must be included in all future REST API calls.

Example Login URL

The following cURL command would retrieve an HTTP response with a JSESSIONID token for the default administrative user and password.

curl 'http://localhost:8080/api/login' -i -H 'Content-Type: application/json;charset=UTF-8' -H 'Accept: application/json, text/plain, */*'  --data-binary '{"email":"admin","password":"admin"}'

Example login response:

HTTP/1.1 200 OK
Access-Control-Allow-Origin: *
Access-Control-Allow-Headers: Origin, X-Requested-With, Content-Type, Accept
Set-Cookie: JSESSIONID=1e0absgbti8151fn0ip59b3kj4;Path=/
Expires: Thu, 01 Jan 1970 00:00:00 GMT
Content-Type: application/json
Transfer-Encoding: chunked
Server: Jetty(8.1.18.v20150929)

{"id":4,"firstName":"admiral","lastName":"admin","email":"admin","groups":[],"externalGroups":[],
"groupIds":[],"tokens":[],"disabled":false,"creationDate":null,"passwordResetHash":null,
"authenticatorUserId":"koverseDefault_admin","authenticatorTypeId":"koverseDefault",
"newPassword":null,"newPasswordConfirm":null}

Querying for data

The most basic feature of the Koverse REST API is to provide query/search access to data collections. Below is an example of querying all data collections for a logged-in user.

http://<host:port>/api/search/results?query=<queryHere>

Example Query

The following would query a Koverse instance running on localhost, port 8080, for the term test.

http://localhost:8080/api/search/results?query=test
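
Putting the login and query calls together, a Java client (Java 11+) might establish a session and run a search as follows. The endpoints and JSON bodies come from the examples above; the CookieManager preserves the JSESSIONID cookie between calls.

import java.net.CookieManager;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SessionQueryExample {
  public static void main(String[] args) throws Exception {
    // The CookieManager stores the JSESSIONID returned by /api/login.
    HttpClient client = HttpClient.newBuilder()
        .cookieHandler(new CookieManager())
        .build();

    HttpRequest login = HttpRequest.newBuilder(URI.create("http://localhost:8080/api/login"))
        .header("Content-Type", "application/json;charset=UTF-8")
        .POST(HttpRequest.BodyPublishers.ofString(
            "{\"email\":\"admin\",\"password\":\"admin\"}"))
        .build();
    client.send(login, HttpResponse.BodyHandlers.ofString());

    // Subsequent calls reuse the session cookie automatically.
    HttpRequest query = HttpRequest.newBuilder(
        URI.create("http://localhost:8080/api/search/results?query=test")).GET().build();
    HttpResponse<String> results = client.send(query, HttpResponse.BodyHandlers.ofString());
    System.out.println(results.body());
  }
}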

Additional Methods

See the Koverse REST API Generated Docs for details about the many other methods available.

Pig Scripts

Koverse supports using Pig (http://pig.apache.org) as a transform. Pig transforms are simple Pig scripts where Koverse defines the load and store functions. To use Pig, follow these steps:

  1. Open the Data Flow app.
  2. Click Add Transform
  3. Choose ‘Pig’ from the transform type drop down.
  4. Choose Input and Output collections.
  5. Write the Pig script in provided text area.

Koverse automatically provides the "load" and "store" functions. You simply need to write a Pig script that references the input collections by name and assigns a value to the output collection by name. Pig variables are case sensitive and have some restrictions, so Koverse transforms Data Set names to use only alphanumeric and underscore characters (preserving case). Also, Pig relation names cannot start with a non-alphabetic character (A-Z or a-z), so Koverse prepends the character A when a Data Set name starts with a non-alphabetic character. Here are some example name conversions:

  • “My 1st Data Set” = My_1st_Data_Set
  • “P*22” = P_22
  • “cAsE sEnSiTiVe” = cAsE_sEnSiTiVe
  • “9Items” = “A9Items”
  • “_Items” = “A_Items”

Koverse records are converted into tuples. The field names are applied as the Pig field names with the same conversion as above.

While Koverse allows unstructured data, Pig requires highly structured data. The schema defined for Pig fields is derived using the data type(s) seen in the Collection Details Field’s page in Koverse. If a field has only a single type detected, the conversions in the table below are used directly. If a field has more than one type detected, and one of those types is a String, all values for that field will take a chararray type in Pig. Otherwise, if more than one type is detected but none are Strings, for example, if a field is 90% Number and 10% Date, it will be defined as a double value type in Pig. Here are the conversions of Koverse data types to Pig data types.

Koverse - Pig

  • String - chararray
  • Number - double
  • Date - DateTime
  • KoverseGeoPoint - [double,double]
  • Byte[] - bytearray
  • Object - map

Example Pig Scripts

The following is a simple pig script that would copy the contents of DataCollection1 to DataCollection2:

DataCollection2 = DataCollection1

This more complex Pig script would perform a Group By operation on fieldA with a sum on fieldB:

A = GROUP DataCollection1 BY fieldA;
DataCollection2 = FOREACH A GENERATE FLATTEN(group) as fieldA, SUM(DataCollection1.fieldB) as fieldBSum;

Pig Transforms Special Considerations

Pig transforms are executed as multiple stage map reduce jobs. They’re considered “non-incremental” transforms in Koverse. Never restart the koverse-server process while a Pig transform is executing - as the job’s state will be lost and the job will never finish.

3rd Party Authentication and Authorization

Koverse can be extended to integrate with existing enterprise authentication and authorization systems that may be required for a given production environment. While these authentication and authorization modules are extensible components built against the Koverse SDK, they are not packaged into a Koverse AddOn like other extensible components such as Sources and Transforms. Instead, these modules need to be built into a JAR and placed in the classpath of the Koverse Webapp or Koverse Server. Additionally, the koverse-webapp.properties or koverse-server.properties files need to be modified to identify the module(s) that Koverse should use for authentication and authorization.

Implementing an Authentication and Authorization Module

To implement an authentication and authorization module for the webapp, a developer will extend the AbstractWebAppAuthModule class. This is a Guice module that enables the injection of new authentication and authorization implementations. There are two ways to implement authentication, either with the HttpServletRequestAuthenticator or the WebAppParameterAuthenticator. The HttpServletRequestAuthenticator enables authentication based on information in the HttpServletRequest, such as an X.509 certificate. The WebAppParameterAuthenticator enables authentication based on custom, named parameters. To pass external groups or security tokens to Koverse, implement a WebAppAuthorizer.

To implement an authorization module for the server, a developer will extend the AbstractServerAuthModule class, which is also a Guice module. Currently, only authorizers can be created for the Koverse Server. To create one, implement the interface ServerAuthorizer in your own class. The Server authorizer can do many of the same things that the Web App authorizer can do, so you may decide to create a Server authorizer instead of a Web App authorizer. The only time that an authorizer must be used in the webapp and not the server is when some information available from the HTTP call is required to do the authorization.

Application Server Configuration

The module and implementations described above need to be built into a JAR file which is placed in the classpath of the Koverse Webapp or Koverse Server, depending on what kind of authorizer it is. Authenticators are only supported in the webapp and not in the server. This can be done by simply putting the JAR into the lib directory of the Koverse Web App or Server.

Koverse Webapp Configuration

To update the active authentication and authorization modules used by the Koverse Webapp, set the com.koverse.webapp.auth.modules property in koverse-webapp.properties to a comma separated list of Guice module class names.

Koverse Server Configuration

To update the active authorization modules used by the Koverse Server, set the com.koverse.server.security.auth.modules property in koverse-server.properties to a comma separated list of Guice module class names.

Java Client

Introduction

The Java Client allows JVM based software to connect to and interact directly with Koverse through the REST API. It is capable of interacting with the REST API using plain HTTP connections, SSL, and SSL with client PKI certificates.

The general concept is to instantiate an implementation of the com.koverse.client.java.KoverseClient interface and invoke its methods. There are two such implementations, one for interacting with the Koverse web application via REST and another directly to the Koverse server using Thrift.

These instructions will focus on the REST based implementation because the Thrift based implementation is still a work-in-progress.

Basics

To use the Java client in your software, modify your project's Maven pom.xml to include the Koverse Java client dependency. First, include a repositories section in your pom.xml file and add the Koverse repository, for example:

<repositories>
        <repository>
        <id>koverse</id>
        <name>Koverse Public Repo</name>
        <url>http://nexus.koverse.com/nexus/content/groups/public/</url>
        <layout>default</layout>
        </repository>
</repositories>

This will allow maven to download the Koverse java client dependency. This dependency has the groupId com.koverse and the artifactId koverse-client-java. Set the version of that dependency to match the version of Koverse your software will be communicating with. Your pom.xml file should now have a section similar to:

<dependencies>
        <dependency>
        <groupId>com.koverse</groupId>
        <artifactId>koverse-client-java</artifactId>
        <version>1.2.0</version>
        </dependency>
</dependencies>

Note that if your IDE integrates with Maven, it should be able to download JavaDocs for the koverse client software and display them for you. If you’d like to download the JavaDocs yourself, visit http://nexus.koverse.com/nexus/content/groups/public/com/koverse/koverse-client-java/ in your browser, select the folder for your version of Koverse, and download the Javadoc archive.

Lastly, in your Java code you will instantiate an instance of com.koverse.client.java.KoverseConnection. Note that its constructor takes a connector as an argument (for example, a com.koverse.client.java.PlainKoverseConnector or SecureKoverseConnector). You choose which connector implementation to use in order to specify whether to use plain unencrypted HTTP or encrypted HTTP (e.g., HTTP over SSL/TLS).

Unencrypted HTTP Connections

Begin by creating an instance of com.koverse.client.java.PlainKoverseConnector. Note that its constructor requires you to provide a valid Koverse API Token and the base URL of Koverse (e.g. http://www.myserver/Koverse). Then, create an instance of KoverseConnection, supplying the PlainKoverseConnector you just created as the sole constructor argument.

Now, you may use the created KoverseConnection object to perform operations such as:

  • Retrieve collections, including names and collection identifiers.
  • Insert, update, and delete records.
  • Get user and system information.
  • Retrieve collection statistics and download records in bulk.
  • Perform queries.
  • Perform auto-complete queries.

Please view the JavaDocs for the interface com.koverse.client.java.KoverseClient for further details on these operations.
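
As a sketch of the pattern above: the PlainKoverseConnector and KoverseConnection class names come from this documentation, but the constructor argument order and the commented-out query call are assumptions, so check the KoverseClient JavaDocs before relying on them.

// Class and package names come from this documentation; the constructor argument
// order and the query call shown in comments are assumptions.
import com.koverse.client.java.KoverseConnection;
import com.koverse.client.java.PlainKoverseConnector;

public class JavaClientExample {
  public static void main(String[] args) throws Exception {
    // Provide a valid Koverse API Token and the base URL of your Koverse instance.
    PlainKoverseConnector connector =
        new PlainKoverseConnector("API-TOKEN-HERE", "http://www.myserver/Koverse");
    KoverseConnection connection = new KoverseConnection(connector);

    // Hypothetical usage; the real operation methods (queries, record retrieval, etc.)
    // are documented on the com.koverse.client.java.KoverseClient interface.
    // Object results = connection.query("test");
    // System.out.println(results);
  }
}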

Encrypted HTTP Connections

Configuring the KoverseClient to use SSL is somewhat more involved. It can be further complicated by using client-side certificate authentication. As such, let's begin with just setting up an SSL connection for now. Client-side certificate authentication is explained in the next section.

Before we begin, please note that this information is also documented in the JavaDocs for the com.koverse.client.java.SecureKoverseConnector class. Please feel free to reference that as well.

The important thing to realize is that the use of SSL is configured through the standard JVM mechanism of using special system properties and a Java Key Store.

Since it is most likely the case that the server is not assigned a certificate issued by a trusted CA (Certificate Authority), we must configure your Java software to use a self-signed certificate used by the Koverse server.

As such, the first thing you must do is create a Java keystore that will contain the certificate for the Koverse server. That is done by using the Java keytool command, like so:

keytool -import -alias koverseserver -file koverseserver.crt -storepass $PASS -keystore koverseserver.keystore

In the above example, we are creating an entry named koverseserver in the keystore located in the file koverseserver.keystore from the contents of the certificate file koverseserver.crt. Additionally, we are protecting the contents of the keystore by encrypting it with the password stored in the environment variable $PASS.

Getting this certificate stored into the keystore is the first step.

The next step is to define special Java system properties when your program is executed so that Java will use the information in the keystore. Those system properties are:

-Djavax.net.ssl.trustStoreType=jks

-Djavax.net.ssl.trustStore=koverseserver.keystore

-Djavax.net.ssl.trustStorePassword=$PASS

Your program must either be run with the above command line properties or you must programmatically add them to the JVM’s System Properties at runtime.
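
If you prefer to set these programmatically rather than on the command line, a minimal sketch is shown below; the properties must be set before the first SSL connection is opened.

public class TrustStoreSetup {
  public static void main(String[] args) {
    // Equivalent to the -D options above; run this before opening any SSL connection.
    System.setProperty("javax.net.ssl.trustStoreType", "jks");
    System.setProperty("javax.net.ssl.trustStore", "koverseserver.keystore");
    System.setProperty("javax.net.ssl.trustStorePassword", System.getenv("PASS"));
  }
}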

With this done, your software should be capable of interacting with an SSL-enabled Koverse Server. If things don't seem to be working, there are some tips that can help:

  1. Be sure to contact us for support
  2. Apply the system property -Djavax.net.debug=all to get lots of good SSL debugging output.

Encrypted HTTP Connections with Client Side Certificates

To use client side certificates, do the same as in the previous section, but also make sure the following system properties are set in your software as well:

-Djavax.net.ssl.keyStoreType=pkcs12

-Djavax.net.ssl.keyStore=clientcertificate.p12

-Djavax.net.ssl.keyStorePassword=$PASS

Here you are specifying your client certificate, located in the file clientcertificate.p12. This is a PKCS12-formatted file, protected by the password stored in the system environment variable $PASS.

Spark SQL Introduction

Koverse 1.4 supports Transforms written using the Apache Spark API, including Spark SQL. Spark SQL allows Koverse records to be processed using the popular SQL language, which is useful for many common operations such as reshaping data, filtering, combining, and aggregation.

Spark SQL can be used in two ways in Koverse Transforms: first, using the generic Spark SQL transform, developers can simply paste a SQL script into a new instance of a Spark SQL Transform in the Koverse UI.

Second, transform developers can create Koverse AddOns which include Spark SQL statements as part of a Java or Scala class. These can be packaged, uploaded to the Koverse platform, and reused to transform multiple input collections.

Using the generic Spark SQL Transform

Koverse ships with a generic Spark SQL Transform that allows users to simply paste a Spark SQL statement into a text parameter; the statement is applied to the input collections specified.

To create a transform like this, start in the Data Flow application from the main menu. Click ‘Add Transform’ and select ‘Spark SQL Transform’ from the drop down list.

Configure the input collections desired.

In the text input marked 'SQL select statement', paste in a SQL statement. When specifying a table, use positional parameters to identify which input collection should be used. For example, if you've selected an input collection 'stocks' in the input collections control, it will be referenced in the SQL statement as $1. The second input collection is referenced as $2, and so on.


For a description of the SQL statements that are supported see https://spark.apache.org/docs/latest/sql-programming-guide.html

Building a new AddOn that includes Spark SQL statements

Developers can also build Koverse AddOns that leverage the Spark SQL API in transforms. To create a Spark SQL transform, create a Java class that extends JavaSparkSqlTransform and implement the required methods.

An abbreviated example is as follows. Create a new class extending JavaSparkSqlTransform:

…
import com.koverse.sdk.transform.spark.sql.JavaSparkSqlTransform;
import com.koverse.sdk.transform.spark.sql.JavaSparkSqlTransformContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
…

public class MySparkSqlTransform extends JavaSparkSqlTransform {

Provide any parameters you wish to expose to users to configure this transform, and the basic information about the transform that will help users identify it in the UI:

public Iterable<Parameter> getParameters()
public String getName()
public String getTypeId()
public Version getVersion()
.....

Now in the execute() method, access is provided to a SqlContext. Input collections are already loaded as data frames into the Sql Context and only need to be referenced in SQL statements. The input collection IDs can be accessed via the JavaSparkSqlTransformContext:

protected DataFrame execute(JavaSparkSqlTransformContext context) {

final SQLContext sql = context.getSqlContext();

final List<String> inputCollections =
    context.getJavaSparkTransformContext().getInputCollectionIds();

…

Collection Schemas are also available, but they do not need to be used to execute SQL statements. They are there in case additional DataFrames need to be created.

Map<String, CollectionSchema> schemas =
    context.getJavaSparkTransformContext().getInputCollectionSchemas();

SQL statements should be edited to reference the input collection IDs, and can be executed simply by passing the SQL string to the SqlContext. The resulting data frame should be returned and Koverse will persist the output as a new collection.

return sql.sql(sqlStatement);
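
Assembling the fragments above, an execute() implementation might look like the following sketch. The SQL statement itself is illustrative only; it references the first input collection by its ID, as described above.

// A sketch assembling the fragments above; the SQL statement is illustrative only.
protected DataFrame execute(JavaSparkSqlTransformContext context) {
  final SQLContext sql = context.getSqlContext();
  final List<String> inputCollections =
      context.getJavaSparkTransformContext().getInputCollectionIds();

  // Input collections are already loaded into the SQLContext under their collection IDs,
  // so the SQL references the first input collection directly by ID.
  final String sqlStatement = "SELECT * FROM " + inputCollections.get(0) + " LIMIT 100";

  // The returned DataFrame is persisted by Koverse as the output collection.
  return sql.sql(sqlStatement);
}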

Spark Transform API

Introduction

Koverse now supports the Apache Spark cluster computing framework through a set of native Koverse APIs that leverage many of the Spark primitives. The Koverse Spark APIs give developers a set of routines, protocols, and tools for building software applications based upon the Koverse technology.

See the Addons section for information about building an addon that contains a class that uses the Koverse Spark API.

The following is a high-level outline of the Koverse Spark API framework:

Interface SparkTransform

com.koverse.sdk.transform.spark

@ThreadSafe

@Immutable

public interface SparkTransform

Description: The following methods, when executed in order, obtain information on how to execute the transform: getName(), getVersion() and getParameters(). These methods are used to configure the transform before performing execution using execute(com.koverse.sdk.transform.spark.SparkTransformContext) which is passed a SparkTransformContext to give it the information needed to run the spark transform.

Methods:

  • org.apache.spark.rdd.RDD<SimpleRecord> execute(SparkTransformContext sparkTransformContext) - Koverse calls this method to execute your transform.
  • String getName() - Get the name of this transform.
  • Iterable<Parameter> getParameters() - Get the parameters of this transform.
  • String getTypeId() - Get a programmatic identifier for this transform.
  • Version getVersion() - Get the version of this transform.
  • boolean supportsIncrementalProcessing() - Whether the transform supports incremental output.

Example:

final RDD<SimpleRecord> actual = se.execute(sparkTransformContext);

Interface SparkTransformContext

com.koverse.sdk.transform.spark

@NotThreadSafe

@Immutable

public interface SparkTransformContext

Description: Given to a SparkTransform when it is executed. Provides context information to assist in the execution.

Methods:

  • Map<String, org.apache.spark.rdd.RDD<SimpleRecord>> getInputCollectionRDDs() - Get all Koverse input collection RDDs from the parameters that were input by the user.
  • Map<String, CollectionSchema> getInputCollectionSchemas() - Get the schemas for all input collections.
  • Map<String, String> getParameters() - Get all parameters that were input by the user, with the exception of collection parameters (which are given as RDDs). None of the keys or values in the returned map will be null.
  • org.apache.spark.SparkContext getSparkContext() - Get the Spark context to use during execution.

Class JavaSparkTransform

com.koverse.sdk.transform.spark

@ThreadSafe

@Immutable

public abstract class JavaSparkTransform extends Object implements SparkTransform, Serializable

Description: A version of Spark transforms that is easier to work with when the Spark code is written in Java.

Modifier and Type Method Description
protected abstract org.apache.spark.api.java.JavaRDD<SimpleRecord> execute(JavaSparkTransformContext sparkTransformContext) Koverse calls this method to execute your transform.
org.apache.spark.rdd.RDD<SimpleRecord> execute(SparkTransformContext sparkTransformContext) Invokes execute(com.koverse.sdk.transform.spark.JavaSparkTransformContext) after wrapping the Scala-specific types into Java-friendly types.
boolean supportsIncrementalProcessing() Override this method if the transform supports incremental processing.

Class JavaSparkTransformContext

com.koverse.sdk.transform.spark

@Immutable

@NotThreadSafe

public final class JavaSparkTransformContext extends Object

Description: A version of the Spark Transform Context more tailored for use with pure Java Spark code.

Modifier and Type Method Description
Map<String,org.apache.spark.api.java.JavaRDD<SimpleRecord>> getInputCollectionRDDs() Get all Koverse input collection RDDs from the parameters that were input by the user.
Map<String,CollectionSchema> getInputCollectionSchemas() Get the schemas for all input collections.
Map<String,String> getParameters() Get all parameters that were input by the user, with the exception of collection parameters (which are given as RDDs). None of the keys or values in the returned map will be null.
org.apache.spark.api.java.JavaSparkContext getSparkContext() Get the Spark context to use during execution.
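To show how these methods fit together, here is a minimal, hedged sketch of a JavaSparkTransform that keeps only the records containing a user-selected field. The FIELD_PARAM name is hypothetical, and the package paths of Version and SimpleRecord are assumptions to verify against the Javadocs; getName(), getTypeId(), and getVersion() are omitted here and can be implemented as shown in the Spark Java API examples below.

import java.util.ArrayList;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;

import com.koverse.sdk.Version;            // package path assumed
import com.koverse.sdk.data.Parameter;
import com.koverse.sdk.data.SimpleRecord;  // package path assumed
import com.koverse.sdk.transform.spark.JavaSparkTransform;
import com.koverse.sdk.transform.spark.JavaSparkTransformContext;

// A minimal sketch: keep only records that contain the configured field.
public abstract class FieldFilterTransform extends JavaSparkTransform {

    private static final String FIELD_PARAM = "fieldToKeep"; // hypothetical parameter name

    @Override
    public Iterable<Parameter> getParameters() {
        ArrayList<Parameter> params = new ArrayList<Parameter>();
        params.add(new Parameter(FIELD_PARAM, "Field to require", Parameter.TYPE_COLLECTION_FIELD));
        return params;
    }

    @Override
    protected JavaRDD<SimpleRecord> execute(JavaSparkTransformContext context) {
        // Parameter values entered by the user, keyed by parameter name.
        final String field = context.getParameters().get(FIELD_PARAM);

        // Input collection RDDs are keyed by collection ID; use the first one.
        JavaRDD<SimpleRecord> input =
            context.getInputCollectionRDDs().values().iterator().next();

        // Keep only the records that contain the configured field.
        return input.filter(new Function<SimpleRecord, Boolean>() {
            @Override
            public Boolean call(SimpleRecord record) {
                return record.containsKey(field);
            }
        });
    }
}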

Class SparkTransformLoader

com.koverse.sdk.transform.spark

public class SparkTransformLoader extends Object

Description:

Modifier and Type Method Description
String getName() Get the name.
List<Parameter> getParmeters() Get all the parameters input by the user.
String getTypeId() Get the type ID.
Version getVersion() Get the Spark version.

Spark SQL Transform API

Koverse now supports Apache Spark SQL via a set of native Koverse Spark SQL APIs that let the user query structured data as a distributed dataset (RDD). This makes it easy to run SQL queries.

See the Addons section for information about building an addon that contains a class that uses the Koverse Spark SQL API.

The following is a high-level outline of the Koverse Spark SQL API framework:

Class JavaSparkSqlTransform

com.koverse.sdk.transform.spark.JavaSparkTransform

@Immutable

@ThreadSafe

public abstract class JavaSparkSqlTransform extends JavaSparkTransform

Description: A transform for executing Spark SQL queries.

Modifier and Type Method Description
protected abstract org.apache.spark.sql.DataFrame execute(JavaSparkSqlTransformContext context) Execute the Spark SQL query.
protected org.apache.spark.api.java.JavaRDD<SimpleRecord> execute(JavaSparkTransformContext sparkTransformContext) Koverse calls this method to execute your transform

Class JavaSparkSqlTransformContext

com.koverse.sdk.transform.spark.JavaSparkTransform

@NotThreadSafe

public final class JavaSparkSqlTransformContext extends Object

Description: The context for a JavaSparkSqlTransform

Modifier and Type Method Description
JavaSparkTransformContext getSparkTransformContext() Get the Java Spark transform context, if needed.
org.apache.spark.sql.SQLContext getSqlContext() Get the SQL context, which is ready to go and loaded with the schemas for the input collections.

Class KoverseSparkSql

com.koverse.sdk.transform.spark.sql

public class KoverseSparkSql extends Object

Description:

Modifier and Type Method Description
static org.apache.spark.sql.DataFrame createDataFrame(org.apache.spark.api.java.JavaRDD<org.apache.spark.sql.Row> rowRdd, org.apache.spark.sql.SQLContext sqlContext, org.apache.spark.sql.types.StructType schema) Create a new DataFrame from an RDD of rows, a SQL context, and a struct type (the Spark SQL schema).
static org.apache.spark.sql.DataFrame createDataFrame(org.apache.spark.api.java.JavaRDD<SimpleRecord> recordRdd, org.apache.spark.sql.SQLContext sqlContext, FlatCollectionSchema collectionSchema) Create a new DataFrame from an RDD of records, a SQL context, and a flat collection schema.
static org.apache.spark.api.java.JavaRDD<org.apache.spark.sql.Row> createRowRdd(org.apache.spark.api.java.JavaRDD<SimpleRecord> recordRdd, FlatCollectionSchema collectionSchema) Converts an RDD of records and a flat collection schema into an RDD of rows.
static org.apache.spark.sql.SQLContext createSqlContext(org.apache.spark.SparkContext sparkContext, Map<String,org.apache.spark.api.java.JavaRDD<SimpleRecord>> recordRdds, Map<String,FlatCollectionSchema> collectionSchemas) Converts two maps keyed by collection name, one containing record RDDs and the other containing collection schemas, into a SQLContext ready for query.
static org.apache.spark.sql.types.StructType createSqlSchema(FlatCollectionSchema collectionSchema) Given a flat collection schema, create a Spark SQL StructType, which is the SQL schema.
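As an illustration of how these helpers can be combined, the following hedged sketch turns a record RDD and its flat collection schema into a DataFrame. The helper method itself is hypothetical glue code, and the package paths of SimpleRecord and FlatCollectionSchema are assumptions to verify against the Javadocs.

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.StructType;

import com.koverse.sdk.data.FlatCollectionSchema; // package path assumed
import com.koverse.sdk.data.SimpleRecord;         // package path assumed
import com.koverse.sdk.transform.spark.sql.KoverseSparkSql;

public class KoverseSparkSqlExample {

    // Hypothetical helper: build a DataFrame for one input collection.
    public static DataFrame toDataFrame(JavaSparkContext sparkContext,
                                        JavaRDD<SimpleRecord> recordRdd,
                                        FlatCollectionSchema collectionSchema) {

        // Derive the Spark SQL schema (StructType) from the flat collection schema.
        StructType sqlSchema = KoverseSparkSql.createSqlSchema(collectionSchema);

        // Convert the record RDD into an RDD of Spark SQL Rows.
        JavaRDD<Row> rowRdd = KoverseSparkSql.createRowRdd(recordRdd, collectionSchema);

        // Build a DataFrame from the rows, a SQLContext, and the schema.
        SQLContext sqlContext = new SQLContext(sparkContext);
        return KoverseSparkSql.createDataFrame(rowRdd, sqlContext, sqlSchema);
    }
}

Alternatively, createSqlContext can build a SQLContext preloaded with several collections at once, as described in the table above.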

For a reference of the supported query syntax in the Spark Java SQL see:

http://savage.net.au/SQL/sql-99.bnf.html

http://docs.datastax.com/en/datastax_enterprise/4.6/datastax_enterprise/spark/sparkSqlSupportedSyntax.html

Spark Scala Transform API with Examples

These examples give a quick overview of the Koverse Spark Scala Transform API. Koverse 1.4 currently provides a single transform API in Scala.

The Koverse transform class is called SimpleSparkTransform.

To use the Scala transform, run the SimpleSparkTransform.execute() method with the proper arguments: a JavaSparkContext and an org.apache.spark.api.java.JavaRDD.

Please refer to the Javadocs for a full and detailed usage description.

The transform consists of the following high level steps:

  1. Get the ‘projectField’ field property from the JavaSparkContext
  2. Map the input collection RDDs to Scala map
  3. Get the collection from the Scala map
  4. Scan and pull out java records/objects from RDD
  5. Output the total record count for Java records
  6. Output the total record count for Scala records

Here is an example of a Spark Scala execute() method:

  protected def execute(context: JavaSparkTransformContext): JavaRDD[SimpleRecord] = {
    val field = context.getParameters.get(C.FIELD_PARAM)
    println(s"looking for field $field in the records")

    val map = mapAsScalaMap(context.getInputCollectionRDDs)
    println("mapped input collection RDDs to scala map")

    val collectionKV = map.iterator.next
    println(s"got collection ${collectionKV._1} from map")

    val rdd = JavaRDD.toRDD(collectionKV._2)
    println("pulled out RDD from tuple")

    val transformRDD = rdd
      .filter(r => r.containsKey(field))
      .map(r => {
        val outputRecord: SimpleRecord = new SimpleRecord

        if (r.containsKey(field)) {
          outputRecord.put(field, r.get(field))
        } else {
          outputRecord.put(field, "NOTHING")
        }

        println(s"${field} => ${r.get(field)}")
        outputRecord
      })
    println(s"total java records ${transformRDD.count()}")

    val output = JavaRDD.fromRDD(transformRDD)
    println(s"total scala records ${output.count}")

    output
  }

You can run Java and Scala examples by passing the class name to Spark’s bin/run-example script; for instance:

./bin/run-example <scala class>

For a description of the Spark Scala statements that are supported see the Scala Docs at:

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.package

Spark Java API with Examples

Koverse 1.4 supports Transforms written using the Apache Spark API. The Koverse APIs leverage many of the Spark primitives, which can be applied by writing a custom Transform or by using an existing Transform provided by the Koverse API.

Begin by creating a new class that extends JavaSparkSqlTransform:

public class MySparkSqlTransform extends JavaSparkSqlTransform {

Provide any parameters you wish to expose to users to configure this transform, and the basic information about the transform that will help users identify it in the UI:

public Iterable<Parameter> getParameters()

Example:

@Override
    public Iterable<Parameter> getParameters() {
            ArrayList<Parameter> params = new ArrayList<Parameter>();
            params.add(new Parameter(FIELD_PARAM, "Field to project", Parameter.TYPE_COLLECTION_FIELD));
            return params;
    }

public String getName()

Example:

@Override
    public String getName() {
            return "Spark Java";
    }

public String getTypeId()

Example:

@Override
    public String getTypeId() {
            return "Spark Java Transform";
    }

public Version getVersion()

Example:

@Override
    public Version getVersion() {
            return new Version(0, 1, 0);
    }

Here is an example of usage (creating a new class extending JavaSparkTransform):

final JavaSparkTransform javaSparkTransform;
final SparkTransformContext sparkTransformContext;
final RDD<SimpleRecord> actual;

javaSparkTransform = new JavaSparkTransform() {
        @Override
        public Iterable<Parameter> getParameters() {
            throw new UnsupportedOperationException("Not supported yet.");
        }

        @Override
        public String getName() {
            throw new UnsupportedOperationException("Not supported yet.");
        }

        @Override
        public String getTypeId() {
            throw new UnsupportedOperationException("Not supported yet.");
        }

        @Override
        public Version getVersion() {
            throw new UnsupportedOperationException("Not supported yet.");
        }

        @Override
        protected JavaRDD<SimpleRecord> execute(JavaSparkTransformContext sparkTransformContext) {
            return sparkTransformContext.getInputCollectionRDDs().get("input");
        }
   };

   actual = javaSparkTransform.execute(sparkTransformContext);

For a complete description of the Spark Java APIs that are supported see the Spark Java Docs at: https://spark.apache.org/docs/latest/api/java/index.html

Custom Transforms Code Examples

This code example is provided as a bootstrap for developing your own custom transform. The CompanyTransform class presented here can be used as a template.

Custom Transform Example:

  package com.company.transform;

  import com.koverse.sdk.data.Parameter;
  import com.koverse.sdk.data.Record;
  import com.koverse.sdk.data.TermTypeDetector;
  import com.koverse.sdk.transform.AbstractRecordMapStage;
  import com.koverse.sdk.transform.AbstractReduceStage;
  import com.koverse.sdk.transform.AbstractTransform;
  import com.koverse.sdk.transform.TransformStage;

  import java.io.DataInput;
  import java.io.DataOutput;
  import java.io.IOException;
  import java.net.InetAddress;
  import java.util.ArrayList;
  import java.util.HashMap;
  import java.util.Iterator;
  import java.util.List;
  import java.util.Map;

  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.io.Writable;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;

  import java.util.Map.Entry;

  public class CompanyTransform extends AbstractTransform {

  private static Logger logger = LoggerFactory
               .getLogger(CompanyTransform.class);

  public static class CompanyCustomValue implements Writable {

       public String companyCustomValue;

       public CompanyCustomValue() {
       }

       public CompanyCustomValue(String myCustomValue) {
               this.companyCustomValue = myCustomValue;
       }

      @Override
       public void write(DataOutput out) throws IOException {
               out.writeUTF(companyCustomValue);
       }

       @Override
       public void readFields(DataInput in) throws IOException {
               companyCustomValue = in.readUTF();
       }
  }

  private static final String PARAM_MY_CUSTOM = "myCustomParam";

  public CompanyTransform() {
  } // Necessary for Hadoop

  public static class CompanyCustomMapStage extends AbstractRecordMapStage {

       private String companyCustomParam = null;

       /**
        * Perform Mapper setup here. Read parameters, setup data structures,
        * etc
        */
       @Override
       public void setup() {
               companyCustomParam = getStageContext().getParameterValue(
                               PARAM_MY_CUSTOM);
       }

       /*
        * This mapper takes in a list of IP addresses for each record and
        * creates all unique combinations in any direction, i.e.
        * 127.0.0.1,255.255.255.255 is the same as 255.255.255.255, 127.0.0.1
        */
       public void map(Record inputRecord) throws IOException,
                       InterruptedException {

               HashMap<String, ArrayList<String>> mapOfIps = new HashMap<String, ArrayList<String>>();

               ArrayList<String> ipsArrayList = new ArrayList<String>();

               // System.out.println("********This is the mapper running!*********");

               for (Entry<String, Object> fields : inputRecord.fields.entrySet()) {

                       Object value = inputRecord.get(fields.getKey());

                       // Get the record value and nested values
                       checkIP(value, ipsArrayList);

               }

               // call to get unique pairs
               uniquePairs(ipsArrayList, mapOfIps);

               /*
                * emit resulting map using key and custom class in the format of
                * {"127.0.0.1,255.255.255.255", count} The sort and group function
                * will then combine all identical keys and create larger lists,
                * which are then sent to reducer to do the final count for each
                * grouping
                */
               CompanyCustomValue myCustomValueClass = null;

               for (Entry<String, ArrayList<String>> fields : mapOfIps.entrySet()) {

                       // System.out.println("this is the new data structure");

                       String key = fields.getKey();

                       ArrayList<String> ips = (ArrayList<String>) mapOfIps.get(key);

                       // System.out.println("for emit new key is:" + key);

                       myCustomValueClass = new CompanyCustomValue(
                                       Integer.toString(ips.size()));

                       getStageContext().emit(new Text(key.toString()),
                                       myCustomValueClass);
               }
       }

      @Override
       public Class<Text> getMapOutputKeyClass() {
               return Text.class;
       }

       @Override
       public Class<CompanyCustomValue> getMapOutputValueClass() {
               return CompanyCustomValue.class;
       }

       // recursive function takes record and then continues to iterate through
       public void checkIP(Object value, ArrayList<String> ipsArrayList) {
               if (value instanceof List) {

                       try {
                               Iterator<?> iterator = ((List<?>) value).iterator();

                               while (iterator.hasNext()) {
                                       Object listValue = (Object) iterator.next();
                                       checkIP(listValue, ipsArrayList);

                               }

                               // System.out.println("this value is instance of list");

                       } catch (Exception e) {
                               e.printStackTrace();
                       }

               } else if (value instanceof Map) {

                       try {

                               Map<?, ?> result = (Map<?, ?>) value;

                               Iterator<?> iterator = result.keySet().iterator();

                               while (iterator.hasNext()) {
                                       Object resultValue = result.get(iterator.next());

                                       checkIP(resultValue, ipsArrayList);
                               }

                               // System.out.println("this value is instance of map");

                       } catch (Exception e) {
                               e.printStackTrace();
                       }

               } else if (value instanceof InetAddress) {
                       ipsArrayList.add(((InetAddress) value).getHostAddress());

                       // System.out.println("check it is INET:" + ((InetAddress)
                       // value).getHostAddress());

               } else if (value instanceof String) {

                       String removedSlash = ((String) value).replace("/", "");

                       if (TermTypeDetector.typify(removedSlash) instanceof InetAddress) {
                               ipsArrayList.add(removedSlash);
                       } else {
                               // System.out.println("This is not INET!:" + removedSlash);
                       }
               }
       }

       public void uniquePairs(ArrayList<String> ipsArrayList,
                       HashMap<String, ArrayList<String>> mapOfIps) {

               // go through list and build unique ip address pairs
               String ipAddress = "";
               String ipAddress2 = "";
               String pair = "";

               ArrayList<String> pairs = new ArrayList<String>();

               for (int i = 0; i < ipsArrayList.size(); i++) {
                       ipAddress = (String) ipsArrayList.get(i);

                       for (int j = i; j < ipsArrayList.size(); j++) {
                               if (j == i)
                                       continue;

                               ipAddress2 = (String) ipsArrayList.get(j);

                               pair = ipAddress + "," + ipAddress2;

                               // System.out.println(pair);

                               pairs.add(pair);
                       }

               }

               // take unique list of pairs that is any directional and build a
               // HashMap with ArrayList of ip pairs
               for (int i = 0; i < pairs.size(); i++) {
                       String testPair = (String) pairs.get(i);

                       String[] indIps = testPair.split(",");
                       String firstPart = (String) indIps[0];
                       String secondPart = (String) indIps[1];
                       String testReversePair = secondPart + "," + firstPart;

                       if (mapOfIps.get(testPair) != null) {
                               ArrayList<String> testList = (ArrayList<String>) mapOfIps
                                               .get(testPair);
                               testList.add(testPair);
                       } else if (mapOfIps.get(testReversePair) != null) {
                               ArrayList<String> testList = (ArrayList<String>) mapOfIps
                                               .get(testReversePair);
                               testList.add(testReversePair);
                       } else {
                               ArrayList<String> testList = new ArrayList<String>();
                               testList.add(testPair);
                               mapOfIps.put(testPair, testList);
                       }

               }

       }

  }

  /*
   * The reduce will count all of the ip pairs and write them through Record.
   * The count for each grouping will occur and then records will be written
   * out independent of other reduce tasks.
   */
  public static class CompanyCustomReduceStage extends AbstractReduceStage {

       private String companyCustomParam;

       /** Perform setup here */
       public void setup() {
               companyCustomParam = getStageContext().getParameterValue(
                               PARAM_MY_CUSTOM);
       }

       /** Perform main work here */
       @Override
       public void reduce(Object feature, Iterable<Object> entities)
                       throws IOException {

               // System.out.println("******This is the reduce running!*******");

               // System.out.println("feature: " + feature.toString());

               Iterator<Object> i = entities.iterator();

               int mergedCount = 0;

               while (i.hasNext()) {
                       CompanyCustomValue count = (CompanyCustomValue) i.next();

                       mergedCount += Integer.parseInt(count.companyCustomValue);

                       // System.out.println("count: " + count.companyCustomValue);
                       // System.out.println("merge count: " + mergedCount);

               }

               String[] splitIPs = feature.toString().split(",");
               ArrayList<String> listIPs = new ArrayList<String>();

               for (int l = 0; l < splitIPs.length; l++) {
                       listIPs.add("/" + splitIPs[l]);
               }

               Record myCustomRecord = new Record();
               myCustomRecord.addField("IP_ADDRESS", listIPs);
               myCustomRecord.addField("count", mergedCount);

               try {
                       // Write the record to the data store, if this is the last stage
                       getStageContext().writeRecord(myCustomRecord);

               } catch (InterruptedException e) {
                       logger.error(e.getMessage(), e);
               }

       }

       public Class<Text> getMapOutputKeyClass() {
               return Text.class;
       }

       public Class<CompanyCustomValue> getMapOutputValueClass() {
               return CompanyCustomValue.class;
       }

  }

  @Override
  protected void fillInParameters(List<Parameter> parameters) {
       // Add custom parameters
       parameters.add(new Parameter(PARAM_MY_CUSTOM, "Custom Parameter",
                       Parameter.TYPE_STRING));
  }

  @Override
  public String getName() {
       return "Company IP Address Transform";
  }

  @Override
  public String getJobTypeId() {
       return "companyTransform";
  }

  @Override
  protected void fillInStages(List<Class<? extends TransformStage>> stages) {
       /**
        * Add all stages in order here
        */
       stages.add(CompanyCustomMapStage.class);
       stages.add(CompanyCustomReduceStage.class);
  }

  @Override
  public String getVersion() {
       return "1.0.0";
  }
}

Aggregation Query API

The sections above have gone into detail about how to configure Aggregations on the Records in a Data Set. As originally stated, the primary use case for Aggregations is to maintain precomputed statistics over time to support interactive (sub-second) queries from applications such as analytic dashboards. This section will provide detail on the query API. The REST API will be discussed, but a Thrift API is also available and is very similar.

Queries are submitted via HTTP POST requests to http://<host:port>/api/query/aggregate. The Content-Type header should be set to “application/json”. An example query for the first example above might look like:

{
  "collectionId":"web_logs_20150828_212035_291",
  "dimensionValuesPairs":[
    {
      "dimensionValues":[{"field":"1mBin","value":"1440785460000"}],
      "producer":{"type":"count"}
    }
  ],
  "generateTotal":true,
}

This will query the web log Data Set for the event count in the 1-minute bin of 1440785460000. This would have been the events that occurred between 18:11:00 and 18:12:00 GMT on Fri, 28 Aug 2015. The dimensionValuesPairs property is an array, so a single query may contain many dimensionValues, which enables you to batch requests; this can be useful when pulling the data for a time series graph, for example. There is currently no range query, so instead you would batch together all of the 1mBin values that you need to render your graph. The requests are also batched on the server, so this ends up being fast even if your query has hundreds of dimensionValues.
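For instance, a single batched request for three consecutive 1-minute bins might look like the following sketch (the two additional bin values are illustrative):

{
  "collectionId":"web_logs_20150828_212035_291",
  "dimensionValuesPairs":[
    {
      "dimensionValues":[{"field":"1mBin","value":"1440785460000"}],
      "producer":{"type":"count"}
    },
    {
      "dimensionValues":[{"field":"1mBin","value":"1440785520000"}],
      "producer":{"type":"count"}
    },
    {
      "dimensionValues":[{"field":"1mBin","value":"1440785580000"}],
      "producer":{"type":"count"}
    }
  ],
  "generateTotal":true
}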

Below we show another query, but this one is for the 3rd Aggregation example from above, the number of unique users per country per day:

{
  "collectionId":"web_logs_20150828_212035_291",
  "dimensionValuesPairs":[
    {
      "dimensionValues":[{"field":"1dBin","value":"1441166400000"}, {"field":"country", "value":"USA"}],
      "producer":{"type":"cardest", "relation":"userId"}
    }
  ],
  "generateTotal":true,
}

Here we see how the field (or relation) is specified in conjunction with the aggregation function. We also see how additional dimensions can be added to the query easily. Below is a table mapping the Scala aggregation function to the type used in the query API.

Function Type String
Count count
CountMap countmap
TopK topk
SumInteger sumint
SumDecimal sumdec
Min min
Max max
Average ave
StringSet set
CardinalityEstimate cardest
QuantileEstimate quantest

The generateTotal property enables the query to request a final reduction on the server when the query returns more than one value. This can be very useful in cases where the client cannot perform the reduction itself. For example, you could aggregate and query for the individual event counts for each day of a week and then add these values together on the client to get a total number of events for the week. But what if you were trying to get the total number of unique users for the week? You are likely to get a very wrong answer if you simply add up the unique users for each day of the week, as the same users may access the web site on several days during the week. By requesting the final reduction on the server, Koverse can properly merge the data structures that hold the cardinality estimates and then return the total.
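As a concrete sketch of that last point, the following query batches two daily bins for the USA and sets generateTotal to true so the server merges the daily cardinality estimates into a single total (the second 1dBin value is illustrative; a full week would simply include more entries):

{
  "collectionId":"web_logs_20150828_212035_291",
  "dimensionValuesPairs":[
    {
      "dimensionValues":[{"field":"1dBin","value":"1441166400000"}, {"field":"country", "value":"USA"}],
      "producer":{"type":"cardest", "relation":"userId"}
    },
    {
      "dimensionValues":[{"field":"1dBin","value":"1441252800000"}, {"field":"country", "value":"USA"}],
      "producer":{"type":"cardest", "relation":"userId"}
    }
  ],
  "generateTotal":true
}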

The query response looks very similar to the query, but with values:

{
  "recordCountEstimate": 0,
  "responseTime": 0,
  "success": true,
  "recordsWritten": 0,
  "aggregateQueryResult": {
      "collectionId": "web_logs_20150828_212035_291",
      "aggregateValues": [
          {
              "dimensionValuesProducerPair": {
                  "dimensionValues": [
                      {
                          "field": "1mBin",
                          "value": "1440785460000"
                      }
                  ],
                  "producer": {
                      "type": "count"
                  }
              },
              "value": "3"
          }
      ],
      "total": "38",
      "lastAggregationExecuted": 1440797400467
  }
}

Here we see there were 38 events for the 1-minute bin that was queried. The query response also shows the last time an aggregation job ran and completed, which indicates the “freshness” of the results.

Glossary of Koverse Terminology

Data Set
Data Sets are the basic container for data in Koverse. You can think of them as tables, but every record in a Data Set can be completely unique in structure.
Data Flow
Visualize, configure, and execute the movement of data within the Koverse system.
File Upload
Upload one or more files from the browser and import them into a collection.
