Threaded Tasks in PySpark Jobs

There are circumstances when tasks (Spark action, e.g. save, count, etc) in a PySpark job can be spawned on separate threads. Doing so, optimizes distribution of tasks on executor cores.

By default, Spark’s scheduler runs jobs in FIFO fashion. Each job is divided into “stages” (e.g. map and reduce phases), and the first job gets priority on all available resources while its stages have tasks to launch, then the second job gets priority, etc. If the jobs at the head of the queue don’t need to use the whole cluster, later jobs can start to run right away, but if the jobs at the head of the queue are large, then later jobs may be delayed significantly.

Threading tasks addresses this particular situation: if the jobs at the head of the queue are large, then later jobs may be delayed significantly, through,

Spark assigns tasks between jobs in a “round robin” fashion, so that all jobs get a roughly equal share of cluster resources. This means that short jobs submitted while a long job is running can start receiving resources right away and still get good response times, without waiting for the long job to finish.

Simple But Very Common Use Case

Saving multiple results to parquet files is a common action. Here’s how to thread three write.parquet action.

An important reminder is to set set(‘spark.scheduler.mode’,’FAIR’) in the sparkContext.

import threading
def set_spark_context(app_name=None): conf = SparkConf().\ setAppName(app_name).\ set('spark.hadoop.mapreduce.output.fileoutputformat.compress', 'false').\ set('spark.sql.parquet.compression.codec','uncompressed').\ set('spark.scheduler.mode','FAIR') # need to set this on the sparkContext sc = SparkContext(conf=conf) try: sc._jvm.org.apache.hadoop.hive.conf.HiveConf() sqlCtx = sqlContext = HiveContext(sc) except py4j.protocol.Py4JError: sqlCtx = sqlContext = SQLContext(sc) return sc, sqlContext
#function to be called on spawned threads*
def saveParquet(df,path,pool_id,sc): sc.setLocalProperty("spark.scheduler.pool", str(pool_id)) df.repartition(1).write.parquet(path) sc.setLocalProperty("spark.scheduler.pool", None)
sc, sqlContext = set_spark_context("Simple TASK")
# Do processing here to produce three dataframes to be written as parquets.
# The tree results are df1, df2, df3
path1 = "/path/1"
path2 = "/path/2"
path3 = "/path/3"
df1_write = threading.Thread(target=saveParquet, args=(df1,path1,1,sc,))
df2_write = threading.Thread(target=saveParquet, args=(df2,path2,2,sc,))
df3_write = threading.Thread(target=saveParquet, args=(df3,path3,3,sc,))
df1.start()
df2.start()
df3.start()
df1.join()
df2.join()
df3.join()

When this job is submitted on the cluster, at first it will show 1 scheduler pool.

However, at the stage where the actual snippet above is already being interpreted, it will show the 3 pools spawned and the active pools currently being processed.

Hopefully, this is something useful for us trying to speed up running jobs.

Posted by Web Monkey