Hello everyone! Today I'll show you how to optimize joins in your PySpark code. These techniques can be used in real-world projects; I've run into these issues myself while developing, and I'm glad to share them with you. If you have any questions, please let me know in the comments section.
1. Broadcast the smaller DataFrame
from pyspark.sql.functions import broadcast

# Assume df1 and df2 are two DataFrames to be joined
# and col1 and col2 are the names of the join columns

# Identify the smaller DataFrame (count each DataFrame only once)
df1_count = df1.count()
df2_count = df2.count()
small_df, large_df = (df1, df2) if df1_count < df2_count else (df2, df1)

# Broadcast the smaller DataFrame and perform the join.
# Note: broadcast hash joins support inner and one-sided outer joins but not
# full outer joins, so we use an inner join here.
joined_df = large_df.join(broadcast(small_df), [col1, col2], 'inner')
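
A quick way to confirm the broadcast actually happens is to look at the physical plan; you can also raise the auto-broadcast threshold or attach a join hint instead of calling broadcast() explicitly. Here's a minimal sketch, assuming a SparkSession named spark and the same DataFrames as above; the 100 MB value is only an example:

# Raise the size limit (in bytes) below which Spark broadcasts a side automatically
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100 * 1024 * 1024)

# Alternatively, attach a broadcast hint directly to the smaller DataFrame
joined_df = large_df.join(small_df.hint("broadcast"), [col1, col2], 'inner')

# Inspect the physical plan and look for 'BroadcastHashJoin'
joined_df.explain()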

2. Repartition both DataFrames by the join keys
# Assume df1 and df2 are two DataFrames to be joined
# and col1 and col2 are the names of the join columns

# Repartition both DataFrames by the join columns so that matching keys
# end up in the same partitions; repartition() already hash-partitions by
# the given columns, so there is no need to wrap them in hash()
num_partitions = 100  # adjust based on your cluster configuration
partitioned_df1 = df1.repartition(num_partitions, col1, col2)
partitioned_df2 = df2.repartition(num_partitions, col1, col2)

# Perform the join
joined_df = partitioned_df1.join(partitioned_df2, [col1, col2], 'full')
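
The number of shuffle partitions Spark uses for the join itself is controlled separately from the repartition() call above, so it helps to keep the two aligned. Below is a minimal sketch, assuming a SparkSession named spark; the values are examples, not recommendations:

# Match the join's shuffle partition count to the repartitioning above
spark.conf.set("spark.sql.shuffle.partitions", num_partitions)

# With Adaptive Query Execution enabled (the default in recent Spark versions),
# Spark can also coalesce small shuffle partitions at runtime
spark.conf.set("spark.sql.adaptive.enabled", "true")

# Verify the plan: ideally each side is shuffled at most once
joined_df.explain()
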
3. Bucket the DataFrames before joining
from pyspark.sql.functions import broadcast, hash, col, abs as abs_

# Assume df1 and df2 are two DataFrames to be joined
# and col1 and col2 are the join columns of type 'int'

# Define the number of buckets for each bucket column
num_buckets_col1 = 100
num_buckets_col2 = 50

# Hash the join columns together; abs() keeps the bucket ids non-negative
hash_func = abs_(hash(col("col1"), col("col2")))

# Bucket the DataFrames by adding bucket columns derived from the join columns
df1_bucketed = df1.withColumn("bucket_col1", hash_func % num_buckets_col1) \
                  .withColumn("bucket_col2", hash_func % num_buckets_col2)

df2_bucketed = df2.withColumn("bucket_col1", hash_func % num_buckets_col1) \
                  .withColumn("bucket_col2", hash_func % num_buckets_col2)

# Identify the smaller DataFrame (count each one only once)
df1_count = df1_bucketed.count()
df2_count = df2_bucketed.count()
if df1_count < df2_count:
    small_df, large_df = df1_bucketed, df2_bucketed
else:
    small_df, large_df = df2_bucketed, df1_bucketed

# Broadcast the smaller DataFrame and join on the join columns plus the bucket
# columns; as in example 1, broadcast hash joins do not support full outer
# joins, so we use an inner join here
joined_df = large_df.join(broadcast(small_df), ["col1", "col2", "bucket_col1", "bucket_col2"], "inner")

# Drop the bucket columns from the joined DataFrame
joined_df = joined_df.drop("bucket_col1", "bucket_col2")

In the code above, we first define the number of buckets for each bucket column. We then hash the join columns together and take the hash value modulo the number of buckets, which assigns every row a bucket id between 0 and the number of buckets minus one.

We bucket the DataFrames by adding the new columns bucket_col1 and bucket_col2 to each one, then join the bucketed DataFrames using both the join columns and the bucket columns as join keys, broadcasting the smaller side. Finally, we drop the bucket columns from the joined DataFrame.
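
If you can pre-write the data, Spark also ships a built-in bucketing feature that achieves a similar effect persistently: both tables are written bucketed by the join keys, and later joins between them can avoid the shuffle. Here's a minimal sketch, assuming a metastore is available for saveAsTable; the table names df1_bucketed_tbl and df2_bucketed_tbl are made up for the example:

# Write both DataFrames as bucketed tables, bucketed by the join columns
df1.write.bucketBy(100, "col1", "col2").sortBy("col1").mode("overwrite").saveAsTable("df1_bucketed_tbl")
df2.write.bucketBy(100, "col1", "col2").sortBy("col1").mode("overwrite").saveAsTable("df2_bucketed_tbl")

# Later reads can join without reshuffling, since both sides share the same bucketing
t1 = spark.table("df1_bucketed_tbl")
t2 = spark.table("df2_bucketed_tbl")
joined = t1.join(t2, ["col1", "col2"], "full")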

This technique can help reduce the amount of data shuffled across the network and improve the performance of the join operation. However, it's important to choose an appropriate number of buckets for each column based on the data distribution and the available resources in your cluster.
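
One simple way to sanity-check the distribution before picking bucket counts is to look at the heaviest join keys; strong skew suggests you need more buckets or a skew-handling strategy such as AQE's skew-join optimization. A minimal sketch, reusing df1 and the literal column names from the example above:

from pyspark.sql.functions import desc

# Show the most frequent join-key combinations to spot skew
df1.groupBy("col1", "col2") \
   .count() \
   .orderBy(desc("count")) \
   .show(10)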

And that's it! Now you know how to handle this type of question if an interviewer asks it during an interview.

Thanks! If you learned something today, please comment, share, and like my page.