End-to-End Data Flow Pipeline using Apache NiFi, Kafka-Spark Structured Streaming, and Snowflake
Personal Note:
I was unwell for a while, which caused a pause in my blogging journey. However, I'm feeling much better now and back on track. From now on, I will be posting blogs consistently. Thank you all for your support! ✨
Flow of Data in this Pipeline:
Server (https://randomuser.me/api/)
↓ (REST API)
Apache NiFi (InvokeHTTP Processor)
↓
Kafka (Kafka Brokers - PublishKafkaRecord_2_6 Processor)
↓
Consumer (Kafka Structured Streaming - Spark)
↓
Snowflake (Data Storage)
Project Overview:
This project demonstrates a real-time data streaming pipeline that integrates data collection, processing, and storage using industry-standard tools:
- Data Collection: Fetched from randomuser.me using Apache NiFi's InvokeHTTP processor (a quick preview of the API response is sketched just after this list).
- Streaming Data: Pushed into Kafka using PublishKafkaRecord_2_6.
- ⚡ Data Processing: Apache Spark Structured Streaming consumes and transforms the Kafka data stream.
- Data Storage: The processed data is stored in Snowflake, a cloud-based data warehouse.
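Before wiring up NiFi, it helps to see what the API actually returns. The sketch below is for exploration only and assumes the requests package (not part of the pipeline itself); it fetches one record and prints the two top-level keys, results and info, which are the same fields parsed later in Spark.

# Exploration only: fetch one record from the randomuser.me API and inspect its shape.
import json
import requests

response = requests.get("https://randomuser.me/api/", timeout=10)
response.raise_for_status()
payload = response.json()

print(list(payload.keys()))                   # ['results', 'info']
print(json.dumps(payload["info"], indent=2))  # request metadata (seed, results, page, version)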
Prerequisites:
- Apache NiFi: Automates and manages data flow between systems.
- ⚡ Apache Spark Structured Streaming: Facilitates real-time data stream processing.
- ☁️ Snowflake: Cloud platform for storing, processing, and analyzing data.
Step-by-Step Guide: Apache NiFi Setup
- Download & Install NiFi:
  - Download the binary from the Apache NiFi download page.
  - Extract the archive and run nifi from the bin directory.
- Configure NiFi:
  - Access the NiFi UI at the NiFi URL in your browser (a small reachability check in Python follows this list).
  - Retrieve the generated login credentials from nifi-app.log in the logs folder.
- Processor Configuration:
  - InvokeHTTP: Add the source URL (https://randomuser.me/api/) under HTTP URL.
  - PublishKafkaRecord_2_6:
    - Record Reader: JsonTreeReader
    - Record Writer: JsonRecordSetWriter
- Start Workflow: ✅ Connect the processors and start the flow from the NiFi canvas.
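Before moving on to Kafka, it can be handy to confirm the NiFi UI is actually listening. The sketch below assumes the requests package and a default local install (recent NiFi versions serve the UI over HTTPS on port 8443 with a self-signed certificate); adjust the URL if your instance differs.

# A tiny reachability check for a local NiFi instance (assumed URL and port).
# verify=False skips validation of NiFi's self-signed certificate.
import requests

resp = requests.get("https://localhost:8443/nifi/", verify=False, timeout=10)
print("NiFi UI responded with HTTP status:", resp.status_code)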
Kafka Setup & Testing:
- Start Zookeeper:
%KAFKA_HOME%\bin\windows\zookeeper-server-start.bat %KAFKA_HOME%\config\zookeeper.properties
- Start Kafka Server:
%KAFKA_HOME%\bin\windows\kafka-server-start.bat %KAFKA_HOME%\config\server.properties
- Create Kafka Topic:
%KAFKA_HOME%\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic <topic_name>
- Start Producer & Consumer (a Python-based topic check follows these commands):
# Producer:
%KAFKA_HOME%\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic <topic_name>
# Consumer:
%KAFKA_HOME%\bin\windows\kafka-console-consumer.bat --topic <topic_name> --from-beginning --bootstrap-server localhost:9092
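If you prefer checking the topic from Python instead of the console consumer, a minimal sketch is shown below. It assumes the kafka-python package (pip install kafka-python), which is not part of the pipeline itself, and that <topic_name> matches the topic NiFi publishes to.

# Sanity check: confirm that records published by NiFi are arriving on the topic.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "<topic_name>",                      # same topic configured in PublishKafkaRecord_2_6
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",        # read from the beginning of the topic
    consumer_timeout_ms=10000,           # stop iterating after 10s with no new messages
)

for message in consumer:
    print(message.value.decode("utf-8")[:200])  # preview of each JSON record

consumer.close()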
Kafka Consumer with Spark Structured Streaming:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, current_timestamp
from pyspark.sql.types import StructType, StructField, StringType
# Creating Spark session for streaming data processing
spark = SparkSession.builder.appName("Kafka_Snowflake_Pipeline").master("local[*]").getOrCreate()
# Reading streaming data from Kafka
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "<topic_name>") \
    .load()
# Converting Kafka data from binary to string
df = df.selectExpr("CAST(value AS STRING) as json_file")
# Defining schema for JSON data
schema = StructType([
    StructField("results", StringType(), True),
    StructField("info", StringType(), True)
])
# Parsing JSON data
df1 = df.withColumn("parsed_json", from_json(col("json_file"), schema)).select("parsed_json.*")
# Snowflake connection options
sfOptions = {
    "sfURL": "<snowflake_url>",
    "sfUser": "<username>",
    "sfPassword": "<password>",
    "sfDatabase": "<database>",
    "sfSchema": "public",
    "sfWarehouse": "compute_wh"
}
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
# Function to write streaming data to Snowflake
def foreach_batch_function(df, epoch_id):
    df = df.withColumn("ts", current_timestamp())
    df.write.mode("append").format(SNOWFLAKE_SOURCE_NAME) \
        .options(**sfOptions).option("dbtable", "livekafka").save()
# Streaming write to Snowflake
df1.writeStream.foreachBatch(foreach_batch_function).start().awaitTermination()
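The streaming job above needs both the Kafka source and the Snowflake Spark connector on its classpath. One way to pull them in, sketched below, is to request them as Maven packages via spark.jars.packages when building the session instead of passing --packages to spark-submit; the coordinates are the standard ones, but the version placeholders are assumptions you should replace to match your Spark and Scala build.

# A sketch only: request the Kafka source and Snowflake connector as Maven packages.
# Replace the <...> version placeholders to match your Spark/Scala build.
packages = ",".join([
    "org.apache.spark:spark-sql-kafka-0-10_2.12:<spark_version>",  # Kafka source for Structured Streaming
    "net.snowflake:spark-snowflake_2.12:<connector_version>",      # Snowflake connector for Spark
    "net.snowflake:snowflake-jdbc:<jdbc_version>",                  # JDBC driver used by the connector
])

spark = SparkSession.builder \
    .appName("Kafka_Snowflake_Pipeline") \
    .master("local[*]") \
    .config("spark.jars.packages", packages) \
    .getOrCreate()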
Code Comments Explained:
- Spark Session: Initializes the environment for structured streaming.
- Kafka Stream Reading: Reads live data from Kafka topics.
- Data Parsing: Converts Kafka's binary data to a structured format using a JSON schema.
- Snowflake Integration: Writes processed streaming data directly into Snowflake tables (an optional checkpointing sketch follows this list).
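The writeStream call in the script starts the query without a checkpoint location, so a restart does not resume from the previously committed Kafka offsets. As an optional hardening step (a sketch, not part of the original flow; the checkpoint path is a hypothetical local directory), the same query can be given a checkpoint directory and a fixed trigger interval:

# Optional: checkpointing lets the query resume from its last committed Kafka offsets
# after a restart; the local path below is only an example.
query = df1.writeStream \
    .foreachBatch(foreach_batch_function) \
    .option("checkpointLocation", "/tmp/kafka_snowflake_checkpoint") \
    .trigger(processingTime="30 seconds") \
    .start()

query.awaitTermination()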
Snowflake Verification:
- ✅ Verify that the database (e.g., DivyaDB) exists in Snowflake.
- Check that the table LIVEKAFKA appears after the pipeline runs successfully (a quick row-count sketch follows this list).
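Beyond the Snowflake UI, the same check can be scripted. The sketch below assumes the snowflake-connector-python package (pip install snowflake-connector-python), which is not part of the pipeline itself, and reuses the placeholder credentials from the Spark job; note the connector expects the account identifier rather than the full URL.

# Verification sketch: count the rows the streaming job has landed in LIVEKAFKA.
import snowflake.connector

conn = snowflake.connector.connect(
    account="<snowflake_account>",   # account identifier, not the full sfURL
    user="<username>",
    password="<password>",
    warehouse="compute_wh",
    database="<database>",
    schema="public",
)

cur = conn.cursor()
cur.execute("SELECT COUNT(*) FROM livekafka")
print("Rows in LIVEKAFKA:", cur.fetchone()[0])

cur.close()
conn.close()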
Conclusion:
✨ You have now built a robust end-to-end data pipeline leveraging Apache NiFi, Kafka-Spark Structured Streaming, and Snowflake. This pipeline enables real-time data streaming from an online API to a cloud-based data warehouse.
Stay tuned for more data engineering projects and cloud topics!