Mauro Krikorian
November 11, 2020
Over the last couple of months, here at SOUTHWORKS we have been involved in several Big Data projects, building complete, large-scale ETL and processing pipelines that leverage some of the trending tools within the Big Data world: Apache Spark and its close relative, Databricks.
During a break between project phases, we took the opportunity to put together this article and reference implementation, doing our bit for the big community of software & data engineers, and for anyone who wants to take their first steps into this huge world.
Before we get into it, I don’t want to skip mentioning the great group of people who brought this sample to life and contributed feedback while putting it together: Lucas Nicolas Cabello, Santiago Calle, Pablo Costantini, Mauro Ezequiel Della Vecchia, Derek Nicolas Fernandez, Juan Manso & Alejandro Tizzoni.
We hope that you like it 😃
In this introductory article we are going to walk through the initial steps you might take to start working with Apache Spark (Spark).
First, we cover some introductory concepts, Spark’s main use cases and advantages, and finally introduce a complete sample application that uses Spark’s Dataframe API to process large datasets in a distributed way.
For this application, we perform some data analysis over the MovieLens dataset[¹], which consists of 25 million ratings given to 62,000 movies by 162,000 users, obtaining several statistics along the way.
Such statistics include the best films by overall rating, the most rated films, the global ratings given by each user per category, the users giving the overall lowest ratings, genres by overall rating, and the most commonly correlated genres.
Lastly, we want to share some benchmarking we performed using different worker configurations, and talk a bit about distributed processing.
Apache Spark is a fast, distributed data processing engine most commonly used for big data due to its capability of splitting data into chunks and distributing them across computational resources (workers).
Spark Architecture
Spark processing occurs completely in memory (when possible, at least), avoiding the overhead of I/O calls. It incorporates libraries with APIs for several different use cases; to mention some, non-exhaustively:
Spark APIs
Some of the main advantages that Spark provides are:
A Dataframe is a data abstraction, or a domain-specific language (DSL), for working with structured and semi-structured data, i.e. datasets with a schema. In a few words, it is a collection of rows with a schema, the result of the structured query it describes.
It uses the immutable, in-memory, resilient, distributed and parallel capabilities of RDDs, and applies a structure (called a schema) to the data. The key difference between Dataframes and RDDs is that, when working with Dataframes, you have much more information about the structure of the data: column names, data types and so on are already known, which enables a whole set of optimizations that would not be possible with RDDs. As a result, the manual parallelization you would perform over RDDs is unnecessary with Dataframes, since they are already optimized. On top of that, the Spark Dataframe API provides a much easier way to perform data analysis on potentially large amounts of data, through SQL-like operations in a dev-friendly manner.
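To make this concrete, here is a minimal, self-contained sketch (not taken from the sample repository; the file path and app name are just placeholders) of what working with the Dataframe API looks like in Java:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import static org.apache.spark.sql.functions.avg;
import static org.apache.spark.sql.functions.col;

public class DataframeQuickstart {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("dataframe-quickstart")
                .master("local[*]") // run locally using every available core
                .getOrCreate();

        // Load a CSV file with a header row, letting Spark infer the column types (schema)
        Dataset<Row> ratings = spark.read()
                .option("header", "true")
                .option("inferSchema", "true")
                .csv("data/ratings.csv"); // placeholder path

        // SQL-like operations; Spark parallelizes them for us, no manual RDD work required
        ratings.filter(col("rating").geq(4.0))
               .groupBy("movieId")
               .agg(avg("rating").alias("avgRating"))
               .show(10);

        spark.stop();
    }
}
```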
As we previously mentioned, this sample application shows how to perform simple data analysis over a big dataset consisting of 25M+ movie/rating entries.
The scope of this application is to perform all the operations needed to calculate a set of metrics as efficiently as possible. To achieve the best performance, we use the Dataframe API, which, as we’ve just mentioned, is heavily optimized for SQL-like operations.
Note: In this repository you can find the entire implementation of the sample application in Java, Python, Scala and .NET.
If you want to run the code locally, please follow the steps in the README.md file.
The Spark job reads two CSV files: movies.csv and ratings.csv. Their contents are first loaded into two Dataframes and then manipulated to obtain the proposed metrics, reducing the original datasets into a single, small dataset per metric.
The Spark job’s entry point is the main method. The parameters accepted by this method should be added at the end of the spark-submit command used for execution, as explained in the README.md.
Accepted parameters:
Within the run method, a SparkSession (the entry point to all Spark functionality) is created, and then we read the datasets into Dataframes. Once the Dataframes are ready, we proceed with the analysis, calculating the metrics defined above and printing the results.
Java
Follow the links to find code examples in Scala, Python, or .NET
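Since the embedded snippet is not reproduced here, the following is a rough sketch of that flow; apart from readCsvIntoDataframe (described below), the method and variable names are illustrative rather than the repository’s exact ones:

```java
// Sketch of the run flow (assumes the usual org.apache.spark.sql imports)
SparkSession spark = SparkSession.builder()
        .appName("movielens-analysis") // illustrative app name
        .getOrCreate();

Dataset<Row> movies = readCsvIntoDataframe(spark, moviesCsvPath);
Dataset<Row> ratings = readCsvIntoDataframe(spark, ratingsCsvPath);

// Each metric reduces the two large Dataframes into a small result, which is then printed
bestFilmsByOverallRating(movies, ratings).show(10, false);

spark.stop();
```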
To read data from the CSV files, we created a private method called readCsvIntoDataframe. It wraps the SparkSession.read() method. As our datasets are comma-separated value files, the format that best suits our needs is csv. To see all available formats, check the ‘What is a Dataframe?’ section above.
Java
Follow the links to find code examples in Scala, Python, or .NET
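Again as an illustration only, a plausible shape for that helper is shown below; the exact options set in the repository may differ (header and inferSchema are assumptions, although both MovieLens files do ship with a header row):

```java
// Wraps SparkSession.read() and loads a comma-separated file into a Dataframe
private static Dataset<Row> readCsvIntoDataframe(SparkSession spark, String path) {
    return spark.read()
            .option("header", "true")      // the MovieLens CSVs include a header row
            .option("inferSchema", "true") // let Spark derive the column types
            .csv(path);
}
```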
First of all, we want to give you a high-level overview of all operators used within the queries.
Operators used
Execution plan
Code implementation (Java code sample)
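The embedded Java snippet is not reproduced here, but a query of roughly this shape produces the list below; the minimum-ratings threshold and the column aliases are assumptions, not the repository’s exact values:

```java
// Join ratings with movie titles, average the scores and keep the top 10
// (assumes static imports of avg, count, col and desc from org.apache.spark.sql.functions)
Dataset<Row> bestFilms = ratings
        .groupBy("movieId")
        .agg(avg("rating").alias("avgRating"), count("rating").alias("numRatings"))
        .filter(col("numRatings").geq(5000)) // assumed popularity cut-off to discard obscure titles
        .join(movies, "movieId")
        .orderBy(desc("avgRating"))
        .select("avgRating", "title")
        .limit(10);
bestFilms.show(false);
```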
Results
Winner: The Shawshank Redemption
Best films by overall rating (list obtained in: 17.24s)
Average Score Title
4.413576 Shawshank Redemption, The (1994)
4.324336 Godfather, The (1972)
4.284353 Usual Suspects, The (1995)
4.261759 Godfather: Part II, The (1974)
4.254769 Seven Samurai (Shichinin no samurai) (1954)
4.247579 Schindler's List (1993)
4.243014 12 Angry Men (1957)
4.237948 Rear Window (1954)
4.228311 Fight Club (1999)
4.218662 One Flew Over the Cuckoo's Nest (1975)
Where is Pulp Fiction?
Execution plan
Code implementation (.NET code sample)
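This metric is implemented in .NET in the repository; purely for illustration, and keeping the same language as the other sketches in this article, an equivalent Dataframe query could look like this in Java (it assumes the pipe-separated genres column found in movies.csv):

```java
// Split each movie's genres, attach its ratings and average them per genre
// (assumes static imports of avg, col, desc, explode and split from org.apache.spark.sql.functions)
Dataset<Row> genresByRating = movies
        .withColumn("genre", explode(split(col("genres"), "\\|"))) // one row per (movie, genre)
        .join(ratings, "movieId")
        .groupBy("genre")
        .agg(avg("rating").alias("avgRating"))
        .orderBy(desc("avgRating"));
genresByRating.show(20, false);
```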
Results
Winner: Film-Noir
Genres by average rating
Results obtained in: 39.21s
Average Rating Genre
3.925728 Film-Noir
3.791466 War
3.705281 Documentary
3.685044 Crime
3.677185 Drama
3.670169 Mystery
3.614946 Animation
3.603712 IMAX
3.585755 Western
3.554716 Musical
3.542712 Romance
3.522964 Thriller
3.517445 Adventure
3.511589 Fantasy
3.478143 Sci-Fi
3.466592 Action
3.432507 Children
3.423993 Comedy
3.326379 (no genres listed)
3.293563 Horror
We all love Film-Noir movies, don’t we?
Execution plan
Code implementation (Python code sample)
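The repository’s Python code is not reproduced here; as a sketch of the idea (in Java for consistency with the other examples, and with illustrative names), one way to pair up the genres that appear together on the same movie and keep each genre’s most frequent companion is:

```java
// One row per (movie, genre); assumes static imports of col, desc, explode, split and row_number,
// plus org.apache.spark.sql.expressions.Window / WindowSpec
Dataset<Row> movieGenres = movies
        .withColumn("genre", explode(split(col("genres"), "\\|")))
        .select("movieId", "genre");

// Self-join on movieId to count how often two different genres share a movie
Dataset<Row> pairCounts = movieGenres.alias("a")
        .join(movieGenres.alias("b"), col("a.movieId").equalTo(col("b.movieId")))
        .filter(col("a.genre").notEqual(col("b.genre")))
        .groupBy(col("a.genre").alias("genre"), col("b.genre").alias("relatedGenre"))
        .count();

// For every genre keep only the related genre it co-occurs with most often
WindowSpec byGenre = Window.partitionBy("genre").orderBy(desc("count"));
pairCounts.withColumn("rank", row_number().over(byGenre))
          .filter(col("rank").equalTo(1))
          .select("count", "genre", "relatedGenre")
          .show(20, false);
```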
Results
Winner: Drama/Romance
Genre combinations (list obtained in: 9.95s)
Times related Genre Most related genre
2406 Action Drama
1652 Adventure Action
1015 Animation Children
1169 Children Comedy
4603 Comedy Drama
2996 Crime Drama
245 Documentary Drama
4654 Drama Romance
836 Fantasy Comedy
258 Film-Noir Drama
2181 Horror Thriller
100 IMAX Action
519 Musical Comedy
1466 Mystery Thriller
4654 Romance Drama
1185 Sci-Fi Action
3510 Thriller Drama
1348 War Drama
300 Western Action
Drama and romance? Wasn’t it obvious from the beginning?
We obtained six different metrics from the same data origin and benchmarked executions running with three different worker configurations: a single worker, two workers, and an uncapped number of workers (shown as * in the table below).
As the table below shows, every calculation had its fastest execution time with the greatest number of workers, while the slowest execution for every metric was the one with a single worker.
Metric Workers Elapsed time (min:sec)
Best films by ratings * 00:17.244
Best films by ratings 2 00:27.314
Best films by ratings 1 00:39.630
Most rated films * 00:14.066
Most rated films 2 00:19.650
Most rated films 1 00:28.065
Global ratings by user, by category * 00:58.422
Global ratings by user, by category 2 01:06.391
Global ratings by user, by category 1 01:35.265
Users with overall lowest ratings given * 00:17.027
Users with overall lowest ratings given 2 00:24.413
Users with overall lowest ratings given 1 00:40.309
Genres by overall rating * 00:15.859
Genres by overall rating 2 00:26.676
Genres by overall rating 1 00:41.311
Most commonly correlated genres * 00:09.958
Most commonly correlated genres 2 00:10.976
Most commonly correlated genres 1 00:17.365
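As a side note, worker counts in a local run like this are typically set through Spark’s master URL, either when building the session or via spark-submit’s --master flag; presumably the * rows above correspond to local[*], i.e. one worker thread per available core. A minimal sketch of the session-builder form:

```java
// Controls the number of local worker threads through the master URL
// ("local[1]", "local[2]", or "local[*]" to use every available core)
SparkSession spark = SparkSession.builder()
        .appName("movielens-analysis") // illustrative app name
        .master("local[2]")
        .getOrCreate();
```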
Throughout this article, we have seen how to use Spark to perform data analysis with Java, Python, Scala and .NET, using simple SQL-like operations to obtain six different metrics from a big dataset while correlating data from different sources.
We benchmarked the execution on regular computers with three different worker-thread configurations: one worker, two workers, and an uncapped number of workers.
Comparing those results, we confirmed that all six metrics ran fastest when the number of workers was not capped. Thus, we can conclude that Spark takes advantage of parallel processing out of the box through the Dataframe API, which is heavily optimized for performance with little to no manual tweaking. There is no recommended number of workers, as it depends on each specific case, mostly on the dataset size and the operations performed to reach the expected results.
There’s no golden rule regarding when to use or not to use Spark, but let’s list some of the most common go-to scenarios:
Finally, let’s mention two scenarios where Spark is not the go-to option:
[¹]: F. Maxwell Harper and Joseph A. Konstan. 2015. The MovieLens Datasets: History and Context. ACM Transactions on Interactive Intelligent Systems (TiiS) 5, 4: 19:1–19:19. https://doi.org/10.1145/2827872
Originally published by Mauro Krikorian for SOUTHWORKS on Medium 11 November 2020