In my previous post I have talked about Apache Spark. We have also built an application for counting the number of words in a file, which is the hello world equivalent of the big data world.

Apache spark Java Logo’s

It has been over 18 months since that article and spark has changed quite a lot in this time. A new major release of spark, which is spark-2.0 came out and now the latest version is 2.2.1 And with a new version comes new API’s and improvements. In-fact the first thing you’ll probably notice is that, you don’t need to create SparkContext or JavaSparkContext objects anymore. The various context and configurations have been put together into a new class SparkSession. You can still access the SparkContext or the SqlContext from the SparkSession object itself. So, you’ll be starting your programs with this now:

SparkSession spark = SparkSession.builder().appName("Freblogg-Spark").master("local").getOrCreate();

And you can use this spark variable the way you’d use other context variables.

Another change in Spark 2.0 is that, there is a heavy emphasis on the usage of Dataset API’s, and for a good reason. Datasets are more performant and memory efficient than RDD’s. RDD (Resilient Distributed Datasets) have been pushed to second place now. You can still use RDD’s if you want but Datasets are the preferred API. In fact, datasets have some nice convenience methods that we can use them for even unstructured data like text as well. Let’s generate some cool lipsum from Malevole. It looks something like this:

Ulysses, Ulysses - Soaring through all the galaxies. In search of Earth, flying in to the night. Ulysses, Ulysses - Fighting evil 
and tyranny, with all his power, and with all of his might. Ulysses - no-one else can do the things you do. Ulysses - like a bolt of
thunder from the blue. Ulysses - always fighting all the evil forces bringing peace and justice to all....

Now, you might try to use an RDD to read this, but let’s see what we can do with Datasets.

Dataset<String> lipsumDs = spark.read().textFile("fake-text.txt");
lipsumDs.show(5);

Here we are reading the text file using the spark object we created earlier and that gives us a Dataset<String> lipsumDs. The show() method on the dataset object prints the dataset. And we get the following output:

+--------------------+
|               value|
+--------------------+
|Ulysses, Ulysses ...|
|Ulysses, Ulysses ...|
| no-one else can ...|
|  always fighting...|
|                    |
+--------------------+

What we see here are the lines of the text file. Each line in the file is now a row in the Dataset. There are now a rich set of functions available to you in Datasets which weren’t in RDD’s. You can do filters on the rows for certain words, do a count on the table, perform groupBy operations, etc. all like you would on a Database table. For a full list of all the available operations on Dataset, read this: Dataset: Spark Documentation.

I hope that’s enough talk about unstructured data analysis. Let’s get to the main focus of this article, which is using Datasets for structured data. More specifically, csv and JSON. For this tutorial, I am using the data created from Mockaroo, an online data generator. I’ve created 1000 csv records that look like this:

id,first_name,last_name,email,gender,ip_address
1,Netti,McKirdy,nmckirdy0@slideshare.net,Female,148.3.248.193
2,Nickey,Curreen,ncurreen1@tripadvisor.com,Male,206.9.48.216
3,Allayne,Chatainier,achatainier2@trellian.com,Male,191.118.4.217
4,Tades,Emmett,temmett3@barnesandnoble.com,Male,153.113.87.195
5,Shawn,McGenn,smcgenn4@shop-pro.jp,Male,247.45.80.68
6,Giuseppe,Scobbie,gscobbie5@twitter.com,Male,123.114.131.200
...

We’ll use this data, which I’ve put in a file named fake-people.csv, to work with Datasets. Let’s create a Dataset out of this csv data.

Dataset<Row> peopleDs = spark.read().option("header", "true").csv("fake-people.csv");
peopleDs.show(5);

Since we’ve column headers in our data, we add the .option("header", "true") and the output is a nicely formatted table of the data with all the columns like this:

+---+----------+----------+--------------------+------+--------------+
| id|first_name| last_name|               email|gender|    ip_address|
+---+----------+----------+--------------------+------+--------------+
|  1|     Netti|   McKirdy|nmckirdy0@slidesh...|Female| 148.3.248.193|
|  2|    Nickey|   Curreen|ncurreen1@tripadv...|  Male|  206.9.48.216|
|  3|   Allayne|Chatainier|achatainier2@trel...|  Male| 191.118.4.217|
|  4|     Tades|    Emmett|temmett3@barnesan...|  Male|153.113.87.195|
|  5|     Shawn|    McGenn|smcgenn4@shop-pro.jp|  Male|  247.45.80.68|
+---+----------+----------+--------------------+------+--------------+

You can read in JSON data similarly as well. So, I generated some JSON this time from Mockaroo.

