🚀 End-to-End Data Flow Pipeline using Apache NiFi, Kafka-Spark Structured Streaming, and Snowflake


💬 Personal Note:

🌟 I was unwell for a while, which caused a pause in my blogging journey. However, I’m feeling much better now and back on track. From now on, I will be posting blogs consistently. Thank you all for your support! 🙏✨


🔄 Flow of Data in this Pipeline:

Server (https://randomuser.me/api/) 
    ↓ (REST API)
Apache NiFi (InvokeHTTP Processor) 
    ↓
Kafka (Kafka Brokers - PublishKafkaRecord_2_6 Processor)
    ↓
Consumer (Spark Structured Streaming)
    ↓
Snowflake (Data Storage)

๐ŸŒ Project Overview:

This project demonstrates a real-time data streaming pipeline that integrates data collection, processing, and storage using industry-standard tools:

  • ๐ŸŒ Data Collection: Fetched from randomuser.me using Apache NiFi’s InvokeHTTP processor.
  • ๐Ÿญ Streaming Data: Pushed into Kafka using PublishKafkaRecord_2_6.
  • Data Processing: Apache Spark Structured Streaming consumes and transforms the Kafka data stream.
  • ๐Ÿฆ Data Storage: The processed data is stored in Snowflake, a cloud-based data warehouse.

📚 Prerequisites:

  • 🔗 Apache NiFi: Automates and manages data flow between systems.
  • Apache Kafka: Distributed event streaming platform that buffers the data between NiFi and Spark.
  • Apache Spark Structured Streaming: Facilitates real-time data stream processing.
  • ☁️ Snowflake: Cloud platform for storing, processing, and analyzing data.

๐Ÿƒ Step-by-Step Guide: Apache NiFi Setup

  1. Download & Install NiFi:

    • 🔗 Download the binary from the official Apache NiFi downloads page.
    • 📂 Extract the archive and start NiFi from the bin directory (nifi.sh start on Linux/macOS, run-nifi.bat on Windows).
  2. Configure NiFi:

    • 🌐 Access the NiFi UI in your browser (by default https://localhost:8443/nifi).
    • 🔒 Retrieve the generated login credentials from nifi-app.log in the logs folder.
  3. Processor Configuration:

    • 🔗 InvokeHTTP: Set the source URL (https://randomuser.me/api/) in the HTTP URL property.
    • 📡 PublishKafkaRecord_2_6: Set Kafka Brokers (e.g., localhost:9092) and the Topic Name, plus:
      • 🧩 Record Reader: JsonTreeReader
      • 📝 Record Writer: JsonRecordSetWriter
  4. Start Workflow: ✅ Connect the processors and start the flow from the NiFi canvas. (A rough Python equivalent of what this flow does is sketched below.)
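
NiFi handles all of the above without any code, but as a rough illustration of what the InvokeHTTP → PublishKafkaRecord_2_6 pair is doing, here is a minimal Python sketch (an illustration only, not part of the NiFi flow; it assumes the requests and kafka-python packages and a broker at localhost:9092):

# Illustrative only: roughly what the InvokeHTTP -> PublishKafkaRecord_2_6 flow does.
# Assumes: pip install requests kafka-python, and a Kafka broker on localhost:9092.
import json
import requests
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# Fetch one response from the API (what InvokeHTTP does on each trigger)...
record = requests.get("https://randomuser.me/api/", timeout=10).json()

# ...and publish it to the topic (what PublishKafkaRecord_2_6 does).
producer.send("<topic_name>", value=record)
producer.flush()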


🖥️ Kafka Setup & Testing:

  1. Start Zookeeper:
%KAFKA_HOME%\bin\windows\zookeeper-server-start.bat %KAFKA_HOME%\config\zookeeper.properties
  2. Start Kafka Server:
%KAFKA_HOME%\bin\windows\kafka-server-start.bat %KAFKA_HOME%\config\server.properties
  3. Create Kafka Topic (for Kafka 3.x, replace --zookeeper localhost:2181 with --bootstrap-server localhost:9092):
%KAFKA_HOME%\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic <topic_name>
  4. Start Producer & Consumer (a Python-based check is sketched after these commands):
# Producer:
%KAFKA_HOME%\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic <topic_name>

# Consumer:
%KAFKA_HOME%\bin\windows\kafka-console-consumer.bat --topic <topic_name> --from-beginning --bootstrap-server localhost:9092
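
If you prefer to check from Python instead of the console consumer, a minimal sketch (assuming the kafka-python package is installed) looks like this:

# Optional Python check that messages published by NiFi are arriving on the topic.
# Assumes: pip install kafka-python, and a broker on localhost:9092.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "<topic_name>",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
    consumer_timeout_ms=10000,   # stop iterating after 10 seconds with no new messages
)

for message in consumer:
    print(message.value.decode("utf-8"))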

💡 Kafka Consumer with Spark Structured Streaming:
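
One note before the code: the Kafka source and the Snowflake connector are not bundled with Spark, so the script is typically launched with the corresponding packages on the classpath. The coordinates below are indicative; match the Scala/Spark versions to your installation.

# Assumed launch command (adjust versions to your Spark/Scala build; keep it on one line on Windows):
#   spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:<spark_version>,net.snowflake:spark-snowflake_2.12:<connector_version>,net.snowflake:snowflake-jdbc:<jdbc_version> your_script.py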

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, current_timestamp
from pyspark.sql.types import StructType, StructField, StringType

# Creating Spark session for streaming data processing
spark = SparkSession.builder.appName("Kafka_Snowflake_Pipeline").master("local[*]").getOrCreate()

# Reading streaming data from Kafka
df = spark.readStream.format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "<topic_name>") \
  .load()

# Converting Kafka data from binary to string
df = df.selectExpr("CAST(value AS STRING) as json_file")

# Defining schema for the top-level JSON keys; "results" and "info" are nested in the API response and are kept here as raw JSON strings
schema = StructType([
    StructField("results", StringType(), True),
    StructField("info", StringType(), True)
])

# Parsing JSON data
df1 = df.withColumn("parsed_json", from_json(col("json_file"), schema)).select("parsed_json.*")

# Snowflake connection options
sfOptions = {
  "sfURL": "<snowflake_url>",
  "sfUser": "<username>",
  "sfPassword": "<password>",
  "sfDatabase": "<database>",
  "sfSchema": "public",
  "sfWarehouse": "compute_wh"
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

# Function to write each micro-batch of streaming data to Snowflake
def foreach_batch_function(batch_df, epoch_id):
    batch_df = batch_df.withColumn("ts", current_timestamp())
    batch_df.write.mode("append").format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("dbtable", "livekafka").save()

# Streaming write to Snowflake
df1.writeStream.foreachBatch(foreach_batch_function).start().awaitTermination()

๐Ÿ” Code Comments Explained:

  • ๐Ÿ’ก Spark Session: Initializes the environment for structured streaming.
  • ๐Ÿ”„ Kafka Stream Reading: Reads live data from Kafka topics.
  • ๐Ÿงน Data Parsing: Converts Kafka’s binary data to a structured format using JSON schema.
  • ๐Ÿฆ Snowflake Integration: Writes processed streaming data directly into Snowflake tables.

๐Ÿฆ Snowflake Verification:

  • ✅ Verify that the database (e.g., DivyaDB) exists in Snowflake.
  • ๐Ÿท️ Check if the table LIVEKAFKA appears after the pipeline runs successfully.

🎯 Conclusion:

✨ You have now built a robust end-to-end data pipeline leveraging Apache NiFi, Kafka-Spark Structured Streaming, and Snowflake. This pipeline enables real-time data streaming from an online API to a cloud-based data warehouse.

💬 Stay tuned for more data engineering projects and cloud topics! 🌥️ 🚀
