Spark-Radiant is now available!

Saurabh Chawla
6 min read · Sep 19, 2021


Spark-Radiant is an Apache Spark performance and cost optimizer. It helps optimize performance and cost through Catalyst optimizer rules, enhanced auto-scaling in Spark, collection of important metrics for Spark jobs, a BloomFilter index in Spark, and more.

The project, Spark-Radiant, was introduced in my previous blog. Spark-Radiant 1.0.2 is now available and ready to use, and its dependencies are published to Maven Central. In this blog, I will discuss the availability of Spark-Radiant 1.0.2 and the new features that have been added.

How to Use Spark-Radiant-1.0.2 with Spark Jobs?

For Maven projects, add the below dependency to the pom.xml

Spark SQL Radiant

<dependency>
    <groupId>io.github.saurabhchawla100</groupId>
    <artifactId>spark-radiant-sql</artifactId>
    <version>1.0.2</version>
</dependency>
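
For sbt projects, the equivalent coordinates should look like the following (a sketch, assuming the artifact is published without a Scala-version suffix, as the Maven artifactId above suggests):

libraryDependencies += "io.github.saurabhchawla100" % "spark-radiant-sql" % "1.0.2"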

Prerequisites
a) Spark-Radiant is supported with Spark 3.0.x and newer versions of Spark.
b) The supported Scala version is 2.12.x.
c) Both Scala and Python support are available with Spark-Radiant-1.0.2.

Running Spark job with Spark-Radiant

Use the published spark-radiant-sql-1.0.2.jar and spark-radiant-core-1.0.2.jar from Maven Central at runtime while running Spark jobs:

./bin/spark-shell --packages "io.github.saurabhchawla100:spark-radiant-sql:1.0.2,io.github.saurabhchawla100:spark-radiant-core:1.0.2"

./bin/spark-submit \
--packages "io.github.saurabhchawla100:spark-radiant-sql:1.0.2,io.github.saurabhchawla100:spark-radiant-core:1.0.2" \
--class com.test.spark.examples.SparkTestDF /spark/examples/target/scala-2.12/jars/spark-test_2.12-3.1.1.jar

Running Spark-Radiant-Sql Optimizer Rule

The Spark-Radiant SQL optimizer rules can be added at runtime in both Scala and Python.

In Scala

import com.spark.radiant.sql.api.SparkRadiantSqlApi
// adding Extra optimizer rule
val sparkRadiantSqlApi = new SparkRadiantSqlApi()
sparkRadiantSqlApi.addOptimizerRule(spark)
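
Putting this together, a minimal end-to-end sketch could look like the following (the session setup, table name, and query are illustrative placeholders, not taken from the project docs):

import org.apache.spark.sql.SparkSession
import com.spark.radiant.sql.api.SparkRadiantSqlApi

// Illustrative session setup; in spark-shell a session already exists as `spark`
val spark = SparkSession.builder()
  .appName("spark-radiant-demo") // hypothetical application name
  .getOrCreate()

// Attach the extra Spark-Radiant optimizer rules to this session
new SparkRadiantSqlApi().addOptimizerRule(spark)

// Queries run afterwards go through the added rules; explain() shows the resulting plan
spark.sql("select * from someTable where someCol = 'x'").explain(true)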

In PySpark

./bin/pyspark --packages "io.github.saurabhchawla100:spark-radiant-sql:1.0.2,io.github.saurabhchawla100:spark-radiant-core:1.0.2"

# Import the extra optimizer rules
from sparkradiantsqlpy import SparkRadiantSqlApi
SparkRadiantSqlApi(spark).addExtraOptimizerRule()

OR

# Add the extra optimizer rules via the JVM API
spark._jvm.com.spark.radiant.sql.api.SparkRadiantSqlApi().addOptimizerRule(spark._jsparkSession)

Use Spark-Radiant at runtime
Below are some of the new features and improvements available in Spark-Radiant-1.0.2.

a) Using Dynamic Filtering in Spark: Spark-Radiant's Dynamic Filter works well for joins with a star-schema shape, where one table contains a large number of records compared to the others. Dynamic Filtering works at runtime: it uses the predicates applied to the smaller table to derive a filter on the join columns, and applies that filter to the bigger table. This reduces the number of records on the bigger side of the join, resulting in a less expensive join and improved performance of Spark SQL queries. It works with Inner Join, Right Outer Join, Left Semi Join, Left Outer Join, and Left Anti Join.

Performance Improvement Factors

  • Improved Network Utilization: The dynamic filter reduces the number of records involved in the join operation, which reduces the shuffle data generated and minimizes network I/O.
  • Improved Resource Utilization: The number of records involved in the join is reduced as a result of using Dynamic Filtering in Spark. This lowers the system resource requirements, since fewer tasks are spawned for the join operation, and jobs complete with fewer resources.
  • Improved Disk I/O: The dynamic filter is pushed down to the FileSourceScan / datasource so that only the filtered records are read. This reduces the pressure on disk I/O.

Regular Join in Spark

val df = spark.sql("select * from table, table1, table2 where table._1=table1._1 and table._1=table2._1 " +
  "and table1._3 <= 'value019' and table2._3 = 'value015'")
df.show()
Figure 1: Showing Regular Join in Spark

Dynamic Filter on Spark Join

val df = spark.sql("select * from table, table1, table2 where table._1=table1._1 and table._1=table2._1 " +
  "and table1._3 <= 'value019' and table2._3 = 'value015'")
df.show()
Figure 2: Dynamic Filter in the Spark Join
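
To check that the dynamic filter is actually being applied (a quick sanity check, not part of the original example), print the query plan for the same DataFrame; with the filter pushed down, extra predicates should appear on the scan of the larger table:

// explain(true) prints the parsed, analyzed, optimized and physical plans;
// a pushed-down dynamic filter should surface on the scan of the bigger table
df.explain(true)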

Using the Dynamic Filter on the Spark join was found to improve performance by 8X compared to the same query run with a regular Spark join.

For more information, refer to the documentation.

b) Using Size Based Join ReOrdering in Spark: Spark-Radiant's Size Based Join ReOrdering works well for star-schema joins, where one table contains a large number of records compared to the others and the smaller tables are each joined with the large table. By default, Spark performs joins left to right (whether that places the BHJ before the SMJ or vice versa). This optimizer rule lets the smaller tables be joined first, before the bigger table (the BHJ runs before the SMJ).

spark.sql.support.sizebased.join.reorder: Config to enable SizeBasedJoinReOrdering support for SQL queries. The default value is false.
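
As a sketch, the config can be passed at submit time or, assuming it is read as a runtime SQL conf, set on an existing session:

// At submit time:
//   --conf spark.sql.support.sizebased.join.reorder=true
// Or on an existing SparkSession (assumption: the rule reads it as a runtime conf):
spark.conf.set("spark.sql.support.sizebased.join.reorder", "true")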

Performance Improvement Factors

Improved Network Utilization: SizeBasedJoinReOrdering performs the BHJ before the SMJ, which reduces the number of records involved in the join operation; this reduces the shuffle data generated and minimizes network I/O.

Improved Resource Utilization: The number of records involved in the join is reduced as a result of using SizeBasedJoinReOrdering in Spark. This lowers the system resource requirements, since fewer tasks are spawned for the join operation, and jobs complete with fewer resources.

Regular Join in Spark

val df = spark.sql("select * from t,t2, t1  where t._1=t1._1 and t._2 = t1._2 and t._1=t2._1 and t1._2 > 'value15' and t2._3 = 'value019'")
df.show
Figure 3: Regular Join without Join ReOrdering

Size Based Join ReOrdering

val df = spark.sql("select * from t,t2, t1  where t._1=t1._1 and t._2 = t1._2 and t._1=t2._1 and t1._2 > 'value15' and t2._3 = 'value019'")
df.show
Figure 4: Size Based Join ReOrdering in Spark

SizeBasedJoinReOrdering works 4X faster than the regular Spark join for this query.

For more information, refer to the documentation.

c) UnionReuseExchangeOptimizeRule: This rule applies when a union is performed between aggregations that have the same grouping columns over the same table/datasource. In this scenario, instead of scanning the table/datasource twice, it is scanned once and the other child of the union reuses that scan. This feature is enabled using:

--conf spark.sql.optimize.union.reuse.exchange.rule=true

val df = spark.sql("select test11, count(*) as count from testDf1" +
" group by test11 union select test11, sum(test11) as count" +
" from testDf1 group by test11")
Figure 5: UnionReuseExchangeOptimizeRule
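
As a quick way to verify the reuse (a sketch, not from the original post), print the physical plan of the DataFrame defined above:

// With spark.sql.optimize.union.reuse.exchange.rule=true set on the session,
// the physical plan should show a single scan of testDf1, with the second union
// branch reusing it (typically surfacing as a ReusedExchange node)
df.explain()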

d) ExchangeOptimizeRule: This optimizer rule applies when a partial-aggregate exchange is present along with the exchange introduced by an SMJ (or another join type that adds a shuffle exchange), so that the executed plan contains two shuffle exchanges in total. When the cost of creating both exchanges is almost the same, the exchange created by the partial aggregate is skipped, leaving only one exchange instead of two; the partial and complete aggregation are then performed on the same exchange. This can be enabled using:

--conf spark.sql.skip.partial.exchange.rule=true

spark.sql("select a.* from (select _1, _2, count(*) from t1 group by _1, _2) a join t2 b on a._1= b._1")
Figure 6: ExchangeOptimizeRule
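
A minimal sketch of enabling the rule on an existing session and checking the resulting plan (assuming the config is read as a runtime SQL conf):

// Enable the rule (assumption: it is read as a runtime SQL conf), then re-run the
// query above and inspect the executed plan; it should contain a single shuffle
// exchange instead of two
spark.conf.set("spark.sql.skip.partial.exchange.rule", "true")
spark.sql("select a.* from (select _1, _2, count(*) from t1 group by _1, _2) a join t2 b on a._1= b._1").explain()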

Conclusion:

In this blog, I discussed how to use Spark-Radiant 1.0.2. The new features added in Spark-Radiant 1.0.2, such as Dynamic Filtering, SizeBasedJoinReOrdering, and UnionReuseExchangeOptimizeRule, provide benefits for performance and cost optimization.

In the near future, we will come up with new related blogs. Keep watching this space for more!
