Mastering PySpark Column Operations
Apache Spark is a powerful distributed computing framework, and PySpark is its Python API, widely used for big data processing. This blog post explores various column operations in PySpark, covering basic to advanced transformations with detailed explanations and code examples.
1. Installing and Setting Up PySpark
Before working with PySpark, install it using:
pip install pyspark
Import the required libraries and set up a Spark session:
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col, lit, upper, lower, when, count, avg, sum, udf
from pyspark.sql.types import StringType
spark = SparkSession.builder.appName("PySparkColumnOperations").getOrCreate()
2. Understanding Row Objects
Basic Row Object Creation
from pyspark.sql import Row
r1 = Row(1, 'divya')
print(r1) # Output: Row(_1=1, _2='divya')
Since no field names are provided, PySpark assigns default field names: _1 and _2.
Named Row Objects
Named row objects allow us to define field names explicitly:
r1 = Row(id=1, name="sanket")
print(r1) # Output: Row(id=1, name='sanket')
Accessing values:
r1['id'], r1['name'] # Output: (1, 'sanket')
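Named Row fields can also be read as attributes rather than by key; a quick sketch using the same r1:
r1.id, r1.name # Output: (1, 'sanket')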
3. Creating a DataFrame Using Row Objects
data = [Row(id=1, name='Alice'), Row(id=2, name='Bob')]
df = spark.createDataFrame(data)
df.show()
Output:
+---+-----+
| id| name|
+---+-----+
| 1|Alice|
| 2| Bob|
+---+-----+
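To confirm the column types Spark inferred from the Row objects, you can print the schema (the output below assumes default type inference):
df.printSchema()
# root
#  |-- id: long (nullable = true)
#  |-- name: string (nullable = true)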
4. Column Operations in PySpark
Selecting and Renaming Columns
Selecting a specific column:
df.select("name").show()
Renaming a column:
df = df.withColumnRenamed("name", "full_name")
df.show()
Adding, Modifying, and Dropping Columns
Adding a new column:
df = df.withColumn("age", lit(25))
df.show()
Modifying an existing column:
df = df.withColumn("full_name", upper(col("full_name")))
df.show()
Dropping a column:
df = df.drop("age")
df.show()
Filtering and Conditional Operations
Filtering rows:
df.filter(col("id") > 1).show()
Using when for conditional transformations:
df = df.withColumn("category", when(col("id") == 1, "Admin").otherwise("User"))
df.show()
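Several when() clauses can be chained before otherwise(); the first matching condition wins. A minimal sketch (the "level" column name is only for illustration):
# Chained conditions evaluated in order
df.withColumn(
    "level",
    when(col("id") == 1, "Admin")
    .when(col("id") == 2, "Moderator")
    .otherwise("User")
).show()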
5. Using AND, OR, and LIKE Conditions
Using AND & OR Conditions
Filtering with AND condition:
df.filter((col("id") > 1) & (col("full_name") == "BOB")).show()
Filtering with OR condition:
df.filter((col("id") > 1) | (col("full_name") == "Alice")).show()
Using LIKE for Pattern Matching
Filtering names that start with 'A':
df.filter(col("full_name").like("A%")).show()
Filtering names containing 'LI' (the names were uppercased earlier):
df.filter(col("full_name").like("%LI%")).show()
6. Commonly Used PySpark Functions
1️⃣ lit() – Literal Value Function
Purpose: Adds a constant value to a column.
from pyspark.sql.functions import lit
df = df.withColumn("Country", lit("India"))
df.show()
2️⃣ col() – Column Reference Function
Purpose: Refers to a column in a DataFrame by name.
from pyspark.sql.functions import col
df.select(col("name"), col("id")).show()
3️⃣ when() – Conditional Logic Function
Purpose: Implements IF-ELSE logic for conditional transformations.
from pyspark.sql.functions import when, col
df = df.withColumn("Performance", when(col("id") > 1, "Average").otherwise("Good"))
df.show()
7. Joining DataFrames in PySpark
Creating two sample DataFrames:
df1 = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])
df2 = spark.createDataFrame([(1, "HR"), (2, "Finance")], ["id", "department"])
Performing an INNER JOIN:
df_joined = df1.join(df2, on="id", how="inner")
df_joined.show()
Performing a LEFT JOIN:
df_joined = df1.join(df2, on="id", how="left")
df_joined.show()
Performing a RIGHT JOIN:
df_joined = df1.join(df2, on="id", how="right")
df_joined.show()
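Performing a FULL OUTER JOIN (a sketch; unmatched rows from either side are kept and filled with nulls):
df_joined = df1.join(df2, on="id", how="outer")
df_joined.show()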
8. User-Defined Functions (UDFs)
Custom transformations using UDFs:
def custom_greeting(name):
    return f"Hello, {name}!"
greet_udf = udf(custom_greeting, StringType())
df = df.withColumn("greeting", greet_udf(col("full_name")))
df.show()
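udf() can also be used as a decorator, attaching the return type directly to the function; a minimal sketch (greet is a hypothetical helper equivalent to the UDF above):
@udf(returnType=StringType())
def greet(name):
    return f"Hello, {name}!"

df.withColumn("greeting", greet(col("full_name"))).show()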
9. Handling Null Values
Filling null values:
df.fillna({"full_name": "Unknown"}).show()
Dropping rows with null values:
df.na.drop().show()
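Rows can also be filtered on null checks directly with isNull() and isNotNull(); a quick sketch:
# Keep rows where full_name is missing
df.filter(col("full_name").isNull()).show()
# Keep rows where full_name is present
df.filter(col("full_name").isNotNull()).show()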
Conclusion
PySpark column operations are essential for efficient data processing. Whether selecting, renaming, filtering, applying UDFs, handling null values, or performing joins, these techniques enhance data analysis workflows.
Please use this GitHub repository for a better understanding of these concepts:
https://github.com/DivyaRajurkar/pyspark.git
Tags: #PySpark #BigData #DataEngineering #SparkSQL #DataProcessing