Java, NoSQL, SQL, REST API and other scary words

Spark Dataset API implementation

Dataframes

Spark introduced DataFrames in the Spark 1.3 release. DataFrames overcome the key challenges that RDDs had.

A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or an R/Python DataFrame. Along with DataFrames, Spark also introduced the Catalyst optimizer, which leverages advanced programming features to build an extensible query optimizer.

Dataframe Features

  • Distributed collection of Row objects: A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database, but with richer optimizations under the hood.
  • Data processing: Processes structured and unstructured data formats (Avro, CSV, Elasticsearch, Cassandra) and storage systems (HDFS, Hive tables, MySQL, etc.). It can read from and write to all of these data sources.
  • Optimization using the Catalyst optimizer: Catalyst powers both SQL queries and the DataFrame API. DataFrames use the Catalyst tree transformation framework in four phases: 1. analyzing a logical plan to resolve references, 2. logical plan optimization, 3. physical planning, 4. code generation to compile parts of the query to Java bytecode (see the sketch after this list).
  • Hive compatibility: Using Spark SQL, you can run unmodified Hive queries on your existing Hive warehouses. It reuses the Hive frontend and MetaStore and gives you full compatibility with existing Hive data, queries, and UDFs.
  • Tungsten: Tungsten provides a physical execution backend which explicitly manages memory and dynamically generates bytecode for expression evaluation.
  • Programming languages supported: the DataFrame API is available in Java, Scala, Python, and R.
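To see Catalyst's four phases in action, you can print the query plans that Spark produces. This is only a minimal sketch, assuming a local SparkSession; the class name JavaDemoExplain and the people.json file (with name and age columns) are made up for illustration:

package com.chatSparkConnactionTest;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class JavaDemoExplain {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("JavaDemoExplain")
                .master("local[1]")
                .getOrCreate();

        // Hypothetical input file with "name" and "age" columns
        Dataset<Row> people = spark.read().json("people.json");

        // explain(true) prints the parsed and analyzed logical plans,
        // the optimized logical plan, and the physical plan chosen by Catalyst
        people.filter("age > 21").explain(true);

        spark.stop();
    }
}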

Dataframe Limitations

  • Compile-time type safety: As discussed, the DataFrame API does not provide compile-time type safety, which limits you when manipulating data whose structure is not known. The following example compiles, but you will get a runtime exception when executing it.

Example:

case class Person(name: String, age: Int)
val dataframe = sqlContext.read.json("people.json")
dataframe.filter("salary > 10000").show
// throws Exception: cannot resolve 'salary' given input age, name

This is especially challenging when you are working with several transformation and aggregation steps.

  • Cannot operate on domain objects (lost domain object): Once you have transformed a domain object into a DataFrame, you cannot regenerate it. In the following example, once we have created personDF from personRDD, we won't be able to recover the original RDD of the Person class (RDD[Person]).

Example:

case class Person(name: String, age: Int)
val personRDD = sc.makeRDD(Seq(Person("A", 10), Person("B", 20)))
val personDF = sqlContext.createDataFrame(personRDD)
personDF.rdd // returns RDD[Row], does not return RDD[Person]

Datasets API

Dataset API is an extension to DataFrames that provides a type-safe, object-oriented programming interface. It is a strongly-typed, immutable collection of objects that are mapped to a relational schema.

At the core of the Dataset API is a new concept called an encoder, which is responsible for converting between JVM objects and the tabular representation. The tabular representation is stored using Spark's internal Tungsten binary format, allowing operations on serialized data and improved memory utilization. Spark 1.6 comes with support for automatically generating encoders for a wide variety of types, including primitive types (e.g. String, Integer, Long), Scala case classes, and Java beans.
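As a quick Java illustration of encoders, here is a minimal sketch; the Person bean, the class name EncoderDemo, and the sample values are made up for illustration:

package com.chatSparkConnactionTest;

import java.util.Arrays;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

public class EncoderDemo {
    // Hypothetical JavaBean; encoders for beans are generated automatically
    public static class Person implements java.io.Serializable {
        private String name;
        private int age;
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public int getAge() { return age; }
        public void setAge(int age) { this.age = age; }
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("EncoderDemo")
                .master("local[1]")
                .getOrCreate();

        // Encoder for a primitive type
        Dataset<String> names = spark.createDataset(Arrays.asList("A", "B"), Encoders.STRING());

        // Encoder for a JavaBean; fields are mapped to columns in the Tungsten binary format
        Person p = new Person();
        p.setName("A");
        p.setAge(10);
        Dataset<Person> people = spark.createDataset(Arrays.asList(p), Encoders.bean(Person.class));

        names.show();
        people.show();
        spark.stop();
    }
}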

Dataset Features

  • Provides the best of both RDDs and DataFrames: RDD (functional programming, type safety) and DataFrame (relational model, query optimization, Tungsten execution, sorting and shuffling).
  • Encoders: With encoders, it is easy to convert any JVM object into a Dataset, allowing users to work with both structured and unstructured data, unlike DataFrames.
  • Programming languages supported: The Dataset API is currently only available in Scala and Java. Python and R are not supported in version 1.6; Python support is slated for version 2.0.
  • Type safety: The Dataset API provides compile-time safety, which was not available in DataFrames. In the example below, we can see how a Dataset can operate on domain objects with compiled lambda functions.

Example:

case class Person(name: String, age: Int)
val personRDD = sc.makeRDD(Seq(Person("A", 10), Person("B", 20)))
val personDF = sqlContext.createDataFrame(personRDD)
val ds: Dataset[Person] = personDF.as[Person]
ds.filter(p => p.age > 25)
ds.filter(p => p.salary > 25) // error: value salary is not a member of Person
ds.rdd // returns RDD[Person]

  • Interoperable: Datasets allow you to easily convert your existing RDDs and DataFrames into Datasets without boilerplate code, as the sketch below shows.
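In Java the equivalent conversion goes through an encoder. A minimal sketch, assuming a local SparkSession; the people.json file is made up for illustration:

package com.chatSparkConnactionTest;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class InteropDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("InteropDemo")
                .master("local[1]")
                .getOrCreate();

        // Untyped DataFrame (Dataset<Row>) read from a hypothetical JSON file
        Dataset<Row> df = spark.read().json("people.json");

        // Convert the DataFrame into a typed Dataset<String> of names:
        // select a column and attach an encoder for the target type
        Dataset<String> names = df.select("name").as(Encoders.STRING());

        names.show();
        spark.stop();
    }
}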

Datasets API Limitation

  • Requires type casting to String: Querying the data from Datasets currently requires us to specify the fields in the class as strings. Once we have queried the data, we are forced to cast the columns to the required data type. On the other hand, if we use the map operation on Datasets, it will not use the Catalyst optimizer.

Example:

ds.select(col("name").as[String], $"age".as[Int]).collect()

The Dataset API brings several advantages over the existing RDD and DataFrame APIs, with better type safety and functional programming. However, given the type-casting requirements in the API, you still do not get the required type safety everywhere, and this can make your code brittle.

Implementation

So, let's start with the simplest example of DataStax + Cassandra + CQL:

package com.chatSparkConnactionTest;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;

public class JavaDemoDataSetCQL {
    public static void main(String[] args) {
        // Spark configuration with the Cassandra connection host
        SparkConf conf = new SparkConf();
        conf.setAppName("Chat");
        conf.setMaster("local[1]");
        conf.set("spark.cassandra.connection.host", "127.0.0.1");

        JavaSparkContext sc = new JavaSparkContext(conf);
        CassandraConnector connector = CassandraConnector.apply(sc.getConf());

        // Execute plain CQL statements through the connector's session
        try (Session session = connector.openSession()) {
            session.execute("DROP TABLE chat.dictionary;");
            session.execute("CREATE TABLE chat.dictionary (value_id text, d_value text, d_name text, PRIMARY KEY(value_id, d_name)) WITH comment = 'dictionary values' AND CLUSTERING ORDER BY (d_name ASC);");
            session.execute("INSERT INTO chat.dictionary (d_name, d_value, value_id) VALUES ('Friendship Status', 'Requested', '1');");
            session.execute("INSERT INTO chat.dictionary (d_name, d_value, value_id) VALUES ('Friendship Status', 'Friends', '2');");
        }
        sc.stop();
    }
}

Note: a very common problem with Cassandra these days is that the service does not start at all. To fix this, open the cassandra.yaml file, which can be found at C:\INSTALLPATH\apache-cassandra\conf:

  1. Open the cassandra.yaml file, press Ctrl + F, and look for #cdc_raw_directory:
  2. Uncomment the line by removing the "#" symbol and replace its value with "C:/INSTALLPATH/data/cdc_raw".

So your full line should look like: cdc_raw_directory: "C:/Program Files (x86)/DataStax Community/data/cdc_raw"

Dataset API Example

Read

This is a simple example of the Java class with comments (this link was really helpful for me: http://spark.apache.org/docs/latest/sql-programming-guide.html).

package com.chatSparkConnactionTest;

import java.util.HashMap;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class JavaDemoDataSet {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .appName("JavaDemoDataSet")
                .config("spark.sql.warehouse.dir", "/file:C:/temp")
                .config("spark.cassandra.connection.host", "127.0.0.1")
                .config("spark.cassandra.connection.port", "9042")
                .master("local[2]")
                .getOrCreate();

        // Read data
        Dataset<Row> dataset = spark.read()
                .format("org.apache.spark.sql.cassandra")
                .options(new HashMap<String, String>() {
                    {
                        put("keyspace", "chat");
                        put("table", "dictionary");
                    }
                }) // to set the keyspace and table
                .load() // to load data from the keyspace
                .filter("value_id BETWEEN 1 AND 5"); // to filter values, like a CQL WHERE clause
                //.groupBy("value_id"); // to count values with the same value_id (see the note below)

        // Print data
        dataset.show();
        spark.stop();
    }
}
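A note on the commented-out groupBy line: groupBy alone returns a grouped dataset that cannot be shown directly; you need an aggregation first. A minimal sketch, reusing the dataset variable from the class above:

// groupBy needs an aggregation such as count() before show()
Dataset<Row> counts = dataset.groupBy("value_id").count(); // one row per value_id with its count
counts.show();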

JSON

There is a very simple way to get JSON from Cassandra with a single query:

SELECT JSON * FROM dictionary;

To implement a Java class which will return JSON:

Dataset<String> dataset = spark
        .read()
        .format("org.apache.spark.sql.cassandra")
        .options(new HashMap<String, String>() {
            {
                put("keyspace", "chat");
                put("table", "dictionary");
            }
        }) // to set the keyspace and table
        .load() // to load data from the keyspace
        .toJSON(); // to convert each row into a JSON string
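The resulting Dataset<String> holds one JSON document per row. A minimal usage sketch (collectAsList is only reasonable for small result sets):

dataset.show(false); // print the JSON strings without truncation
for (String json : dataset.collectAsList()) {
    System.out.println(json); // each element is the JSON representation of one row
}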

Write

I found an answer from a software engineer at DataStax saying that there is no need to use a Dataset to add a single new row to the table. But if there are multiple items, the best way is to form a Dataset and save it:

datasetWrite
        .write()
        .format("org.apache.spark.sql.cassandra")
        .options(new HashMap<String, String>() {
            {
                put("keyspace", "chat");
                put("table", "dictionary");
            }
        })
        .mode(SaveMode.Append)
        .save();

 

CRUD

Here is the full CRUD example code (it mixes SparkConf + JavaSparkContext + CassandraConnector with a SparkSession to handle both Datasets and CQL queries):

 

package com.chatSparkConnactionTest;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;

public class JavaDemoDataSetCassandraCRUD {
    public static void main(String[] args) {
        // Create connector via SparkConf
        SparkConf conf = new SparkConf();
        conf.setAppName("Chat");
        conf.setMaster("local[1]");
        conf.set("spark.cassandra.connection.host", "127.0.0.1");
        JavaSparkContext sc = new JavaSparkContext(conf);
        CassandraConnector connector = CassandraConnector.apply(sc.getConf());

        // Create Spark session
        SparkSession spark = SparkSession
                .builder()
                .appName("JavaDemoDataSet")
                .config("spark.sql.warehouse.dir", "/file:C:/temp")
                .config("spark.cassandra.connection.host", "127.0.0.1")
                .config("spark.cassandra.connection.port", "9042")
                .master("local[1]")
                .getOrCreate();

        // Recreate the dictionary table via CQL
        try (Session session = connector.openSession()) {
            //session.execute("DROP KEYSPACE IF EXISTS chat");
            //session.execute("CREATE KEYSPACE chat WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
            //session.execute("CREATE TABLE chat.chat (id INT PRIMARY KEY, name TEXT, parents LIST<INT>)");
            session.execute("DROP TABLE chat.dictionary");
            session.execute("CREATE TABLE chat.dictionary (value_id text, d_value text, d_name text, PRIMARY KEY(value_id, d_name)) WITH comment = 'dictionary values' AND CLUSTERING ORDER BY (d_name ASC);");
            //session.execute("INSERT INTO chat.dictionary (d_name, d_value, value_id) VALUES ('Friendship Status', 'Requested', '1');");
            //session.execute("INSERT INTO chat.dictionary (d_name, d_value, value_id) VALUES ('Friendship Status', 'Friends', '2');");
            //session.execute("INSERT INTO chat.dictionary (d_name, d_value, value_id) VALUES ('Friendship Status', 'Ignored by first', '3');");
            //session.execute("INSERT INTO chat.dictionary (d_name, d_value, value_id) VALUES ('Friendship Status', 'Ignored by second', '4');");
            //session.execute("INSERT INTO chat.dictionary (d_name, d_value, value_id) VALUES ('Friendship Status', 'Ignored by both', '5');");
            //session.execute("INSERT INTO chat.dictionary (d_name, d_value, value_id) VALUES ('Friendship Status', 'Ignored by both', '6');");
        }

        // Build the list of items to insert
        List<Dictionary> dictionaryItems = new ArrayList<Dictionary>();
        addDictionaryItemToArray(dictionaryItems, "1", "Requested", "Friendship Status");
        addDictionaryItemToArray(dictionaryItems, "2", "Friends", "Friendship Status");
        addDictionaryItemToArray(dictionaryItems, "3", "Ignored by first", "Friendship Status");
        addDictionaryItemToArray(dictionaryItems, "4", "Ignored by second", "Friendship Status");
        addDictionaryItemToArray(dictionaryItems, "5", "Ignored by both", "Friendship Status");
        addDictionaryItemToArray(dictionaryItems, "6", "Ignored by both", "Friendship Status");
        System.out.println("Amount of items to add: " + dictionaryItems.size());

        Dataset<Dictionary> datasetWrite = spark.createDataset(dictionaryItems, Encoders.bean(Dictionary.class));
        System.out.println("dataset items amount: " + datasetWrite.count());

        // Save the Dataset to Cassandra
        datasetWrite
                .write()
                .format("org.apache.spark.sql.cassandra")
                .options(new HashMap<String, String>() {
                    {
                        put("keyspace", "chat");
                        put("table", "dictionary");
                    }
                })
                .mode(SaveMode.Append)
                .save();

        // Read data
        showDictionaryData(spark);

        // Update one row
        try (Session session = connector.openSession()) {
            session.execute("UPDATE chat.dictionary SET d_value = 'TO DELETE' WHERE value_id = '6' AND d_name = 'Friendship Status' IF EXISTS;");
        }
        showDictionaryData(spark);

        // Delete one row
        try (Session session = connector.openSession()) {
            session.execute("DELETE FROM chat.dictionary WHERE value_id = '6' AND d_name = 'Friendship Status' IF EXISTS;");
        }
        showDictionaryData(spark);

        spark.stop();
        sc.stop();
    }

    public static List<Dictionary> addDictionaryItemToArray(List<Dictionary> dictionaryItems, String value_id,
            String d_value, String d_name) {
        Dictionary dictionary = new Dictionary();
        dictionary.setValue_id(value_id);
        dictionary.setD_value(d_value);
        dictionary.setD_name(d_name);
        dictionaryItems.add(dictionary);
        return dictionaryItems;
    }

    public static void showDictionaryData(SparkSession spark) {
        Dataset<Row> dataset = spark.read()
                .format("org.apache.spark.sql.cassandra")
                .options(new HashMap<String, String>() {
                    {
                        put("keyspace", "chat");
                        put("table", "dictionary");
                    }
                })
                .load();
        dataset.show();
    }
}
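The CRUD example references a Dictionary JavaBean that is not shown above. A minimal sketch of how it is assumed to look (the field names must match the chat.dictionary columns so that Encoders.bean can map them):

package com.chatSparkConnactionTest;

import java.io.Serializable;

// Assumed shape of the Dictionary bean used with Encoders.bean(Dictionary.class)
public class Dictionary implements Serializable {
    private String value_id;
    private String d_value;
    private String d_name;

    public String getValue_id() { return value_id; }
    public void setValue_id(String value_id) { this.value_id = value_id; }

    public String getD_value() { return d_value; }
    public void setD_value(String d_value) { this.d_value = d_value; }

    public String getD_name() { return d_name; }
    public void setD_name(String d_name) { this.d_name = d_name; }
}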