{"id":1,"first_name":"Zenia","last_name":"Joberne","email":"zjoberne0@foxnews.com","gender":"Female","ip_address":"214.207.159.43"}
{"id":2,"first_name":"Renard","last_name":"Kezor","email":"rkezor1@elpais.com","gender":"Male","ip_address":"199.3.18.104"}
{"id":3,"first_name":"Briant","last_name":"Patel","email":"bpatel2@odnoklassniki.ru","gender":"Male","ip_address":"111.184.217.23"}
{"id":4,"first_name":"Robinett","last_name":"Heasley","email":"rheasley3@tiny.cc","gender":"Female","ip_address":"21.40.190.226"}
{"id":5,"first_name":"Rosalinda","last_name":"Glandfield","email":"rglandfield4@indiegogo.com","gender":"Female","ip_address":"26.16.4.132"}
{"id":6,"first_name":"Haslett","last_name":"Culligan","email":"hculligan5@meetup.com","gender":"Male","ip_address":"201.191.72.10"}
....

Note: Spark can read JSON only of this format where we have one object per row. Otherwise you will see _corrupt_record when you print your dataset. That’s your cue to make sure the JSON is formatted as per spark’s need.

And you read JSON very similar to the way you read csv. Since in JSON we don’t have headers, we don’t need the header option.

Dataset<Row> peopleJsonDs = spark.read().JSON("fake-people.JSON");
peopleJsonDs.show(5);

And the output is,

+--------------------+----------+------+---+--------------+---------+
|               email|first_name|gender| id|    ip_address|last_name|
+--------------------+----------+------+---+--------------+---------+
|psurgison0@istock...|   Prissie|Female|  1| 48.151.89.171| Surgison|
| rsewell1@jalbum.net|    Robena|Female|  2| 184.16.37.210|   Sewell|
|aluxon2@list-mana...| Annamarie|Female|  3| 254.69.187.23|    Luxon|
|sodoherty3@twitpi...|   Shannah|Female|  4| 0.245.101.197|O'Doherty|
| alodford4@jigsy.com|     Alice|Female|  5|70.217.170.182|  Lodford|
+--------------------+----------+------+---+--------------+---------+

You can see the order of columns is jumbled. This is because JSON data doesn’t usually keep any specified order and so, when you read JSON data into a dataset, the order might not be same as what you’ve given. Of course if you want to display the columns in a particular order, you can always do a select operation.

peopleJsonDs.select("id", "first_name", "last_name", "email", "gender", "ip_address").show(5);

And that would print it in the right order. This is exactly like the SELECT query in SQL, if you’re familiar with it.

Now, that we have seen how to create Datasets, let’s see some of the operations we can perform on them.

Operations on Datasets

Datasets are built on top of Data frames. So, if you’re already familiar with Data frames in the spark 1.x releases you already know a ton about Datasets. Some of the operations you can perform on Dataset are as follows:

Column selection

Select one or more columns from the dataset.

peopleDs.select("email").show(5); // Selecting one column
peopleDs.select(col("email"), col("gender")).show(5); // Selecting multiple columns

Note: col is a static import of org.apache.spark.sql.functions.col;

Filtering on columns

Filter a subset of rows in the dataset based on conditions.

// Filter rows with id > 5 and \<= 10
peopleDs.filter(col("id").$less$eq(10).and(col("id").$greater(5))).show();

Dropping columns

Remove one or more columns from the dataset

peopleDs.drop("last_name", "ip_address").show(5);

Sorting on columns

peopleDs.sort(desc("first_name")).show(5);

And that sorts the dataset in the reverse order of the column first_name.

Output:

+---+----------+---------+--------------------+------+-------------+
| id|first_name|last_name|               email|gender|   ip_address|
+---+----------+---------+--------------------+------+-------------+
|685|  Zedekiah|  Brockie|zbrockiej0@mozill...|  Male|105.119.18.98|
|308|     Zarla| Bryceson|zbryceson8j@redif...|Female|55.118.168.15|
|636|  Zacherie|   Kermon|zkermonhn@prnewsw...|  Male| 120.36.10.87|

Those are some of the functions that you can use with Datasets. There are still several Database table type operations on Datasets, like group By, aggregations, joins, etc.. We’ll look at them in the next article on Spark as I think this article already has a lot of information already and I don’t want to overload you with information.

So, that is all for this article. If you’re someone that has never tried Datasets or Dataframes, I hope this article gave a good introduction on the topic to keep you interested in learning more.

The full code is available as gist.


This is the fifth article as part of my twitter challenge #30DaysOfBlogging. Twenty-five more articles on various topics including but not limited to Java, Git, Vim, Python, to come.

If you are interested in this, make sure to follow me on Twitter @durgaswaroop.


If you are interested in contributing to any open source projects and haven’t found the right project or if you were unsure on how to begin, I would like to suggest my own project, Delorean which is a Distributed Version control system, built from scratch in Scala. You can contribute not only in the form of code, but also with usage documentation and also by identifying any bugs in the functionality.


Thanks for reading. See you again in the next article.



Published

Category

Software

Tags