The objective of this notebook is to:
Give a proper understanding of the different PySpark functions available, along with a short introduction to Google Colab, the platform on which this notebook was written.
Once you complete this notebook, you should be able to write PySpark programs in an efficient way. The ideal way to use it is to go through the examples given and then try them out on Colab. At the end, there are a few hands-on questions which you can use to evaluate yourself.
Although some theory about PySpark and big data will be given in this notebook, I recommend that everyone read more about it and gain a deeper understanding of how the functions get executed and the relevance of big data today. A good understanding of Python will be an added bonus.
This tutorial was made using Google Colab, so the code you see here is meant to run in a Colab notebook.
It goes through basic PySpark functions and a short introduction on how to use Colab.
If you want to view my Colab notebook for this particular tutorial, you can view it here. The viewing experience and readability are much better there.
If you want to try out things with this notebook as a base, feel free to download it from my repo here and then use it with Jupyter Notebook.
Big data usually means data of such huge volume that normal data storage solutions cannot efficiently store and process it. In this era, data is being generated at an absurd rate; data is collected for almost every movement a person makes. The bulk of big data comes from three primary sources: social data, machine data, and transactional data.
Some common examples of the sources of such data include internet searches, Facebook posts, doorbell cams, smartwatches, online shopping history, etc. Every action creates data; it is just a matter of whether there is a way to collect it or not. What's interesting is that, of all this data collected, not even 5% of it is being used fully. There is a huge demand for big data professionals in the industry. Even though the number of graduates with a specialization in big data is rising, the problem is that they don't have practical knowledge about big data scenarios, which leads to bad architectures and inefficient methods of processing data.
If you are interested in knowing more about the landscape and the technologies involved, here is an article which I found really interesting!
If you are working in the field of big data, you must have definitely heard of Spark. If you look at the Apache Spark website, you will see that it is described as a "Lightning-fast unified analytics engine". PySpark is a flavour of Spark used for processing and analysing massive volumes of data. If you are familiar with Python and have tried it on huge datasets, you should know that the execution time can get ridiculous. Enter PySpark!
Imagine your data resides in a distributed manner at different places. If you try bringing your data to one point and executing your code there, not only would that be inefficient, it would also cause memory issues. Now let's say your code goes to the data rather than the data coming to your code. This helps avoid unnecessary data movement, which thereby decreases the running time.
PySpark is the Python API of Spark, which means it can do almost all the things Python can: machine learning (ML) pipelines, exploratory data analysis (at scale), ETLs for data platforms, and much more, all in a distributed manner. One of the best parts of PySpark is that if you are already familiar with Python, it's really easy to learn.
Apart from PySpark, there is another language called Scala used for big data processing. Scala is frequently over 10 times faster than Python, as it is native to the Hadoop ecosystem and runs on the JVM. But PySpark is being adopted at a fast rate because of its ease of use, easier learning curve and ML capabilities.
I will briefly explain how a PySpark job works, but I strongly recommend you read more about the architecture and how everything works. Now, before I get into it, let me go over some basic jargon first:
A cluster is a set of loosely or tightly connected computers that work together so that they can be viewed as a single system.
Hadoop is an open source, scalable, and fault tolerant framework written in Java. It efficiently processes large volumes of data on a cluster of commodity hardware. Hadoop is not only a storage system but a platform for large-scale data storage as well as processing.
HDFS (Hadoop Distributed File System) is one of the world's most reliable storage systems. It is the filesystem of Hadoop, designed for storing very large files on a cluster of commodity hardware.
MapReduce is a data processing framework with two phases: Mapper and Reducer. The map procedure performs filtering and sorting, and the reduce procedure performs a summary operation. It usually runs on a Hadoop cluster.
A transformation refers to an operation applied on a dataset to create a new dataset. filter, groupBy and map are examples of transformations.
An action refers to an operation which instructs Spark to perform computation and send the result back to the driver. collect() and count() are examples of actions.
Alright! Now that that's out of the way, let me explain how a Spark job runs. In simple terms, each time you submit a PySpark job, the code gets internally converted into a MapReduce program and gets executed in the Java Virtual Machine. Now, one of the thoughts that might be popping into your mind is probably: so the code gets converted into a MapReduce program; wouldn't that mean MapReduce is faster than PySpark?
Well, the answer is a big NO. This is what makes Spark jobs special. Spark is capable of handling a massive amount of data at a time in its distributed environment. It does this through in-memory processing, which is what makes it almost 100 times faster than Hadoop. Another factor which makes it fast is lazy evaluation. Spark delays its evaluation as much as it can. Each time you submit a job, Spark creates a plan for how to execute the code, and then does nothing. Finally, when you ask for the result (i.e., call an action), it executes the plan, which is basically all the transformations you have mentioned in your code. That's basically the gist of it.
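To make lazy evaluation concrete, here is a minimal sketch (assuming a SparkSession called spark, which we will create later in this notebook): the three transformations only build a plan, and nothing actually runs until the action at the end.
# Minimal sketch of lazy evaluation (assumes a SparkSession named `spark` already exists)
numbers = spark.range(1, 1001)                    # transformation: nothing executed yet
evens = numbers.filter(numbers.id % 2 == 0)       # transformation: still just a plan
doubled = evens.selectExpr("id * 2 as doubled")   # transformation: plan keeps growing
print(doubled.count())                            # action: the whole plan finally executes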
Now lastly, I want to talk about one more thing. Spark mainly consists of 4 modules: Spark SQL, Spark Streaming, MLlib (machine learning) and GraphX (graph processing).
Hopefully this image gives a better idea of what I am talking about:
In the words of Google:
Colaboratory, or “Colab” for short, is a product from Google Research. Colab allows anybody to write and execute arbitrary python code through the browser, and is especially well suited to machine learning, data analysis and education. More technically, Colab is a hosted Jupyter notebook service that requires no setup to use, while providing free access to computing resources including GPUs.
The reason why I used Colab is its shareability and free GPU and TPU. Yeah, you read that right, FREE GPU AND TPU! (For using the TPU, your program needs to be optimized for it.) Additionally, it integrates conveniently with different Google services: it saves to Google Drive, and all the services are closely connected. I recommend you go through the official overview documentation if you want to know more about it. If you have more questions about Colab, please refer to this link.
While using a Colab notebook, you will need an active internet connection to keep a session alive. If you lose the connection, you will have to download the datasets again.
2*3
from collections import Counter
print("This is a tutorial!")
Hello world!
ls
pwd
Install Dependencies:
If you have issues with spark version, please upgrade to the latest version from here.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
!tar xf spark-3.5.1-bin-hadoop3.tgz
!pip install -q findspark
Set Environment Variables:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"
!ls
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark
# Downloading and preprocessing Cars Data originally downloaded from https://perso.telecom-paristech.fr/eagan/class/igr204/datasets
# Many of these datasets have been cleaned up by Petra Isenberg, Pierre Dragicevic and Yvonne Jansen
!wget https://jacobceles.github.io/knowledge_repo/colab_and_pyspark/cars.csv
!ls
# Load data from csv to a dataframe.
# header=True means the first row is a header
# sep=';' means the columns are separated using ';'
df = spark.read.csv('cars.csv', header=True, sep=";")
df.show(5)
The above command loads our data into a dataframe (DF). A dataframe is a 2-dimensional labeled data structure with columns of potentially different types.
There are a couple of ways to view your dataframe(DF) in PySpark:
df.take(5)
will return a list of five Row objects.
df.collect()
will get all of the data from the entire DataFrame. Be really careful when using it, because if you have a large dataset, you can easily crash the driver node.
df.show()
is the most commonly used method to view a dataframe. There are a few parameters we can pass to this method, like the number of rows and truncation. For example, df.show(5, False) or df.show(5, truncate=False) will show the entire data without any truncation.
df.limit(5)
will return a new DataFrame by taking the first n rows. As Spark is distributed in nature, there is no guarantee that df.limit() will give you the same results each time.
Let us see some of them in action below:
df.show(5, truncate=False)
df.limit(5)
df.columns
There are two methods commonly used to view the data types of a dataframe:
df.dtypes
df.printSchema()
We can use the parameter inferSchema=True
to infer the input schema automatically while loading the data. An example is shown below:
df = spark.read.csv('cars.csv', header=True, sep=";", inferSchema=True)
df.printSchema()
As you can see, the datatypes have been inferred automatically by Spark, even with the correct precision for the decimal type. A problem that might arise here is that sometimes, when you have to read multiple files with different schemas, implicit inference might lead to null values in some columns. Therefore, let us also see how to define schemas explicitly.
from pyspark.sql.types import *
df.columns
# Creating a list of the schema in the format column_name, data_type
labels = [
('Car',StringType()),
('MPG',DoubleType()),
('Cylinders',IntegerType()),
('Displacement',DoubleType()),
('Horsepower',DoubleType()),
('Weight',DoubleType()),
('Acceleration',DoubleType()),
('Model',IntegerType()),
('Origin',StringType())
]
# Creating the schema that will be passed when reading the csv
schema = StructType([StructField(x[0], x[1], True) for x in labels])
schema
df = spark.read.csv('cars.csv', header=True, sep=";", schema=schema)
df.printSchema()
# The schema comes as we gave!
df.show(truncate=False)
As we can see here, the data has been successfully loaded with the specified datatypes.
We will go over the following in this section:
There are multiple ways to do a select in PySpark. You can see below how they differ and how each one is used:
# 1st method
# Column name is case sensitive in this usage
print(df.Car)
print("*"*20)
df.select(df.Car).show(truncate=False)
NOTE:
We can't always use the dot notation, because it breaks when a column name is a reserved word or an existing attribute of the DataFrame class. Additionally, the column names are case sensitive with this notation, so we need to make sure the column names have been converted to a particular case before using it.
# 2nd method
# Column name is case insensitive here
print(df['car'])
print("*"*20)
df.select(df['car']).show(truncate=False)
# 3rd method
# Column name is case insensitive here
from pyspark.sql.functions import col
df.select(col('car')).show(truncate=False)
# 1st method
# Column name is case sensitive in this usage
print(df.Car, df.Cylinders)
print("*"*40)
df.select(df.Car, df.Cylinders).show(truncate=False)
# 2nd method
# Column name is case insensitive in this usage
print(df['car'],df['cylinders'])
print("*"*40)
df.select(df['car'],df['cylinders']).show(truncate=False)
# 3rd method
# Column name is case insensitive in this usage
from pyspark.sql.functions import col
df.select(col('car'),col('cylinders')).show(truncate=False)
We will take a look at three cases here:
# CASE 1: Adding a new column
# We will add a new column called 'first_column' at the end
from pyspark.sql.functions import lit
df = df.withColumn('first_column',lit(1))
# lit means literal. It populates the row with the literal value given.
# When adding static data / constant values, it is a good practice to use it.
df.show(5,truncate=False)
# CASE 2: Adding multiple columns
# We will add two new columns called 'second_column' and 'third_column' at the end
df = df.withColumn('second_column', lit(2)) \
.withColumn('third_column', lit('Third Column'))
# lit means literal. It populates the row with the literal value given.
# When adding static data / constant values, it is a good practice to use it.
df.show(5,truncate=False)
# CASE 3: Deriving a new column from an existing one
# We will add a new column called 'car_model' which has the value of car and model appended together with a space in between
from pyspark.sql.functions import concat
df = df.withColumn('car_model', concat(col("Car"), lit(" "), col("model")))
# lit means literal. It populates the row with the literal value given.
# When adding static data / constant values, it is a good practice to use it.
df.show(5,truncate=False)
As we can see, the new column car_model has been created from existing columns. Since our aim was to create a column which has the value of car and model appended together with a space in between, we have used the concat
function.
We use the withColumnRenamed
function to rename a column in PySpark. Let us see it in action below:
#Renaming a column in PySpark
df = df.withColumnRenamed('first_column', 'new_column_one') \
.withColumnRenamed('second_column', 'new_column_two') \
.withColumnRenamed('third_column', 'new_column_three')
df.show(truncate=False)
Here, we see the Dataframe API way of grouping values. We will discuss how to:
# Group By a column in PySpark
df.groupBy('Origin').count().show(5)
# Group By multiple columns in PySpark
df.groupBy('Origin', 'Model').count().show(5)
#Remove columns in PySpark
df = df.drop('new_column_one')
df.show(5,truncate=False)
#Remove multiple columns in one go
df = df.drop('new_column_two') \
.drop('new_column_three')
df.show(5,truncate=False)
We will discuss the following in this section:
# Filtering rows in PySpark
total_count = df.count()
print("TOTAL RECORD COUNT: " + str(total_count))
europe_filtered_count = df.filter(col('Origin')=='Europe').count()
print("EUROPE FILTERED RECORD COUNT: " + str(europe_filtered_count))
df.filter(col('Origin')=='Europe').show(truncate=False)
# Filtering rows in PySpark based on Multiple conditions
total_count = df.count()
print("TOTAL RECORD COUNT: " + str(total_count))
europe_filtered_count = df.filter((col('Origin')=='Europe') &
(col('Cylinders')==4)).count() # Two conditions added here
print("EUROPE FILTERED RECORD COUNT: " + str(europe_filtered_count))
df.filter((col('Origin')=='Europe') & (col('Cylinders')==4)).show(truncate=False)
#Get Unique Rows in PySpark
df.select('Origin').distinct().show()
#Get Unique Rows in PySpark based on multiple columns
df.select('Origin','model').distinct().show()
# Sort Rows in PySpark
# By default the data will be sorted in ascending order
df.orderBy('Cylinders').show(truncate=False)
# To change the sorting order, you can use the ascending parameter
df.orderBy('Cylinders', ascending=False).show(truncate=False)
# Using groupBy and orderBy together
df.groupBy("Origin").count().orderBy('count', ascending=False).show(10)
You will see three main methods for performing a union of dataframes. It is important to know the difference between them and which one is preferred:
union()
– It is used to merge two DataFrames of the same structure/schema. If the schemas are not the same, it returns an error.
unionAll()
– This function is deprecated since Spark 2.0.0 and has been replaced with union().
unionByName()
– This function is used to merge two dataframes based on column name.
Since unionAll() is deprecated, union() is the preferred method for merging dataframes.
The difference between unionByName() and union() is that unionByName() resolves columns by name, not by position.
In other SQL dialects, UNION eliminates duplicates while UNION ALL merges two datasets, thereby including duplicate records. But in PySpark, both behave the same and include duplicate records. The recommendation is to use distinct()
or dropDuplicates()
to remove duplicate records.
# CASE 1: Union When columns are in order
df = spark.read.csv('cars.csv', header=True, sep=";", inferSchema=True)
europe_cars = df.filter((col('Origin')=='Europe') & (col('Cylinders')==5))
japan_cars = df.filter((col('Origin')=='Japan') & (col('Cylinders')==3))
print("EUROPE CARS: "+str(europe_cars.count()))
print("JAPAN CARS: "+str(japan_cars.count()))
print("AFTER UNION: "+str(europe_cars.union(japan_cars).count()))
Result:
As you can see here, there were 3 cars from Europe with 5 Cylinders, and 4 cars from Japan with 3 Cylinders. After union, there are 7 cars in total.
# CASE 2: Union when columns are not in order
# Creating two dataframes with jumbled columns
df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"])
df2 = spark.createDataFrame([[4, 5, 6]], ["col1", "col2", "col0"])
df1.unionByName(df2).show()
Result:
As you can see here, the two dataframes have been successfully merged based on their column names.
# Functions available in PySpark
from pyspark.sql import functions
# Similar to python, we can use the dir function to view the available functions
print(dir(functions))
# Loading the data
from pyspark.sql.functions import col
df = spark.read.csv('cars.csv', header=True, sep=";", inferSchema=True)
Display the Car column in its existing, lower and upper case forms, and the first 4 characters of the column
from pyspark.sql.functions import col,lower, upper, substring
# Prints out the details of a function
help(substring)
# alias is used to rename the column name in the output
df.select(col('Car'),lower(col('Car')),upper(col('Car')),substring(col('Car'),1,4).alias("first_4_characters")).show(5, False)
Concatenate the Car column and Model column and add a space between them.
from pyspark.sql.functions import concat
df.select(col("Car"),col("model"),concat(col("Car"), lit(" "), col("model"))).show(5, False)
Show the minimum and the maximum weight
from pyspark.sql.functions import min, max
df.select(min(col('Weight')), max(col('Weight'))).show()
Add 10 to the minimum and maximum weight
from pyspark.sql.functions import min, max, lit
df.select(min(col('Weight'))+lit(10), max(col('Weight')+lit(10))).show()
from pyspark.sql.functions import to_date, to_timestamp, lit
df = spark.createDataFrame([('2019-12-25 13:30:00',)], ['DOB'])
df.show()
df.printSchema()
df = spark.createDataFrame([('2019-12-25 13:30:00',)], ['DOB'])
df = df.select(to_date(col('DOB'),'yyyy-MM-dd HH:mm:ss'), to_timestamp(col('DOB'),'yyyy-MM-dd HH:mm:ss'))
df.show()
df.printSchema()
df = spark.createDataFrame([('25/Dec/2019 13:30:00',)], ['DOB'])
df = df.select(to_date(col('DOB'),'dd/MMM/yyyy HH:mm:ss'), to_timestamp(col('DOB'),'dd/MMM/yyyy HH:mm:ss'))
df.show()
df.printSchema()
What is 3 days earlier than the oldest date and 3 days later than the most recent date?
from pyspark.sql.functions import date_add, date_sub
# create a dummy dataframe
df = spark.createDataFrame([('1990-01-01',),('1995-01-03',),('2021-03-30',)], ['Date'])
# find out the required dates
df.select(date_add(max(col('Date')),3), date_sub(min(col('Date')),3)).show()
# Create two dataframes
cars_df = spark.createDataFrame([[1, 'Car A'],[2, 'Car B'],[3, 'Car C']], ["id", "car_name"])
car_price_df = spark.createDataFrame([[1, 1000],[2, 2000],[3, 3000]], ["id", "car_price"])
cars_df.show()
car_price_df.show()
# Executing an inner join so we can see the id, name and price of each car in one row
cars_df.join(car_price_df, cars_df.id == car_price_df.id, 'inner').select(cars_df['id'],cars_df['car_name'],car_price_df['car_price']).show(truncate=False)
As you can see, we have done an inner join between two dataframes. The following join types are supported by PySpark: inner, cross, outer (full/full_outer), left (left_outer), right (right_outer), left_semi and left_anti.
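As a small hedged sketch reusing the cars_df and car_price_df frames from the example above, you only need to change the join-type string to get the other behaviours:
# Reusing cars_df and car_price_df from above; only the join type changes
cars_df.join(car_price_df, on='id', how='left').show()       # keep every row from cars_df
cars_df.join(car_price_df, on='id', how='left_anti').show()  # rows in cars_df with no match in car_price_df
cars_df.join(car_price_df, on='id', how='full').show()       # keep rows from both sides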
SQL has been around since the 1970s, and so one can imagine the number of people who made it their bread and butter. As big data came into popularity, there was a shortage of professionals with the technical knowledge to deal with it. This led to the creation of Spark SQL. To quote the docs:
Spark SQL is a Spark module for structured data processing. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL provide Spark with more information about the structure of both the data and the computation being performed. Internally, Spark SQL uses this extra information to perform extra optimizations.
Basically, what you need to know is that Spark SQL is used to execute SQL queries on big data. Spark SQL can also be used to read data from Hive tables and views. Let me explain Spark SQL with an example.
# Load data
df = spark.read.csv('cars.csv', header=True, sep=";")
# Register Temporary Table
df.createOrReplaceTempView("temp")
# Select all data from temp table
spark.sql("select * from temp limit 5").show()
# Select count of data in table
spark.sql("select count(*) as total_count from temp").show()
As you can see, we registered the dataframe as a temporary table and then ran basic SQL queries on it. How amazing is that?!
If you are a person who is more comfortable with SQL, then this feature is truly a blessing for you! But this raises a question:
Should I just keep using Spark SQL all the time?
And the answer is, it depends.
So basically, the different functions act in different ways, and depending on the type of action you are trying to do, the speed at which the execution completes also differs. But as time progresses, this feature is getting better and better, so hopefully the difference should come down to a small margin. There have been plenty of analyses done on this, but nothing has a definite answer yet. You can read this comparative study done by Hortonworks or the answer to this Stack Overflow question if you are still curious about it.
With map, you define a function and then apply it record by record. flatMap returns a new RDD by first applying a function to all of the elements in the RDD and then flattening the result. filter returns a new RDD containing only the elements that satisfy a condition. With reduce, we take neighbouring elements and produce a single combined result. For example, let's say you have a set of numbers; you can reduce this to its sum by providing a function that takes two values as input and reduces them to one.
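Here is an illustrative sketch of those four operations on a throwaway RDD of numbers (not part of the cars data):
# Illustrative sketch of map, flatMap, filter and reduce on a small RDD
nums = spark.sparkContext.parallelize([1, 2, 3, 4])
print(nums.map(lambda x: x * 2).collect())            # [2, 4, 6, 8]
print(nums.flatMap(lambda x: [x, x * 10]).collect())  # [1, 10, 2, 20, 3, 30, 4, 40]
print(nums.filter(lambda x: x % 2 == 0).collect())    # [2, 4]
print(nums.reduce(lambda a, b: a + b))                # 10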
Some of the reasons you would use a dataframe over RDD are:
cars = spark.sparkContext.textFile('cars.csv')
print(cars.first())
cars_header = cars.first()
cars_rest = cars.filter(lambda line: line!=cars_header)
print(cars_rest.first())
How many cars are there in our csv data?
cars_rest.map(lambda line: line.split(";")).count()
Display the Car name, MPG, Cylinders, Weight and Origin for the cars Originating in Europe
# Car name is column 0
(cars_rest.filter(lambda line: line.split(";")[8]=='Europe').
map(lambda line: (line.split(";")[0],
line.split(";")[1],
line.split(";")[2],
line.split(";")[5],
line.split(";")[8])).collect())
Display the Car name, MPG, Cylinders, Weight and Origin for the cars Originating in either Europe or Japan
# Car name is column 0
(cars_rest.filter(lambda line: line.split(";")[8] in ['Europe','Japan']).
map(lambda line: (line.split(";")[0],
line.split(";")[1],
line.split(";")[2],
line.split(";")[5],
line.split(";")[8])).collect())
PySpark User-Defined Functions (UDFs) help you convert your Python code into a scalable version of itself. They come in handy more often than you can imagine, but beware, as the performance is lower when you compare them with PySpark's built-in functions. You can view examples of how UDFs work here. What I will give in this section is some theory on how they work, and why they are slower.
When you run a UDF in PySpark, each executor creates a Python process. Data is serialised and deserialised between each executor and Python. This creates a lot of overhead on Spark jobs, making them less efficient than using Spark dataframe functions. Apart from this, you might sometimes run into memory issues while using UDFs. The Python worker consumes a lot of off-heap memory, which often leads to memoryOverhead errors, thereby failing your job. Keeping these in mind, I wouldn't recommend using them, but at the end of the day, it's your choice.
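For completeness, here is a minimal hedged sketch of what a UDF looks like (the function name and column alias are made up for illustration); note that the built-in upper() function would achieve the same result faster:
# Minimal UDF sketch; in practice the built-in upper() would be preferable
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

shout = udf(lambda s: s.upper() if s is not None else None, StringType())
df = spark.read.csv('cars.csv', header=True, sep=";")
df.select(col('Car'), shout(col('Car')).alias('car_upper')).show(5, truncate=False)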
I personally prefer PyCharm while coding in Python/PySpark. It's based on IntelliJ IDEA so it has a lot of features! And the main advantage I have felt is the ease of installing PySpark and other packages. You can customize it with themes and plugins, and it lets you enhance productivity while coding by providing some features like suggestions, local VCS etc.
The python syntax for running jobs is: python <file_name>.py <arg1> <arg2> ...
But when you submit a spark job you have to use spark-submit to run the application.
Here is a simple example of a spark-submit command:
spark-submit filename.py --named_argument 'argument value'
Here, named_argument is an argument that you are reading from inside your script.
There are other options you can pass in the command, like:
--py-files
which helps you pass additional Python files that your script depends on,
--files
which helps pass other files like txt or config,
--deploy-mode
which tells whether to deploy your driver on the cluster (cluster mode) or locally on the client machine (client mode),
--conf
which helps pass different configurations, like memoryOverhead, dynamicAllocation etc. A combined example is sketched below.
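Putting a few of these options together, a command could look roughly like the sketch below (the file names and values are placeholders, not from a real project):
spark-submit \
  --deploy-mode cluster \
  --py-files helpers.py \
  --files app.conf \
  --conf spark.executor.memoryOverhead=2048 \
  filename.py --named_argument 'argument value'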
There is an entire page in spark documentation dedicated to this. I highly recommend you go through it once.
When getting started with dataframes, the most common question is: 'How do I create a dataframe?'
Below, you can see how to create three kinds of dataframes:
from pyspark.sql.types import StructType
sc = spark.sparkContext
#Create empty df
schema = StructType([])
empty = spark.createDataFrame(sc.emptyRDD(), schema)
empty.show()
from pyspark.sql.types import StructType, StructField, StringType
#Create empty df with header
schema_header = StructType([StructField("name", StringType(), True)])
empty_with_header = spark.createDataFrame(sc.emptyRDD(), schema_header)
empty_with_header.show()
from pyspark.sql import Row
mylist = [
{"name":'Alice',"age":13},
{"name":'Jacob',"age":24},
{"name":'Betty',"age":135},
]
spark.createDataFrame(Row(**x) for x in mylist).show()
# You can achieve the same using this - note that we are using spark context here, not a spark session
from pyspark.sql import Row
df = sc.parallelize([
Row(name='Alice', age=13),
Row(name='Jacob', age=24),
Row(name='Betty', age=135)]).toDF()
df.show()
As mentioned earlier, there are two easy ways to remove duplicates from a dataframe. We have already seen the usage of distinct() under the Get Distinct Rows section.
I will explain how to use the dropDuplicates()
function to achieve the same.
drop_duplicates()
is an alias for dropDuplicates()
from pyspark.sql import Row
mylist = [
{"name":'Alice',"age":5,"height":80},
{"name":'Jacob',"age":24,"height":80},
{"name":'Alice',"age":5,"height":80}
]
df = spark.createDataFrame(Row(**x) for x in mylist)
df.dropDuplicates().show()
dropDuplicates()
can also take in an optional parameter called subset, which helps specify the columns on which the duplicate check needs to be done.
df.dropDuplicates(subset=['height']).show()
Before we begin, please note that this entire section is written purely based on experience. It might differ with use cases, but it will help you get a better understanding of what you should be looking for, or act as a guidance to achieve your aim.
Spark performance tuning refers to the process of adjusting the settings for the memory, cores, and instances used by the system. This process helps Spark perform flawlessly and also prevents bottlenecking of resources in Spark.
Considering you are using Amazon EMR to execute your Spark jobs, there are three aspects you need to take care of: sizing the EMR cluster correctly, tuning the Spark configurations, and tuning the job itself.
Sizing your EMR cluster is extremely important, as this affects the efficiency of your Spark jobs. Apart from the cost factor, the maximum number of nodes and the amount of memory your job can use will be decided by this. If you spin up an EMR cluster with high specifications, that obviously means you are paying more for it, so we should ideally utilize it to the max. These are the guidelines that I follow to make sure the EMR cluster is rightly sized:
Look at the above criteria against the amount of memory you need for processing and the disk space you would need. Start with a small configuration, and keep adding nodes to arrive at an optimal configuration. In case you are wondering about the execution-time vs. EMR-configuration trade-off, please understand that it is okay for a job to run longer rather than adding more resources to the cluster. For example, it is okay to run a job for 40 minutes on a 5 node cluster, rather than running the same job in 10 minutes on a 15 node cluster.
Another thing you need to know about EMR is the different kinds of EC2 instance types provided by Amazon. I will briefly talk about them, but I strongly recommend you read more about them in the official documentation. There are 5 instance classes. Based on the job you want to run, you can decide which one to use:
General purpose – Balance of compute, memory and networking resources
Compute optimized – Ideal for compute-bound applications that benefit from high performance processors
Memory optimized – Designed to deliver fast performance for workloads that process large data sets in memory
Storage optimized – For workloads that require high, sequential read and write access to very large data sets on local storage
GPU instances – Use hardware accelerators, or co-processors, to perform high-demand functions more efficiently than is possible in software running on CPUs
The configuration (memory, storage, cpu, network performance) will differ based on the instance class you choose.
To help make life easier, here is what I do when I get into a predicament about which one to go with:
This will easily help you understand what you are getting into, and thereby help you make the best choice! The site was built by Garret Heaton (founder of Swoot), and it has helped me countless times to make an informed decision.
There are a ton of configurations that you can tweak when it comes to Spark. Here, I will note down some of the configurations which I use and which have worked well for me. Alright, let's get into it!
When you submit your job to a cluster, it is handed to the Spark scheduler, which is responsible for materializing a logical plan for your job. There are two types of job scheduling: FIFO (the default) and FAIR.
I personally prefer using the FAIR mode, and this can be set by adding
.config("spark.scheduler.mode", "FAIR")
when you create your SparkSession.
We have two types of serializers available:
Kryo is significantly faster and more compact than Java serialization (often as much as 10x), but does not support all Serializable types and requires you to register the classes you’ll use in the program in advance for best performance.
Java serialization is used by default because, if you have a custom class that extends Serializable, it can be used without extra work. You can also control the performance of your serialization more closely by extending java.io.Externalizable.
The general recommendation is to use Kryo as the serializer whenever possible, as it leads to much smaller sizes than Java serialization. It can be enabled by adding
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
when you create your SparkSession.
It is generally a good idea to compress the output after the map phase. The spark.shuffle.compress
property decides whether to do the compression or not. The codec used is the one specified by spark.io.compression.codec
.
The property can be added by using
.config("spark.shuffle.compress", "true")
when you create your SparkSession.
There are 4 default codecs Spark provides to compress internal data such as RDD partitions, event logs, broadcast variables and shuffle outputs. They are: lz4, lzf, snappy and zstd.
The decision on which to use rests upon the use case. I generally use
snappy
compression. Google created Snappy because they needed something that offered very fast compression at the expense of final size. Snappy is fast, stable and free, but it increases the size more than the other codecs. At the same time, since compute costs will be lower, it seems like a balanced trade-off. The property can be added by using .config("spark.io.compression.codec", "snappy")
when you create your SparkSession.
This session explains the best practices for compression/decompression codecs in Apache Spark. I recommend you take a look at it before making a decision.
The spark.speculation
property enables speculative execution of tasks. This means that if one or more tasks are running slowly in a stage, they will be re-launched. Speculative execution does not stop the slow-running tasks; it launches new copies of them in parallel.
I usually disable this option by adding
.config("spark.speculation", "false")
when I create the SparkSession.
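Pulling the configuration snippets above together, a SparkSession built with these settings might look something like this (the values simply reflect the preferences discussed in this section):
# Sketch of a SparkSession combining the configurations discussed above
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master("local[*]")
         .config("spark.scheduler.mode", "FAIR")
         .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
         .config("spark.shuffle.compress", "true")
         .config("spark.io.compression.codec", "snappy")
         .config("spark.speculation", "false")
         .getOrCreate())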
There are mainly two application properties that you should know about:
spark.driver.memoryOverhead - The amount of off-heap memory to be allocated per driver in cluster mode, in MiB unless otherwise specified. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the container size (typically 6-10%). This option is currently supported on YARN and Kubernetes.
spark.executor.memoryOverhead - The amount of off-heap memory to be allocated per executor, in MiB unless otherwise specified. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size (typically 6-10%). This option is currently supported on YARN and Kubernetes.
If you ever face an issue like
Container killed by YARN for exceeding memory limits
, know that it is because you have not specified enough memory overhead for your job to execute successfully. The default value for the overhead is 10% of the available memory (driver and executor calculated separately), with a minimum of 384 MB.
Lastly, I want to talk about dynamic allocation. This is a feature I constantly use while executing my jobs. This property is set to false by default. As the name suggests, it controls whether to use dynamic resource allocation, which scales the number of executors registered with the application up and down based on the workload. Truly a wonderful feature, and the greatest benefit of using it is that it helps make the best use of all the resources you have! The disadvantage of this feature is that it does not shine when you have to execute tasks in parallel: since most of the resources will be used by the first task, the second one will have to wait till some resources get released. At the same time, if both get submitted at the exact same time, the resources will be shared between them, although not equally. Also, it is not guaranteed to always use the most optimal configuration. But in all my tests, the results have been great!
If you are planning on using this feature, you can pass the configurations as required through the spark-submit command. The four configurations which you will have to keep in mind are:
--conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.initialExecutors --conf spark.dynamicAllocation.minExecutors --conf spark.dynamicAllocation.maxExecutors
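As a hedged example, a spark-submit call with dynamic allocation enabled might look like this (the executor counts are purely illustrative):
spark-submit \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.initialExecutors=2 \
  --conf spark.dynamicAllocation.minExecutors=1 \
  --conf spark.dynamicAllocation.maxExecutors=20 \
  filename.py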
Apart from EMR and Spark tuning, there is another way to approach optimization, and that is by tuning your job itself to produce results efficiently. I will go over some such techniques which will help you achieve this. The Spark Programming Guide talks about these concepts in more detail. If you prefer watching a video over reading, I highly recommend A Deep Dive into Proper Optimization for Spark Jobs by Daniel Tomes from Databricks, which I found really useful and informative!
For some jobs, the efficiency can be increased by caching data in memory. Broadcast Hash Join (BHJ) is one such technique, which will help you optimize join queries when the size of one side of the join is small.
Broadcast joins are the fastest, but the drawback is that they consume more memory on both the executor and the driver.
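A minimal sketch of forcing a broadcast with the broadcast() hint is shown below; the small lookup dataframe here is made up purely for illustration:
# Sketch: explicitly broadcast the small side of a join
from pyspark.sql.functions import broadcast

cars_df = spark.read.csv('cars.csv', header=True, sep=";", inferSchema=True)
origins_df = spark.createDataFrame([('Europe',), ('Japan',)], ['Origin'])  # illustrative small table
cars_df.join(broadcast(origins_df), on='Origin', how='inner').show(5)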
The following steps give a sneak peek into how it works, which will help you understand the use cases where it can be used:
Some things to keep in mind about BHJ:
A partition in Spark is an atomic chunk of data (a logical division of the data) stored on a node in the cluster. Partitions are the basic units of parallelism in Spark. Having too many partitions or too few is not ideal. The number of partitions in Spark should be decided based on the cluster configuration and the requirements of the application. Increasing the number of partitions will make each partition have less data, or no data at all. Generally, Spark partitioning can be broken down in three ways: input partitions, shuffle partitions and output partitions.
Spark usually does a good job of figuring out the ideal number of input partitions, except in very particular cases. It is advisable to use the Spark default unless:
spark.sql.files.maxPartitionBytes
: This property indicates the maximum number of bytes to pack into a single partition when reading files (default 128 MB). Use this to increase the parallelism when reading input data. For example, if you have more cores, then you can increase the number of parallel tasks, which will ensure usage of all the cores of the cluster and increase the speed of the task.
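For example, halving the read partition size (the 64 MB value below is purely illustrative) to get more, smaller input partitions could look like this:
# Illustrative: read input files in ~64 MB partitions instead of the 128 MB default
spark.conf.set("spark.sql.files.maxPartitionBytes", 64 * 1024 * 1024)
df = spark.read.csv('cars.csv', header=True, sep=";", inferSchema=True)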
One of the major reasons most jobs lag in performance is that, the majority of the time, they get the shuffle partition count wrong. By default, the value is set to 200. In almost all situations, this is not ideal. If you are dealing with a shuffle stage of less than 20 GB, 200 is fine, but otherwise this needs to be changed. For most cases, you can use the following equation to find the right value:
Partition Count = Stage Input Data / Target Size
where
Largest Shuffle Stage (Target Size) < 200MB/partition
in most cases.
spark.sql.shuffle.partitions
property is used to set the ideal partition count value.
If you ever notice that the target size is in the range of TBs, there is something terribly wrong, and you might want to change it back to 200 or recalculate it. Shuffle partitions can be configured for every action (not transformation) in the Spark script.
Let us use an example to explain this scenario:
Assume shuffle stage input = 210 GB.
Partition Count = Stage Input Data / Target Size = 210000 MB/200 MB = 1050.
As you can see, my shuffle partitions should be 1050, not 200.
But, if your cluster has 2000 cores, then set your shuffle partitions to 2000.
In a large cluster dealing with a large data job, never set your shuffle partitions less than your total core count.
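Following that calculation, the property could be set like this before the wide transformation runs (1050 is the value from the example above):
# Set the shuffle partition count computed above (210000 MB / 200 MB = 1050)
spark.conf.set("spark.sql.shuffle.partitions", 1050)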
Shuffle stages almost always precede the write stages, and having a high shuffle partition count creates small files in the output. To address this, use localCheckpoint just before the write and then do a coalesce call. localCheckpoint writes the shuffle partitions to the executors' local disks and then coalesces them into a lower partition count, which improves the overall performance of both the shuffle stage and the write stage.
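A hedged sketch of that pattern is shown below; the output path and the coalesce count are placeholders:
# Sketch: cut the shuffle lineage with localCheckpoint, then coalesce to fewer, larger output files
result = df.groupBy('Origin').count()   # some shuffle-heavy result
result.localCheckpoint() \
      .coalesce(10) \
      .write.mode('overwrite').csv('/tmp/output_path')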
There are different methods to write the data. You can control the size, composition and number of files in the output, and even the number of records in each file, while writing the data. While writing, you can increase the parallelism, thereby ensuring you use all the resources that you have. But this approach leads to a larger number of smaller files. Usually, this isn't a problem, but if you want bigger files, you will have to use one of the compaction techniques, preferably in a cluster with a smaller configuration. There are multiple ways to change the composition of the output. Keep these two things in mind about composition:
Try to incorporate these into your coding habits for better performance:
© Jacob Celestine 2021
This content is fully original. All the write-ups are either original or have been properly credited to their sources, if any, and the original link to the dataset has also been provided.
All the code has been written by myself without referring elsewhere, using knowledge gained through my experience and the multiple courses I have attended over the years. If you have any issues, please write to me before anything else.
The dataset has been sourced from Project Datasets and has been cleaned up by Petra Isenberg, Pierre Dragicevic and Yvonne Jansen.