🔥Smart Recharge Tracking: Calculating Expiry Dates with PySpark

When working with telecom or subscription-based datasets, a common requirement is to calculate the expiry date of a recharge or subscription. In this blog post, we will walk through a simple PySpark solution to add an Expiry_Date column by computing it from the Recharge_Date and Remaining_Days columns.

Problem Statement

We have a dataset that includes the following fields:

  • Recharge_Id: A unique identifier for each recharge
  • Recharge_Date: The date on which the recharge was made
  • Remaining_Days: The number of validity days left
  • Validity: The recharge status (e.g., online)

Our goal is to compute the Expiry_Date, which is calculated as:

Expiry_Date = Recharge_Date + Remaining_Days
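
For example, a recharge made on 2020-06-12 with 67 days of validity remaining expires on 2020-08-18.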

Sample Input Data

Recharge_Id  Recharge_Date  Remaining_Days  Validity
123          2020-05-11     2               online
124          2020-06-12     67              online
125          2020-07-13     89              online
126          2020-08-14     78              online
127          2020-09-16     20              online

Expected Output

Recharge_Id  Recharge_Date  Remaining_Days  Validity  Expiry_Date
123          2020-05-11     2               online    2020-05-13
124          2020-06-12     67              online    2020-08-18
125          2020-07-13     89              online    2020-10-10
126          2020-08-14     78              online    2020-10-31
127          2020-09-16     20              online    2020-10-06

Solution Using PySpark

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, date_add

# Initialize Spark session
spark = SparkSession.builder.appName("ExpiryDateCalculation").getOrCreate()

# Sample data
data = [
    (123, "2020-05-11", 2, "online"),
    (124, "2020-06-12", 67, "online"),
    (125, "2020-07-13", 89, "online"),
    (126, "2020-08-14", 78, "online"),
    (127, "2020-09-16", 20, "online"),
]

# Create the DataFrame (schema is inferred from the data)
columns = ["Recharge_Id", "Recharge_Date", "Remaining_Days", "Validity"]
df = spark.createDataFrame(data, columns)

# Parse Recharge_Date as a date, ensure Remaining_Days is an integer,
# then compute Expiry_Date = Recharge_Date + Remaining_Days
df = (
    df.withColumn("Recharge_Date", to_date(col("Recharge_Date"), "yyyy-MM-dd"))
      .withColumn("Remaining_Days", col("Remaining_Days").cast("int"))
      .withColumn("Expiry_Date", date_add(col("Recharge_Date"), col("Remaining_Days")))
)

# Show the result (truncate=False gives the left-aligned layout shown below)
df.show(truncate=False)
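
One caveat worth noting: on older Spark releases (prior to roughly 3.3), the PySpark date_add() helper only accepted a plain int for its days argument, so passing col("Remaining_Days") raised a TypeError. A minimal workaround sketch, calling the SQL function directly via expr():

from pyspark.sql.functions import expr

# Spark SQL's date_add accepts a column for the day count
df = df.withColumn("Expiry_Date", expr("date_add(Recharge_Date, Remaining_Days)"))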

Explanation

to_date() Function

The to_date() function in PySpark converts a string representation of a date into a proper DateType. This ensures that all date operations work correctly. In our case, we use:

df = df.withColumn("Recharge_Date", to_date(col("Recharge_Date"), "yyyy-MM-dd"))

This converts Recharge_Date from a string to a date format.
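
The format pattern must match the incoming strings. As a small sketch, assuming a hypothetical feed where dates arrive as day/month/year strings like "14/08/2020":

# Hypothetical input format: "dd/MM/yyyy" instead of "yyyy-MM-dd"
raw_df = spark.createDataFrame([(128, "14/08/2020")], ["Recharge_Id", "Recharge_Date"])
raw_df = raw_df.withColumn("Recharge_Date", to_date(col("Recharge_Date"), "dd/MM/yyyy"))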

date_add() Function

The date_add() function adds a specified number of days to a given date column. It is useful for calculating future or past dates. In our case:

df = df.withColumn("Expiry_Date", date_add(col("Recharge_Date"), col("Remaining_Days")))

This adds Remaining_Days to Recharge_Date, computing the Expiry_Date.
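
A quick way to sanity-check the result is datediff(), which measures the gap in days between two dates; the difference between Expiry_Date and Recharge_Date should reproduce Remaining_Days exactly:

from pyspark.sql.functions import datediff

# datediff(end, start) should equal Remaining_Days for every row
df.withColumn("Diff", datediff(col("Expiry_Date"), col("Recharge_Date"))) \
  .select("Recharge_Id", "Remaining_Days", "Diff") \
  .show()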

Expected Output in PySpark

+-----------+-------------+--------------+--------+-----------+
|Recharge_Id|Recharge_Date|Remaining_Days|Validity|Expiry_Date|
+-----------+-------------+--------------+--------+-----------+
|123        |2020-05-11   |2             |online  |2020-05-13 |
|124        |2020-06-12   |67            |online  |2020-08-18 |
|125        |2020-07-13   |89            |online  |2020-10-10 |
|126        |2020-08-14   |78            |online  |2020-10-31 |
|127        |2020-09-16   |20            |online  |2020-10-06 |
+-----------+-------------+--------------+--------+-----------+

Conclusion

This approach provides a straightforward way to calculate expiry dates in PySpark. The to_date() function ensures date format consistency, while the date_add() function efficiently performs date calculations. This method can be further extended for real-world use cases, such as automating recharge expiry notifications or analyzing subscription trends.
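
As a taste of the notification use case, here is a minimal sketch that filters for recharges expiring soon (the 7-day window is an arbitrary choice for illustration):

from pyspark.sql.functions import current_date, datediff

# Recharges that are still active but expire within the next 7 days
expiring_soon = df.filter(
    (col("Expiry_Date") >= current_date())
    & (datediff(col("Expiry_Date"), current_date()) <= 7)
)
expiring_soon.show()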

Feel free to try this solution and modify it based on your specific dataset requirements! 🚀
