🔥Understanding and Using PySpark Window Functions

Introduction

PySpark is a powerful tool for big data processing built on the Spark framework. One of its key features is support for window functions, which perform calculations across a set of table rows related to the current row. These are particularly useful for ranking, cumulative calculations, and analytics.
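
Before diving into the steps, here is a minimal, self-contained sketch of one such cumulative calculation: a running total of salaries within each department. The data, column names, and department values here are invented purely for illustration and are not part of the datasets used later in this post.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as spark_sum
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

# Tiny made-up dataset: employee name, department, salary
emp = spark.createDataFrame(
    [("A", "HR", 3000), ("B", "HR", 4000), ("C", "IT", 5000), ("D", "IT", 2000)],
    ["name", "dept", "sal"],
)

# Running (cumulative) salary total within each department, ordered by salary
win = Window.partitionBy("dept").orderBy(col("sal"))
emp.withColumn("running_total", spark_sum("sal").over(win)).show()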

In this blog, we will explore various PySpark SQL functions, especially window functions, and break down the given code step by step.


Step 1: Loading Data and Ordering by Salary

from pyspark.sql.functions import * 

data="/FileStore/tables/empmysql.csv"
df=spark.read.format("csv").option("header","true").option("inferSchema","true").load(data)
df=df.orderBy(col("sal").desc())

df.show()

Syntax:

DataFrame.orderBy(*cols, ascending=True)

Explanation:

  1. from pyspark.sql.functions import * - Imports all functions (such as col, rank, and sum) from the pyspark.sql.functions module.
  2. spark.read.format("csv") - Reads a CSV file.
  3. option("header","true") - Indicates that the first row is a header.
  4. option("inferSchema","true") - Automatically infers the data type of columns.
  5. orderBy(col("sal").desc()) - Sorts the dataframe in descending order based on the "sal" column.
  6. df.show() - Displays the data.
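
orderBy can also sort by several columns and mix sort directions. A short sketch, assuming the emp dataset has the job and sal columns used later in this post:

# Sort by job ascending, then by salary descending within each job
df.orderBy(col("job").asc(), col("sal").desc()).show()

# The same result using the ascending flag
df.orderBy(["job", "sal"], ascending=[True, False]).show()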

Step 2: Applying Window Functions (Rank, Dense Rank, Row Number)

from pyspark.sql.window import * 
win = Window.partitionBy("job").orderBy(col("sal").desc())
ndf = (df.withColumn("rnk", rank().over(win))
         .withColumn("drank", dense_rank().over(win))
         .withColumn("rno", row_number().over(win)))
ndf.show()

Syntax:

rank().over(window_spec)
dense_rank().over(window_spec)
row_number().over(window_spec)

Explanation:

  1. Window.partitionBy("job").orderBy(col("sal").desc()) - Creates a window partitioned by "job" and ordered by "sal" in descending order.
  2. rank() - Assigns ranks but leaves gaps in case of ties.
  3. dense_rank() - Assigns ranks without gaps (continuous ranking).
  4. row_number() - Assigns a unique sequential row number without duplicates.
  5. withColumn() - Adds a new column based on computed rank functions.
  6. ndf.show() - Displays the updated dataframe.

Difference Between Rank Functions:

  • dense_rank - Ensures continuous ranking when duplicates exist.
  • rank - Assigns the same rank to duplicates but leaves gaps.
  • row_number - Ensures a unique ranking without ties.
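
To make the difference concrete, here is a small, self-contained sketch; the data is invented for illustration and includes a salary tie:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, rank, dense_rank, row_number
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

emp = spark.createDataFrame(
    [("A", "CLERK", 1000), ("B", "CLERK", 1000), ("C", "CLERK", 800)],
    ["ename", "job", "sal"],
)

w = Window.partitionBy("job").orderBy(col("sal").desc())
(emp.withColumn("rnk", rank().over(w))          # 1, 1, 3 -> same rank for the tie, then a gap
    .withColumn("drank", dense_rank().over(w))  # 1, 1, 2 -> same rank for the tie, no gap
    .withColumn("rno", row_number().over(w))    # 1, 2, 3 -> always unique
    .show())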

Step 3: Renaming Columns

data="/FileStore/tables/us_500.csv"
df=spark.read.format("csv").option("header","true").option("inferSchema","true").load(data)
df=df.withColumnRenamed("zip","salary").withColumnRenamed("phone1","cell")

Syntax:

df.withColumnRenamed("old_column_name", "new_column_name")

Explanation:

  1. withColumnRenamed(old, new) - Renames a single column.
  2. toDF(*cols) - Renames all columns at once if needed.
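
If several columns need new names, one common pattern is to chain withColumnRenamed calls in a loop. A sketch using the same old/new pairs as above:

# Mapping of old column names to new ones
renames = {"zip": "salary", "phone1": "cell"}
for old, new in renames.items():
    df = df.withColumnRenamed(old, new)
df.printSchema()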

Step 4: Using NTILE() for Quartiles

win = Window.partitionBy("state").orderBy(col("salary").desc())
ndf = (df.withColumn("rnk", rank().over(win))
       .withColumn("drank", dense_rank().over(win))
       .withColumn("rno", row_number().over(win))
       .drop("web","email","phone2")
       .withColumn("ntile", ntile(4).over(win)))

Syntax:

ntile(n).over(window_spec)

Explanation:

  1. ntile(4) - Splits data into four groups (quartiles).
  2. drop("web","email","phone2") - Removes unnecessary columns.
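
To see what ntile actually produces, here is a minimal sketch with made-up values: eight rows split into four quartile buckets of two rows each.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, ntile
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

vals = spark.createDataFrame([(v,) for v in [95, 88, 82, 75, 70, 66, 60, 50]], ["salary"])

w = Window.orderBy(col("salary").desc())
# Bucket 1 holds the top quarter of rows, bucket 4 the bottom quarter
vals.withColumn("quartile", ntile(4).over(w)).show()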

Step 5: Loading Employee Data and Renaming Columns

data="/FileStore/tables/emp_manager_sal_wo_header.csv"
df=spark.read.format("csv").option("inferSchema","true").option("mode","DROPMALFORMED").load(data)
df=df.toDF("empid","empname","sal","managerid")
df.show()

Syntax:

df.toDF("col1", "col2", "col3", ...)

Explanation:

  1. option("mode","DROPMALFORMED") - Drops malformed rows during reading.
  2. toDF("col1","col2",...) - Renames all columns.
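
For context, the CSV reader supports three parsing modes; a short sketch reusing the data path from above (assumed to exist on DBFS):

# PERMISSIVE (default): keep malformed rows, setting unparseable fields to null
# DROPMALFORMED: silently drop rows that do not match the schema
# FAILFAST: raise an error as soon as a malformed row is found
df_strict = (spark.read.format("csv")
             .option("inferSchema", "true")
             .option("mode", "FAILFAST")
             .load(data))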

Step 6: Renaming All Columns with Suffix

data="/FileStore/tables/asl.csv"
df=spark.read.format("csv").option("header","true").option("inferSchema","true").option("mode","DROPMALFORMED").load(data)
allcols = [c + "_new" for c in df.columns]
df=df.toDF(*allcols)
df.show()

Syntax:

[c + "_new" for c in df.columns]

Explanation:

  1. df.columns - Returns a list of column names.
  2. List comprehension - Appends "_new" to each column name.
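
The same suffixing can also be written with select and alias instead of toDF. A sketch assuming it replaces the toDF line above (i.e., it is applied to the freshly loaded df):

# Equivalent to toDF(*allcols): alias every column with a "_new" suffix
df_renamed = df.select([col(c).alias(c + "_new") for c in df.columns])
df_renamed.printSchema()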

Step 7: Grouping and Aggregating Marks with Window Functions

data="/FileStore/tables/students_marks.csv"
df=spark.read.format("csv").option("header","true").option("inferSchema","true").load(data)
df = df.groupBy("studentid").agg(sum(col("marks")).alias("total_marks"))
win = Window.orderBy(col("total_marks").desc())
res=df.withColumn("grade", ntile(5).over(win)).withColumn("per", 100*percent_rank().over(win))
display(res)

Syntax:

groupBy("column_name").agg(sum("column").alias("new_column"))

Explanation:

  1. groupBy("studentid").agg(sum(col("marks"))) - Groups by "studentid" and calculates total marks.
  2. ntile(5) - Divides students into 5 groups based on total marks.
  3. percent_rank() - Computes each student's relative rank as a value between 0 and 1; multiplying by 100, as above, expresses it as a percentage.
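
A compact, self-contained sketch of this step; the student IDs and marks are invented, but the aggregation and window logic mirror the code above:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as spark_sum, ntile, percent_rank
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

marks = spark.createDataFrame(
    [(1, 90), (1, 80), (2, 70), (2, 60), (3, 95), (3, 85)],
    ["studentid", "marks"],
)

totals = marks.groupBy("studentid").agg(spark_sum(col("marks")).alias("total_marks"))

w = Window.orderBy(col("total_marks").desc())
(totals.withColumn("grade", ntile(5).over(w))            # bucket 1 = top fifth of students
       .withColumn("per", 100 * percent_rank().over(w))  # 0 for the top student, 100 for the last
       .show())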

Conclusion

This blog covered key PySpark window functions such as rank, dense_rank, row_number, ntile, and percent_rank. These functions are essential for advanced analytics in big data processing.

For more details, refer to the official Databricks blog: Introducing Window Functions in Spark SQL.

GitHub Repository for Reference: Ranking and Dense Rank Examples.

Dataset Download Link: Download Dataset.
