🌐End-to-End Data Pipeline in Detailed Baby Steps: NiFi → Kafka → Spark → Snowflake

 I'll break down the entire process of creating this end-to-end data pipeline into detailed baby steps so you can follow along without any confusion. The pipeline will use:

  • Apache NiFi for ingesting data from a REST API
  • Apache Kafka for data streaming
  • Apache Spark Structured Streaming for processing data
  • Snowflake for storing processed data

🔧 Step 1: Set Up Apache NiFi

Purpose: To fetch data from the online server and send it to Kafka.

1.1 Download and Install NiFi

  1. Go to NiFi Download Page
  2. Download nifi-1.28.1-bin.zip
  3. Extract the ZIP file to:
    C:\bigdata
    
  4. Navigate to:
    C:\bigdata\nifi-1.28.1\bin
    
  5. Run NiFi:
    • Double-click the run-nifi.bat (Windows) or use the command line:
      nifi.bat start
      
  6. Wait for NiFi to initialize.
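
Optional: you can confirm the UI is reachable with a quick Python check (a sketch only; it assumes the requests package is installed and disables certificate verification because NiFi ships with a self-signed certificate by default):

import requests

# Quick reachability check against the local NiFi UI (self-signed cert, so verify=False).
resp = requests.get("https://localhost:8443/nifi/", verify=False)
print(resp.status_code)  # 200 means the UI is up and serving the login page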

1.2 Access and Login to NiFi UI

  1. Open your browser and go to:
    https://localhost:8443/nifi/
    
  2. Locate nifi-app.log for login credentials:
    C:\bigdata\nifi-1.28.1\logs\nifi-app.log
    
  3. Login using the credentials found in the log.
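
If you prefer not to scroll the log by hand, a small Python helper can pull the generated credentials out (a sketch; it assumes the default single-user login, whose log lines contain "Generated Username" and "Generated Password"):

# Print the auto-generated single-user credentials from the NiFi application log.
log_path = r"C:\bigdata\nifi-1.28.1\logs\nifi-app.log"
with open(log_path, encoding="utf-8", errors="ignore") as f:
    for line in f:
        if "Generated Username" in line or "Generated Password" in line:
            print(line.strip())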

🔗 Step 2: Configure Apache NiFi Processors

Purpose: To fetch data from randomuser.me API and send it to Kafka.

2.1 Configure InvokeHTTP Processor

  1. Drag InvokeHTTP processor onto NiFi canvas.
  2. Right-click → Configure → Properties tab.
  3. Set the HTTP URL:
    https://randomuser.me/api/
    
  4. Leave other settings as default unless specified.
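
Before starting the flow, it helps to see the shape of the JSON this API returns, since the Spark schema in Step 5.2 depends on it. A quick preview from Python (assumes the requests package is installed):

import requests

# Fetch one random user and inspect the top-level structure of the response.
data = requests.get("https://randomuser.me/api/").json()
print(list(data.keys()))            # ['results', 'info']
print(data["results"][0]["email"])  # one field from the nested payload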

2.2 Configure PublishKafkaRecord_2_6 Processor

  1. Drag PublishKafkaRecord_2_6 onto NiFi canvas.
  2. Right-click → Configure → Properties:
    • Record Reader: JsonTreeReader
    • Record Writer: JsonRecordSetWriter
    • Kafka Topic Name: feb11 (or your chosen topic)
    • Kafka Bootstrap Servers:
      localhost:9092
      
  3. Connect InvokeHTTP → PublishKafkaRecord_2_6 (drag the connection arrow from one processor to the other).

2.3 Start the Flow

  1. Right-click on the NiFi canvas → Start.

⚡ Step 3: Set Up Kafka and ZooKeeper

Purpose: To stream data continuously.

3.1 Start ZooKeeper

  1. In the command prompt:
    %KAFKA_HOME%\bin\windows\zookeeper-server-start.bat %KAFKA_HOME%\config\zookeeper.properties
    

3.2 Start Kafka Server

  1. Open a new command prompt:
    %KAFKA_HOME%\bin\windows\kafka-server-start.bat %KAFKA_HOME%\config\server.properties
    

3.3 Create Kafka Topic

  1. Run this command (the example topic is feb11; on Kafka 2.2 and later you can pass --bootstrap-server localhost:9092 instead of --zookeeper localhost:2181, and on Kafka 3.x you must):
    %KAFKA_HOME%\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic feb11
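
To confirm the topic exists, you can also list topics from Python (a quick check that assumes the kafka-python package, installed with pip install kafka-python):

from kafka import KafkaConsumer

# Connect to the local broker and check that the topic was created.
consumer = KafkaConsumer(bootstrap_servers="localhost:9092")
print("feb11" in consumer.topics())
consumer.close()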
    

3.4 Start Kafka Producer

  1. To manually produce test messages to the topic (optional, since NiFi will be the real producer):
    %KAFKA_HOME%\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic feb11
    

3.5 Start Kafka Consumer

  1. To consume data and confirm messages are arriving:
    %KAFKA_HOME%\bin\windows\kafka-console-consumer.bat --topic feb11 --from-beginning --bootstrap-server localhost:9092
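
The same check can be scripted. A minimal Python consumer (again assuming kafka-python) that reads the topic from the beginning, prints what NiFi has published, and exits after ten idle seconds:

from kafka import KafkaConsumer

# Read from the start of the topic and stop after 10 seconds with no new messages.
consumer = KafkaConsumer(
    "feb11",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
    consumer_timeout_ms=10000,
    value_deserializer=lambda v: v.decode("utf-8"),
)
for message in consumer:
    print(message.value)
consumer.close()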
    

💡 Step 4: Set Up Snowflake

Purpose: To store processed data.

4.1 Create Snowflake Account

  1. Go to Snowflake Sign-Up and create a free account.

4.2 Set Up Snowflake Objects

  1. After logging in, run these SQL commands in a worksheet:
    CREATE DATABASE arundb;
    USE DATABASE arundb;
    CREATE SCHEMA IF NOT EXISTS public;  -- PUBLIC is created automatically with the database
    CREATE WAREHOUSE IF NOT EXISTS compute_wh WITH WAREHOUSE_SIZE = 'XSMALL' WAREHOUSE_TYPE = 'STANDARD';
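
Optionally, verify connectivity from Python before wiring Spark in. A minimal sketch using the snowflake-connector-python package (pip install snowflake-connector-python); the account, user, and password values are placeholders for your own:

import snowflake.connector

# Open a session against the objects created above and run a trivial sanity query.
conn = snowflake.connector.connect(
    account="<your_account_identifier>",
    user="<your_user>",
    password="<your_password>",
    warehouse="compute_wh",
    database="arundb",
    schema="public",
)
cur = conn.cursor()
cur.execute("SELECT CURRENT_VERSION()")
print(cur.fetchone())
cur.close()
conn.close()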
    

🔥 Step 5: Kafka Consumer & Spark Streaming Code

Purpose: To consume Kafka data, process it, and push it to Snowflake.

5.1 Set Up PyCharm / VSCode

  1. Install PyCharm or VSCode.
  2. Create a new Python project.
  3. Install PySpark:
    pip install pyspark

  4. The Kafka source and the Snowflake Spark connector are JVM libraries, not pip packages; Spark downloads them automatically through the spark.jars.packages setting in the code below.
    

5.2 Add the Python Code

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, current_timestamp
from pyspark.sql.types import StructType, StructField, StringType

# The Kafka source and the Snowflake connector are JVM packages pulled in via
# spark.jars.packages. The versions below are an assumption for Spark 3.x with
# Scala 2.12; match them to your own Spark build.
spark = SparkSession.builder \
    .appName("KafkaSnowflakePipeline") \
    .master("local[*]") \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1,"
            "net.snowflake:spark-snowflake_2.12:2.16.0-spark_3.4,"
            "net.snowflake:snowflake-jdbc:3.16.1") \
    .getOrCreate()

# Read the raw stream from the Kafka topic that NiFi publishes to.
df = spark.readStream.format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "feb11") \
  .load()

# Kafka delivers the payload as bytes; cast it to a JSON string.
df = df.selectExpr("CAST(value AS STRING) as json_file")

# randomuser.me responses have two top-level keys, "results" and "info";
# both are kept here as raw JSON strings for downstream parsing.
schema = StructType([
    StructField("results", StringType(), True),
    StructField("info", StringType(), True)
])

parsed_df = df.withColumn("parsed_json", from_json(col("json_file"), schema)).select("parsed_json.*")

# Replace the placeholders with your own Snowflake account details; the database,
# schema, and warehouse match the objects created in Step 4.
sfOptions = {
  "sfURL": "<your_account_identifier>.snowflakecomputing.com",
  "sfUser": "<your_user>",
  "sfPassword": "<your_password>",
  "sfDatabase": "arundb",
  "sfSchema": "public",
  "sfWarehouse": "compute_wh"
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

# Runs once per micro-batch: stamp each record with a load timestamp and append
# it to the LIVEKAFKA table (created automatically on the first write).
def foreach_batch_function(df, epoch_id):
    df = df.withColumn("ts", current_timestamp())
    df.write.mode("append").format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("dbtable", "livekafka").save()

parsed_df.writeStream.foreachBatch(foreach_batch_function).start().awaitTermination()
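
Before adding the Snowflake sink, you can confirm that the Kafka messages parse correctly by swapping the last line for a console sink (a local debugging sketch only):

# Debug variant: print parsed records to the console instead of writing to Snowflake.
query = parsed_df.writeStream \
    .format("console") \
    .outputMode("append") \
    .option("truncate", "false") \
    .start()
query.awaitTermination()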

🚀 Step 6: Run the Pipeline

  1. Start all services in order: ZooKeeper, Kafka, NiFi, and then the Spark streaming script.
  2. The data from randomuser.me should now stream to Snowflake's LIVEKAFKA table.
  3. Run in Snowflake:
    SELECT * FROM livekafka;
    

🎯 Step 7: Validate & Monitor

  • Confirm that records are being inserted into LIVEKAFKA.
  • Check Spark logs for streaming status.
  • Ensure Kafka topic has live data.
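
To monitor the stream programmatically rather than tailing logs, keep a handle on the streaming query instead of calling awaitTermination() right away. A sketch that reuses parsed_df and foreach_batch_function from Step 5.2:

import time

# Start the stream and keep the handle so its progress can be inspected.
query = parsed_df.writeStream.foreachBatch(foreach_batch_function).start()

while query.isActive:
    print(query.status)        # whether a micro-batch is currently running
    print(query.lastProgress)  # metrics for the most recent micro-batch
    time.sleep(30)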

💬 Let me know if you face any issues at any step! I can help debug and explain further.
