🌐 Azure: End-to-End Data Pipeline in Detailed Baby Steps
I'll break down the entire process of creating this end-to-end data pipeline into detailed baby steps so you can follow along without any confusion. The pipeline will use:
- Apache Nifi for ingesting data from a REST API
- Apache Kafka for data streaming
- Apache Spark Structured Streaming for processing data
- Snowflake for storing processed data
🔧 Step 1: Set Up Apache NiFi
Purpose: To fetch data from the online server and send it to Kafka.
✅ 1.1 Download and Install NiFi
- Go to the Apache NiFi download page (nifi.apache.org).
- Download nifi-1.28.1-bin.zip.
- Extract the ZIP file to: C:\bigdata
- Navigate to: C:\bigdata\nifi-1.28.1\bin
- Run NiFi: double-click run-nifi.bat (Windows) or use the command line: nifi.bat start
- Wait for NiFi to initialize.
✅ 1.2 Access and Login to NiFi UI
- Open your browser and go to: https://localhost:8443/nifi/
- Locate the generated login credentials in nifi-app.log: C:\bigdata\nifi-1.28.1\logs\nifi-app.log
- Log in using the credentials found in the log (a quick way to extract them is sketched below).
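If scrolling through the log feels tedious, you can pull the generated credentials out with a few lines of Python. This is a minimal sketch that assumes the default single-user setup, where NiFi writes lines containing "Generated Username [...]" and "Generated Password [...]" to nifi-app.log; adjust the path and the search strings if your version logs them differently.

# find_nifi_credentials.py - minimal sketch, assumes NiFi's default single-user login
# and the standard "Generated Username [...]" / "Generated Password [...]" log lines.
LOG_PATH = r"C:\bigdata\nifi-1.28.1\logs\nifi-app.log"

with open(LOG_PATH, encoding="utf-8", errors="ignore") as log:
    for line in log:
        if "Generated Username" in line or "Generated Password" in line:
            print(line.strip())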
🔗 Step 2: Configure Apache NiFi Processors
Purpose: To fetch data from the randomuser.me API and send it to Kafka.
✅ 2.1 Configure the InvokeHTTP Processor
- Drag the InvokeHTTP processor onto the NiFi canvas.
- Right-click → Configure → Properties tab.
- Set the HTTP URL: https://randomuser.me/api/
- Leave other settings at their defaults unless specified (a quick check of the API response shape is sketched below).
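Before wiring anything up, it can help to confirm what the API actually returns. Here is a minimal sketch (assuming the requests package is installed with pip install requests) that fetches one response and prints its top-level keys, which are the same "results" and "info" fields the Spark schema in Step 5 expects.

# peek_randomuser.py - minimal sketch to confirm the shape of the API response
# (assumes the requests package: pip install requests)
import requests

resp = requests.get("https://randomuser.me/api/", timeout=10)
resp.raise_for_status()
payload = resp.json()

# The payload has two top-level keys, "results" and "info",
# which is the structure the Spark schema in Step 5 expects.
print(list(payload.keys()))
print(payload["info"])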
✅ 2.2 Configure the PublishKafkaRecord_2_6 Processor
- Drag PublishKafkaRecord_2_6 onto the NiFi canvas.
- Right-click → Configure → Properties:
  - Record Reader: JsonTreeReader
  - Record Writer: JsonRecordSetWriter
  - Kafka Topic Name: feb11 (or your chosen topic)
  - Kafka Bootstrap Servers: localhost:9092
- Connect InvokeHTTP → PublishKafkaRecord_2_6 (drag arrow from one processor to the other).
✅ 2.3 Start the Flow
- Right-click on the NiFi canvas → Start.
⚡ Step 3: Set Up Kafka and Zookeeper
Purpose: To stream data continuously.
✅ 3.1 Start Zookeeper
- In the command prompt:
%KAFKA_HOME%\bin\windows\zookeeper-server-start.bat %KAFKA_HOME%\config\zookeeper.properties
✅ 3.2 Start Kafka Server
- Open a new command prompt:
%KAFKA_HOME%\bin\windows\kafka-server-start.bat %KAFKA_HOME%\config\server.properties
✅ 3.3 Create Kafka Topic
- Run this command (replace feb11 with your own topic name if you chose a different one):
%KAFKA_HOME%\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic feb11
- Note: the --zookeeper flag was removed in Kafka 3.0; on newer releases use --bootstrap-server localhost:9092 instead. A scripted alternative is sketched below.
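If you prefer to create the topic from Python rather than the batch script, here is a minimal sketch using the kafka-python package (pip install kafka-python). The topic name and broker address below are the same ones used throughout this guide; adjust them if yours differ.

# create_topic.py - minimal sketch using kafka-python (pip install kafka-python)
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
try:
    # 1 partition / replication factor 1 matches the single-broker local setup
    admin.create_topics([NewTopic(name="feb11", num_partitions=1, replication_factor=1)])
    print("Topic 'feb11' created")
except TopicAlreadyExistsError:
    print("Topic 'feb11' already exists")
finally:
    admin.close()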
✅ 3.4 Start Kafka Producer
- To produce data on the topic:
%KAFKA_HOME%\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic feb11
✅ 3.5 Start Kafka Consumer
- To consume data:
%KAFKA_HOME%\bin\windows\kafka-console-consumer.bat --topic feb11 --from-beginning --bootstrap-server localhost:9092
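As an alternative to the console consumer, the same check can be scripted. This is a minimal sketch with kafka-python (pip install kafka-python) that reads a handful of records from the feb11 topic to confirm NiFi is actually publishing.

# check_topic.py - minimal sketch to confirm NiFi is publishing to the topic
# (assumes the kafka-python package: pip install kafka-python)
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "feb11",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",   # read from the start of the topic
    consumer_timeout_ms=10000,      # stop if nothing arrives for 10 seconds
    value_deserializer=lambda v: v.decode("utf-8"),
)

for i, message in enumerate(consumer):
    print(message.value[:200])      # print a preview of each record
    if i >= 4:                      # five records are enough to confirm the flow
        break

consumer.close()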
💡 Step 4: Set Up Snowflake
Purpose: To store processed data.
✅ 4.1 Create Snowflake Account
- Go to Snowflake Sign-Up and create a free account.
✅ 4.2 Set Up Snowflake Objects
- After login, run these SQL commands (a scripted alternative is sketched below):
CREATE DATABASE arundb;
USE DATABASE arundb;
CREATE SCHEMA IF NOT EXISTS public;
CREATE WAREHOUSE compute_wh WITH WAREHOUSE_SIZE = 'XSMALL' WAREHOUSE_TYPE = 'STANDARD';
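If you'd rather script the setup than paste SQL into the web UI, here is a minimal sketch using the Snowflake Python connector (pip install snowflake-connector-python). The account, user, and password values are placeholders; use your own trial account details.

# setup_snowflake.py - minimal sketch using the Snowflake Python connector
# (pip install snowflake-connector-python); the account/user/password values
# below are placeholders - use your own trial account details.
import snowflake.connector

conn = snowflake.connector.connect(
    account="<your_account_identifier>",
    user="<your_user>",
    password="<your_password>",
)

statements = [
    "CREATE DATABASE IF NOT EXISTS arundb",
    "CREATE SCHEMA IF NOT EXISTS arundb.public",
    "CREATE WAREHOUSE IF NOT EXISTS compute_wh WITH WAREHOUSE_SIZE = 'XSMALL' WAREHOUSE_TYPE = 'STANDARD'",
]

cur = conn.cursor()
for stmt in statements:
    cur.execute(stmt)
    print("OK:", stmt)

cur.close()
conn.close()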
🔥 Step 5: Kafka Consumer & Spark Streaming Code
Purpose: To consume Kafka data, process it, and push it to Snowflake.
✅ 5.1 Set Up PyCharm / VSCode
- Install PyCharm or VSCode.
- Create a new Python project.
- Install the required package:
pip install pyspark
- Note: the Kafka source and the Snowflake Spark connector are JVM libraries, not pip packages; they are pulled in through spark.jars.packages in the code below (snowflake-sqlalchemy is not needed for this script).
✅ 5.2 Add the Python Code
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, current_timestamp
from pyspark.sql.types import StructType, StructField, StringType

# The Kafka source and the Snowflake Spark connector are JVM packages, not pip
# packages. The versions below are examples - match them to your Spark/Scala build.
packages = ",".join([
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1",
    "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.4",
    "net.snowflake:snowflake-jdbc:3.14.4",
])

spark = (SparkSession.builder
         .appName("KafkaSnowflakePipeline")
         .master("local[*]")
         .config("spark.jars.packages", packages)
         .getOrCreate())

# Read the raw JSON records that NiFi publishes to the feb11 topic.
df = (spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "feb11")
      .load())
df = df.selectExpr("CAST(value AS STRING) as json_file")

# randomuser.me responses have two top-level keys: "results" and "info".
schema = StructType([
    StructField("results", StringType(), True),
    StructField("info", StringType(), True)
])
parsed_df = df.withColumn("parsed_json", from_json(col("json_file"), schema)).select("parsed_json.*")

# Replace the placeholders with your own account details; the database, schema,
# and warehouse names match the objects created in Step 4.
sfOptions = {
    "sfURL": "<your_account>.snowflakecomputing.com",
    "sfUser": "<your_user>",
    "sfPassword": "<your_password>",
    "sfDatabase": "arundb",
    "sfSchema": "public",
    "sfWarehouse": "compute_wh"
}
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

def foreach_batch_function(df, epoch_id):
    # Stamp each micro-batch with a timestamp and append it to the LIVEKAFKA table.
    df = df.withColumn("ts", current_timestamp())
    df.write.mode("append").format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("dbtable", "livekafka").save()

parsed_df.writeStream.foreachBatch(foreach_batch_function).start().awaitTermination()
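Save the script as something like kafka_to_snowflake.py (the filename is just a suggestion) and run it with python kafka_to_snowflake.py from your project's environment. The first run usually takes a bit longer while Spark downloads the connector jars listed in spark.jars.packages.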
🚀 Step 6: Run the Pipeline
- Start all services (Zookeeper, Kafka, NiFi, and the Python streaming script).
- The data from randomuser.me should now stream into Snowflake's LIVEKAFKA table.
- Run in Snowflake:
SELECT * FROM livekafka;
🎯 Step 7: Validate & Monitor
- Confirm that records are being inserted into LIVEKAFKA.
- Check the Spark logs and streaming query status (a small monitoring sketch follows this list).
- Ensure the Kafka topic has live data (the consumer sketch after Step 3.5 works for this too).
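For the Spark side, here is a minimal monitoring sketch: it assumes you replace the final line of the Step 5 script (the .start().awaitTermination() call) with the block below, so the query's status and latest progress are printed while it runs.

# Monitoring sketch: replace the last line of the Step 5 script with this block.
import time

query = parsed_df.writeStream.foreachBatch(foreach_batch_function).start()

while query.isActive:
    # status shows whether the query is waiting for data or processing a batch;
    # lastProgress reports rows read and batch duration for the latest micro-batch.
    print(query.status)
    print(query.lastProgress)
    time.sleep(30)

query.awaitTermination()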
💬 Let me know if you face any issues at any step! I can help debug and explain further.