This post walks through an important concept: how an end-to-end pipeline works in AWS.
We are going to see how Lambda (using Boto3 code) can react to changes in a common source such as S3/Redshift and start a Glue job (Spark code) that performs some action. 💡
Let's assume that:
1️⃣ AWS Lambda should start a job in another AWS service, Glue, as soon as a file is uploaded to an AWS S3 bucket.
2️⃣ Lambda should also pass the file information to Glue so that the Glue job can transform the data and load the result into AWS RDS (MySQL). 🛠️
Understanding the Flow Chart
1️⃣ A client uploads files (.csv/.json) into an AWS storage location (e.g., S3).
2️⃣ Once the upload completes, S3 sends an event that invokes an AWS Lambda function, which runs Boto3 code.
3️⃣ AWS Glue (ETL Tool) starts a PySpark job to process this file, perform transformations, and load it into AWS RDS (MySQL).
4️⃣ In short: an upload to S3 invokes Lambda, Lambda starts the Glue PySpark job, and Glue processes the data and stores it in S3/RDS.
5️⃣ Finally, the sales team can use Power BI to visualize the data (not covered here, just for reference).
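To make step 2 concrete, here is a minimal sketch of the event that Lambda receives from S3 and how the file information can be pulled out of it. The event is trimmed to the fields used in this post, and the bucket name and key are made-up placeholders, not values from a real account.

```python
# Trimmed-down shape of an S3 "object created" event as Lambda receives it.
# The bucket and key values below are hypothetical placeholders.
sample_event = {
    "Records": [
        {
            "eventSource": "aws:s3",
            "eventName": "ObjectCreated:Put",
            "s3": {
                "bucket": {"name": "my-example-bucket"},
                "object": {"key": "landingzone/sales.csv", "size": 1024},
            },
        }
    ]
}


def extract_uploads(event):
    """Return a (bucket, key) pair for every record in an S3 event."""
    return [
        (record["s3"]["bucket"]["name"], record["s3"]["object"]["key"])
        for record in event.get("Records", [])
    ]


print(extract_uploads(sample_event))
```

One event can carry several records, which is why the handler later in this post loops over `event['Records']`.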
AWS Lambda ⚡
AWS Lambda is a serverless compute service that runs code in response to events. It automatically manages the compute resources, so you don't need to provision or manage servers.
Steps to Create a Lambda Function
1️⃣ Navigate to the Lambda page in AWS.
2️⃣ Click Create function (top right corner).
3️⃣ Enter a function name.
4️⃣ Choose Python as the runtime (3.9 is used here; any currently supported Python 3 runtime should also work).
5️⃣ Under Execution role, select Use an existing role (ensure it has permissions for S3/Glue).
6️⃣ Click Create function.
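For the execution role in step 5, the sketch below shows roughly what a minimal policy could look like for this flow. It is an assumption-based example, not the exact policy used here: the region, account ID, and job name are placeholders, and `build_execution_policy` is a hypothetical helper. The Lambda in this post only starts the Glue job, so it needs `glue:StartJobRun` plus CloudWatch Logs access; S3 read permissions would only be needed if Lambda itself opened the file.

```python
import json


def build_execution_policy(region, account_id, job_name):
    """Build an IAM policy document letting Lambda start one Glue job and write logs."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Allow starting only the named Glue job
                "Effect": "Allow",
                "Action": ["glue:StartJobRun"],
                "Resource": f"arn:aws:glue:{region}:{account_id}:job/{job_name}",
            },
            {
                # Allow the function to write its own CloudWatch logs
                "Effect": "Allow",
                "Action": [
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents",
                ],
                "Resource": "*",
            },
        ],
    }


# Placeholder region/account values for illustration only
policy = build_execution_policy("ap-south-1", "123456789012", "firstjob")
print(json.dumps(policy, indent=2))
```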
AWS Lambda Configuration ⚙️
🔹 Default timeout: Lambda times out after 3 seconds by default (this can be increased, up to 15 minutes, under the Configuration tab).
🔹 Boto3 storage: max 75 MB. Lambda is often used for lightweight tasks such as alerting in production; heavier workloads can be handled by using Step Functions with Lambda or by using Glue directly.
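The timeout can also be changed from code instead of the console. Below is a minimal sketch using Boto3's `update_function_configuration`; the helper name `raise_timeout` and the function name in the usage comment are hypothetical, and the client is passed in so the logic can be tried without AWS credentials.

```python
def raise_timeout(lambda_client, function_name, seconds):
    """Set a Lambda function's timeout, clamped to the allowed 1 to 900 seconds."""
    seconds = max(1, min(int(seconds), 900))
    return lambda_client.update_function_configuration(
        FunctionName=function_name,
        Timeout=seconds,
    )


# Usage (requires AWS credentials and an existing function):
# import boto3
# raise_timeout(boto3.client("lambda"), "my-trigger-function", 60)
```

Since this Lambda only starts the Glue job and returns, a short timeout of well under a minute is usually enough.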
Adding a Trigger 🔹
1️⃣ Click +Add Trigger.
2️⃣ Under Trigger Configuration, select S3.
3️⃣ Choose an S3 bucket.
4️⃣ Under Event Types, select the file upload (object creation) event, e.g., All object create events.
5️⃣ Set the Prefix to the folder landingzone.
6️⃣ Set the Suffix to .csv (triggers only for CSV files).
7️⃣ Tick the I acknowledge checkbox (the recursive invocation warning) and click Add.
✅ Trigger successfully added!
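The console steps above boil down to an S3 notification configuration with a prefix and suffix filter. The sketch below builds that configuration as a plain dictionary and mimics the filter check; `build_notification` and `key_matches` are hypothetical helpers, and the trailing slash in `landingzone/` is my assumption about how the folder prefix is written.

```python
def build_notification(lambda_arn, prefix="landingzone/", suffix=".csv"):
    """Build an S3 notification configuration that invokes a Lambda on uploads."""
    return {
        "LambdaFunctionConfigurations": [
            {
                "LambdaFunctionArn": lambda_arn,
                "Events": ["s3:ObjectCreated:*"],
                "Filter": {
                    "Key": {
                        "FilterRules": [
                            {"Name": "prefix", "Value": prefix},
                            {"Name": "suffix", "Value": suffix},
                        ]
                    }
                },
            }
        ]
    }


def key_matches(key, prefix="landingzone/", suffix=".csv"):
    """Mimic the prefix/suffix filter: only matching keys invoke the function."""
    return key.startswith(prefix) and key.endswith(suffix)


print(key_matches("landingzone/sales.csv"))   # True
print(key_matches("landingzone/sales.json"))  # False
print(key_matches("archive/sales.csv"))       # False

# Applying it from code (requires AWS credentials, and S3 must already be
# allowed to invoke the function, which the console sets up for you):
# import boto3
# boto3.client("s3").put_bucket_notification_configuration(
#     Bucket="my-example-bucket",
#     NotificationConfiguration=build_notification(lambda_arn),
# )
```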
Lambda Code to Trigger Glue
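import boto3
from urllib.parse import unquote_plus

# Create the Glue client once so warm invocations can reuse it
glue_client = boto3.client('glue')

def lambda_handler(event, context):
    for record in event['Records']:
        bucket_name = record['s3']['bucket']['name']
        # Object keys arrive URL-encoded in S3 events (e.g., spaces become '+')
        object_key = unquote_plus(record['s3']['object']['key'])
        print(f"New file added: s3://{bucket_name}/{object_key}")
        # Start AWS Glue job with arguments
        response = glue_client.start_job_run(
            JobName='firstjob',  # Replace with your Glue job name
            Arguments={
                '--s3p': bucket_name,
                '--fn': object_key
            }
        )
        print(f"Started Glue job run: {response['JobRunId']}")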
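One detail worth checking before the arguments reach Glue: S3 URL-encodes object keys in its event notifications, so a file name containing spaces or brackets will not match the real key unless it is decoded. The sketch below shows the argument-building step in isolation; `build_glue_arguments` is a hypothetical helper and the sample record is made up.

```python
from urllib.parse import unquote_plus


def build_glue_arguments(record):
    """Turn one S3 event record into the Arguments dict passed to the Glue job."""
    bucket_name = record["s3"]["bucket"]["name"]
    # Decode the key: '+' becomes a space and %XX escapes become characters
    object_key = unquote_plus(record["s3"]["object"]["key"])
    return {"--s3p": bucket_name, "--fn": object_key}


# Hypothetical record for a file named "sales report (final).csv"
record = {
    "s3": {
        "bucket": {"name": "my-example-bucket"},
        "object": {"key": "landingzone/sales+report+%28final%29.csv"},
    }
}
print(build_glue_arguments(record))
```

Keeping this step in a small function also makes it easy to test locally without calling Glue.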
AWS Glue: ETL Service
AWS Glue is a serverless data integration service that helps discover, prepare, and integrate data from multiple sources. 🛠️
Steps to Create a Glue Job
1️⃣ Go to AWS Glue in AWS.
2️⃣ Under Data Integration and ETL, click ETL jobs.
3️⃣ Click Script Editor (right side).
4️⃣ Select Spark, then Create Script.
5️⃣ Name your Glue Job.
6️⃣ Save and see it listed on the home screen.
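The same job can be created from code instead of the console. This is a minimal sketch using Boto3's `create_job`; the helper name, Glue version, worker settings, role ARN, and script location are all assumptions for illustration, so adjust them to your account.

```python
def create_spark_job(glue_client, name, role_arn, script_location):
    """Create a Glue Spark ETL job that runs a PySpark script stored in S3."""
    return glue_client.create_job(
        Name=name,
        Role=role_arn,
        Command={
            "Name": "glueetl",  # "glueetl" selects a Spark ETL job
            "ScriptLocation": script_location,
            "PythonVersion": "3",
        },
        GlueVersion="4.0",   # assumed version; pick one available to you
        WorkerType="G.1X",   # assumed small worker size for a demo
        NumberOfWorkers=2,
    )


# Usage (requires AWS credentials; all values are placeholders):
# import boto3
# create_spark_job(
#     boto3.client("glue"),
#     "firstjob",
#     "arn:aws:iam::123456789012:role/my-glue-role",
#     "s3://my-example-bucket/scripts/firstjob.py",
# )
```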
Glue Script to Process CSV Files
import sys
import os
import re
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from pyspark.sql.functions import current_date

# Receive arguments from Lambda
args = getResolvedOptions(sys.argv, ['s3p', 'fn'])
bucket = args['s3p']
ob_name = args['fn']
input_path = f"s3://{bucket}/{ob_name}"  # named input_path to avoid shadowing the built-in input()
print(input_path)

# Extract table name from filename
tab = os.path.splitext(os.path.basename(input_path))[0]
tab = re.sub(r'[^a-zA-Z0-9]', '', tab)
# Ensure table name is not empty and doesn't start with a digit
if not tab or tab[0].isdigit():
    tab = f"a_{tab}"
print("Table name:", tab)

# Initialize Spark & Glue Context
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Read CSV file
df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(input_path)
# Format column names (strip everything except letters and digits)
df = df.toDF(*[re.sub('[^A-Za-z0-9]', '', c) for c in df.columns])
# Add transformation column
df = df.withColumn("today", current_date())
df.show()

# Define MySQL (AWS RDS) connection details
# Note: credentials are hard-coded only for this demo; avoid this in production
host = "jdbc:mysql://diwakarmysql.cf08m8g4ysxd.ap-south-1.rds.amazonaws.com:3306/mysqldb"
df.write.mode("append").format("jdbc").option("url", host).option("user", "admin").option("password", "Mypassword.1").option("dbtable", tab).save()
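The table-name and column-name cleanup in the script is plain Python, so it can be tried locally without Spark or Glue. The sketch below pulls that logic into two small functions; the function names and sample file names are hypothetical, and the empty-name guard is a small addition of mine for files whose names contain no letters or digits.

```python
import os
import re


def table_name_from_key(object_key):
    """Derive a MySQL-friendly table name from an S3 object key."""
    base = os.path.splitext(os.path.basename(object_key))[0]
    name = re.sub(r"[^a-zA-Z0-9]", "", base)
    # Prefix names that are empty or start with a digit
    if not name or name[0].isdigit():
        name = f"a_{name}"
    return name


def clean_columns(columns):
    """Strip everything except letters and digits from column names."""
    return [re.sub(r"[^A-Za-z0-9]", "", c) for c in columns]


print(table_name_from_key("landingzone/sales-data_2024.csv"))  # salesdata2024
print(table_name_from_key("landingzone/2024 sales.csv"))       # a_2024sales
print(clean_columns(["Order ID", "unit_price($)"]))            # ['OrderID', 'unitprice']
```

Note that two different files can map to the same table name (for example `sales-data.csv` and `sales_data.csv`), in which case the `append` write mode adds both to one table.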
Final Execution Flow ✅
1️⃣ Upload a .csv file to AWS S3 under the landingzone folder.
2️⃣ S3 invokes Lambda for the upload, and Lambda runs the Boto3 code.
3️⃣ Lambda passes file info to Glue and starts the ETL process.
4️⃣ Glue processes the file and loads the transformed data into AWS RDS (MySQL).
5️⃣ Job succeeds! 🎯 You can verify the table in SQL Workbench.
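Besides checking the table, the job run itself can be checked from code. Here is a minimal polling sketch built on Boto3's `get_job_run`; the helper name `wait_for_job` and the polling interval are my own choices, and the run ID is the `JobRunId` returned by `start_job_run`.

```python
import time


def wait_for_job(glue_client, job_name, run_id, poll_seconds=30, max_polls=60):
    """Poll a Glue job run until it reaches a final state, then return that state."""
    final_states = {"SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT", "ERROR", "EXPIRED"}
    state = None
    for _ in range(max_polls):
        run = glue_client.get_job_run(JobName=job_name, RunId=run_id)
        state = run["JobRun"]["JobRunState"]
        if state in final_states:
            break
        time.sleep(poll_seconds)
    return state


# Usage (requires AWS credentials; run from a workstation, not inside the
# trigger Lambda, because a Glue run can outlast the Lambda timeout):
# import boto3
# print(wait_for_job(boto3.client("glue"), "firstjob", "jr_..."))
```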
Let's explore more AWS topics in future blogs! Have a great day!