Spark-Radiant is now available!
Spark-Radiant is an Apache Spark performance and cost optimizer. It helps optimize performance and cost through Catalyst optimizer rules, enhanced auto-scaling in Spark, collection of important metrics for Spark jobs, a BloomFilter index in Spark, and more.
The Spark-Radiant project was introduced in my previous blog. Spark-Radiant 1.0.2 is now available and ready to use, and its dependency is published in Maven Central. In this blog, I will discuss the availability of Spark-Radiant 1.0.2 and the new features it adds.
How to Use Spark-Radiant-1.0.2 with Spark Jobs?
For Maven projects, add the following dependency to the pom.xml:
<dependency>
<groupId>io.github.saurabhchawla100</groupId>
<artifactId>spark-radiant-sql</artifactId>
<version>1.0.2</version>
</dependency>
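The runtime examples below also use the spark-radiant-core module. If you need it as a project dependency as well, a matching entry can be added (assuming the same groupId and version as the SQL module):
<dependency>
<groupId>io.github.saurabhchawla100</groupId>
<artifactId>spark-radiant-core</artifactId>
<version>1.0.2</version>
</dependency>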
Prerequisites
a) Spark-Radiant is supported with Spark 3.0.x and newer versions of Spark.
b) Supported Scala version: 2.12.x.
c) Both Scala and Python support are available with Spark-Radiant-1.0.2.
Running Spark job with Spark-Radiant
Use the published spark-radiant-sql-1.0.2.jar and spark-radiant-core-1.0.2.jar from Maven Central at runtime while running Spark jobs:
./bin/spark-shell --packages "io.github.saurabhchawla100:spark-radiant-sql:1.0.2,io.github.saurabhchawla100:spark-radiant-core:1.0.2"

./bin/spark-submit \
--packages "io.github.saurabhchawla100:spark-radiant-sql:1.0.2,io.github.saurabhchawla100:spark-radiant-core:1.0.2" \
--class com.test.spark.examples.SparkTestDF /spark/examples/target/scala-2.12/jars/spark-test_2.12-3.1.1.jar
Running Spark-Radiant-Sql Optimizer Rule
The Spark-Radiant-Sql optimizer rules can be enabled at runtime in both Scala and Python.
In Scala
import com.spark.radiant.sql.api.SparkRadiantSqlApi
// adding Extra optimizer rule
val sparkRadiantSqlApi = new SparkRadiantSqlApi()
sparkRadiantSqlApi.addOptimizerRule(spark)
In PySpark
./bin/pyspark --packages "io.github.saurabhchawla100:spark-radiant-sql:1.0.2,io.github.saurabhchawla100:spark-radiant-core:1.0.2"
# Import and add the extra optimizer rule
from sparkradiantsqlpy import SparkRadiantSqlApi
SparkRadiantSqlApi(spark).addExtraOptimizerRule()
OR
# Alternatively, add the extra optimizer rule through the JVM gateway
spark._jvm.com.spark.radiant.sql.api.SparkRadiantSqlApi().addOptimizerRule(spark._jsparkSession)
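To confirm that the extra rule was actually registered in your session, one quick check from Scala is to list the session's extra optimizer rules. This is a minimal sketch, assuming Spark-Radiant injects its rule through Spark's standard experimental-methods hook for extra optimizations:
// List the extra optimizer rules currently registered on this SparkSession.
// Assumes the rule was injected via spark.experimental.extraOptimizations.
spark.experimental.extraOptimizations.foreach(rule => println(rule.ruleName))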
Use Spark-Radiant at runtime
Below are some of the new features and improvements available in Spark-Radiant-1.0.2.
a) Using Dynamic Filtering in Spark: Spark-Radiant's Dynamic Filter works well for joins over a star schema, where one table contains a much larger number of records than the others. At runtime, Dynamic Filtering uses the predicates on the smaller table to derive a filter on the join columns and applies that filter to the bigger table. This reduces the number of records on the bigger side of the join, resulting in a less expensive join and improved performance of Spark SQL queries. It works with Inner join, Right outer join, Left semi join, Left outer join, and Left anti join.
Performance Improvement Factors
- Improved Network Utilization: The dynamic filter reduces the number of records involved in the join operation, which reduces the shuffle data generated and minimizes network I/O.
- Improved Resource Utilization: The number of records involved in the join is reduced as a result of using Dynamic Filtering in Spark. This reduces the system resource requirements, since fewer tasks are spawned for the join operation, and jobs complete with fewer resources.
- Improved Disk I/O: The dynamic filter is pushed down to the FileSourceScan / DataSource so that only the filtered records are read, which reduces the pressure on disk I/O.
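For reference, here is a minimal, made-up setup for the example queries below. The view names (table, table1, table2) and tuple-style column names (_1, _2, _3) are taken from the query itself; the data is purely illustrative, with one large view and two small ones:
import spark.implicits._

// Large view: ~1M rows; the two smaller views: 1K rows each.
(1 to 1000000).map(i => (f"value${i % 1000}%03d", f"value${i % 100}%03d", f"value${i % 50}%03d"))
  .toDF("_1", "_2", "_3").createOrReplaceTempView("table")
(1 to 1000).map(i => (f"value${i % 1000}%03d", f"value${i % 100}%03d", f"value${i % 20}%03d"))
  .toDF("_1", "_2", "_3").createOrReplaceTempView("table1")
(1 to 1000).map(i => (f"value${i % 1000}%03d", f"value${i % 100}%03d", f"value${i % 20}%03d"))
  .toDF("_1", "_2", "_3").createOrReplaceTempView("table2")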
Regular Join in Spark
val df = spark.sql("select * from table, table1, table2 where table._1=table1._1 and table._1=table2._1
and table1._3 <= 'value019' and table2._3 = 'value015'")
df.show()

Dynamic Filter on Spark Join
val df = spark.sql("select * from table, table1, table2 where table._1=table1._1 and table._1=table2._1
and table1._3 <= 'value019' and table2._3 = 'value015'")
df.show()

Using the Dynamic Filter on the Spark join was found to improve performance by 8X compared to the same query run with a regular Spark join.
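To see whether the optimization actually took effect, you can inspect the physical plan of the query; with Dynamic Filtering enabled, an extra filter derived from the smaller tables should appear on the scan of the bigger table (the exact operator names depend on the Spark-Radiant version):
// Print the extended plan and look for the additional pushed-down filter
// on the larger table's scan when Dynamic Filtering is enabled.
df.explain(true)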
For more information, refer to the documentation.
b) Using Size Based Join ReOrdering in Spark: Spark-Radiant's Size Based Join ReOrdering works well for joins over a star schema, where one table contains a much larger number of records than the others and the smaller tables are all joined with the large table. By default, Spark performs joins left to right (whether that places the BHJ before the SMJ or vice versa). This optimizer rule lets the smaller tables be joined first, before the bigger table (BHJ before SMJ).
spark.sql.support.sizebased.join.reorder: config to enable SizeBasedJoinReOrdering for SQL queries. The default value is false.
Performance Improvement Factors
- Improved Network Utilization: SizeBasedJoinReOrdering performs the BHJ before the SMJ, which reduces the number of records involved in the join operation; this reduces the shuffle data generated and minimizes network I/O.
- Improved Resource Utilization: The number of records involved in the join is reduced as a result of using SizeBasedJoinReOrdering in Spark. This reduces the system resource requirements, since fewer tasks are spawned for the join operation, and jobs complete with fewer resources.
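Because the rule is disabled by default, it has to be switched on before running the query, either with --conf on spark-submit or from a running session; for example:
// Enable SizeBasedJoinReOrdering for subsequent SQL queries in this session.
spark.conf.set("spark.sql.support.sizebased.join.reorder", "true")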
Regular Join in Spark
val df = spark.sql("select * from t,t2, t1 where t._1=t1._1 and t._2 = t1._2 and t._1=t2._1 and t1._2 > 'value15' and t2._3 = 'value019'")
df.show

Size Based Join ReOrdering
val df = spark.sql("select * from t,t2, t1 where t._1=t1._1 and t._2 = t1._2 and t._1=t2._1 and t1._2 > 'value15' and t2._3 = 'value019'")
df.show

SizeBasedJoinReOrdering works 4X faster than the regular Spark join for this query.
For more information, refer to the documentation.
c) UnionReuseExchangeOptimizeRule: This rule applies when a union is present between aggregations that have the same grouping columns over the same table/datasource. In this scenario, instead of scanning the table/datasource twice, it is scanned once and the other child of the union reuses that scan. This feature is enabled using:
--conf spark.sql.optimize.union.reuse.exchange.rule=true
val df = spark.sql("select test11, count(*) as count from testDf1" +
" group by test11 union select test11, sum(test11) as count" +
" from testDf1 group by test11")

d) ExchangeOptimizeRule: This optimizer rule applies when a partial aggregate exchange is present along with the exchange introduced by an SMJ (or another join type that adds a shuffle exchange), so that the executed plan contains two shuffle exchanges in total. When the cost of creating both exchanges is almost the same, the exchange created by the partial aggregate is skipped, leaving only one exchange instead of two, and the partial and complete aggregation are done on the same exchange. This can be enabled using:
--conf spark.sql.skip.partial.exchange.rule=true
spark.sql("select a.* from (select _1, _2, count(*) from t1 group by _1, _2) a join t2 b on a._1= b._1")

Conclusion:
In this blog, I discussed how to use Spark-Radiant 1.0.2. The new features added in Spark-Radiant 1.0.2, such as Dynamic Filtering, SizeBasedJoinReOrdering, and UnionReuseExchange, provide benefits for performance and cost optimization.
In the near future, we will come up with new related blogs. Keep watching this space for more!