Spark-Radiant: Apache Spark Performance and Cost Optimizer
Apache Spark is one of the most powerful, fastest, and most popular data processing engines in the Big Data world. Apache Spark has optimized algorithms to process batch workloads, streaming workloads, ML workloads, and more. Many companies, whether large enterprises or startups, maintain their own fork of Apache Spark and add optimizations that they deploy in their Big Data environments. Maintaining your own fork of Apache Spark is certainly not an easy task and comes with a lot of challenges.
In this blog, we will briefly discuss some of the challenges generally faced in maintaining your own fork of Spark. We will also discuss a new project, Spark-Radiant, that helps address these challenges.
Providing Spark as a Service with your own Fork of Spark:
One of the major challenges in providing Spark as a Service is effectively maintaining your own code base while also keeping up with the many and frequent changes in open-source Spark.
Apache Spark releases a new version every six months. To build upon a new version of Spark, one needs to merge the changes into the forked Spark branch, rework the codebase, and resolve conflicts. This entire process requires a lot of effort and is time consuming.
Why do organizations prefer maintaining their own fork of Spark?
- There are certain features that are applicable only to the organization's use cases.
- A feature is important for the majority of their customers, but the open-source community does not accept the feature request.
- Last, but most important, the organization does not want to open source the change for competitive reasons.
What if...?
- we had the liberty to keep up with the pace of changes in the Apache Spark master.
- we could release Spark the same day a new version of Apache Spark is released.
- we had our own Spark optimizer project that could be integrated with Spark at runtime.
- we could maintain our own project and update it as per new changes in Spark.
- we did not need to modify our project for every new release of Apache Spark.
Turning what ifs into reality!
Spark-Radiant is an Apache Spark performance and cost optimizer. Spark-Radiant helps optimize performance and cost through Catalyst optimizer rules, enhanced auto-scaling in Spark, collection of important metrics related to the Spark job, a BloomFilter index in Spark, and more.
Maintaining Spark-Radiant is much easier than maintaining a Spark fork. With minimal changes in the Spark fork and considerable optimizations achieved using Spark-Radiant, we can easily reduce the time needed to ship Spark as soon as a new version is released by Apache Spark.
How to use Spark-Radiant
Build:
This project uses a Maven build.
git clone https://github.com/SaurabhChawla100/spark-radiant.git
cd spark-radiant
mvn clean install -DskipTests
# or
mvn clean package -DskipTests
Add the Dependency to your project
Build the project locally and add the dependency to your project
For SQL Optimization (ready to use)
<dependency>
    <groupId>io.github.saurabhchawla100</groupId>
    <artifactId>spark-radiant-sql</artifactId>
    <version>1.0.1</version>
</dependency>
For Core Optimization (WIP)
<dependency>
    <groupId>io.github.saurabhchawla100</groupId>
    <artifactId>spark-radiant-core</artifactId>
    <version>1.0.1</version>
</dependency>
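If your project uses sbt instead of Maven, the equivalent dependency lines would look like the sketch below (this assumes the artifacts are published under exactly the coordinates shown in the Maven snippets above):

// sbt equivalent of the Maven dependencies above (coordinates taken verbatim from those snippets)
libraryDependencies += "io.github.saurabhchawla100" % "spark-radiant-sql" % "1.0.1"
libraryDependencies += "io.github.saurabhchawla100" % "spark-radiant-core" % "1.0.1"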
Prerequisites
Spark-Radiant has the following prerequisites:
a) It is supported with Spark 3.0.x and newer versions of Spark.
b) The supported Scala version is 2.12.x.
Use spark-radiant at runtime
The spark-radiant project has two modules; you can use these modules in your project.
1) spark-radiant-sql: This contains the optimizations related to the performance of Spark SQL.
a) Using Dynamic Filtering in Spark: The Spark-Radiant Dynamic Filter works well for joins over a star schema, where one table contains a large number of records compared to the other tables. Dynamic Filtering works at runtime: it uses the predicates from the smaller table to filter the join columns, and applies the resulting predicates to the bigger table so that it is filtered before the join. This results in a less expensive join, since the number of records on the larger (left) side is reduced, which improves the performance of Spark SQL queries.
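As a rough illustration of the idea (this is not the actual rule Spark-Radiant applies, and it reuses the table names from the example queries below), the effect is similar to rewriting the scan of the big table with a predicate derived from the smaller table:

// Hand-written sketch of what dynamic filtering achieves: the selective predicate
// on the small table (table1) is propagated to the join key of the big table (table),
// so far fewer rows from the big table ever reach the join.
val illustration = spark.sql(
  "select * from table where table._1 in " +
    "(select _1 from table1 where table1._3 <= 'value019')")
illustration.show()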
Performance Improvement Factors
- Improved Network Utilization: The dynamic filter reduces the number of records involved in the join operation, which reduces the shuffle data generated and minimizes network I/O.
- Improved Resource Utilization: The number of records involved in the join is reduced as a result of using Dynamic Filtering in Spark. This reduces the system resource requirements, since fewer tasks are spawned for the join operation, so jobs complete with fewer resources.
- Improved Disk I/O: The dynamic filter is pushed down to the FileSourceScan / data source to read only the filtered records, which reduces the pressure on disk I/O.
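The example queries below assume three temporary views in which table is much larger than table1 and table2. The following is a minimal sketch with synthetic data (the column names _1, _2, _3 match the queries; the table sizes and values are arbitrary assumptions for illustration):

// Synthetic star-schema data: one large fact table and two small dimension tables.
// Values are zero padded ("value000" ... "value999") so that the predicates in the
// queries below (<= 'value019' and = 'value015') actually match some rows.
spark.range(0, 1000000).selectExpr(
  "concat('value', lpad(cast(id % 1000 as string), 3, '0')) as _1",
  "concat('value', cast(id as string)) as _2",
  "concat('value', lpad(cast(id % 1000 as string), 3, '0')) as _3"
).createOrReplaceTempView("table")

spark.range(0, 1000).selectExpr(
  "concat('value', lpad(cast(id as string), 3, '0')) as _1",
  "concat('value', cast(id as string)) as _2",
  "concat('value', lpad(cast(id as string), 3, '0')) as _3"
).createOrReplaceTempView("table1")

spark.range(0, 1000).selectExpr(
  "concat('value', lpad(cast(id as string), 3, '0')) as _1",
  "concat('value', cast(id as string)) as _2",
  "concat('value', lpad(cast(id as string), 3, '0')) as _3"
).createOrReplaceTempView("table2")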
Regular Join in Spark
val df = spark.sql("select * from table, table1, table2 where table._1=table1._1 and " +
  "table._1=table2._1 and table1._3 <= 'value019' and table2._3 = 'value015'")
df.show()
Dynamic Filter on Spark Join
val df = spark.sql("select * from table, table1, table2 where table._1=table1._1 and " +
  "table._1=table2._1 and table1._3 <= 'value019' and table2._3 = 'value015'")
df.show()
An 8x performance benefit was observed for this query with Dynamic Filtering compared to the same query run with a regular Spark join.
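To see what the optimizer actually did with the join, the query plan can be inspected with the standard Spark APIs (the exact operator names shown in the plan will depend on the Spark and Spark-Radiant versions):

// Print the parsed, analyzed, optimized, and physical plans for the query
df.explain(true)
// Or look at just the executed physical plan
println(df.queryExecution.executedPlan)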
For more information, refer to the documentation.
b) Add multiple values in the withColumn API of Spark: Until now, you could add columns only one at a time using the withColumn call on a Spark DataFrame. If we need to add multiple columns to a DataFrame, we have to call withColumn again and again.
For example:
df.withColumn("newCol", lit("someval")).withColumn("newCol1", lit("someval1")).withColumn("newCol2", lit("someval2"))
Each call to withColumn adds a new projection to the plan, so calling it many times can eventually result in an exception (StackOverflowError).
[SPARK-26224][SQL] issue of withColumn while using it multiple times
We can add all the new columns in one call, as a single projection, using the method useWithColumnsOfSpark:
import com.spark.radiant.sql.api.SparkRadiantSqlApi
import org.apache.spark.sql.functions.lit

// inputDF is an existing DataFrame to which the new columns will be added
val sparkRadiantSqlApi = new SparkRadiantSqlApi()
val withColMap = Map("newCol" -> lit("someval"), "newCol1" -> lit("someval1"), "newCol2" -> lit("someval2"))
val df1 = sparkRadiantSqlApi.useWithColumnsOfSpark(withColMap, inputDF)
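A quick check on the result (df1 from the snippet above) confirms that all three columns are present; as described above, they are added as a single projection rather than one projection per withColumn call:

// The schema of df1 should now contain newCol, newCol1, and newCol2
df1.printSchema()
df1.show()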
2) spark-radiant-core: This module contains optimizations related to total cost. It is a work in progress.
Conclusion:
Spark-Radiant is a new product, brought into being to address these challenges, turn the 'what ifs' into reality, and provide performance and cost optimization. In the near future, we will publish more related blog posts. Keep watching this space for more!