Kafka Integrated with Spark Structured Streaming

🚀 Apache Kafka

Apache Kafka is an open-source data streaming platform that stores, processes, and analyzes large amounts of real-time data. It is used to build real-time data pipelines and applications that can adapt to data streams.

🔥 Event Streaming

Event streaming is the digital equivalent of the human body's central nervous system. It is the technological foundation for the "always-on" world, where businesses are increasingly software-defined and automated.

Technically, event streaming involves capturing data in real-time from event sources like databases, sensors, mobile devices, cloud services, and applications. It then stores, processes, and routes these events to different destinations as needed, ensuring continuous data flow.

⚡ Kafka as an Event Streaming Platform

Kafka combines three key capabilities:

  • 📤 Publishing and subscribing to streams of events.
  • 💾 Storing streams of events durably and reliably.
  • ⏳ Processing streams of events in real-time or retrospectively.

Kafka is highly scalable, fault-tolerant, and can be deployed on-premises or in the cloud. Users can self-manage Kafka or opt for fully managed services.

📌 More Information on Kafka

🔹 Examples of Kafka Use Cases

  • 📊 Tracking user activity data to monitor website usage in real-time.
  • 🛒 Feeding an application that tracks product sales in real-time.

🔹 Kafka Installation and Setup

📥 Download and Installation

  1. Open Google and search "Download Kafka index".
  2. Click on Apache Kafka Archive.
  3. Recommended version: Kafka 2.8.1 (https://archive.apache.org/dist/kafka/2.8.1/).
  4. Download the tgz file (kafka_2.12-2.8.1.tgz).
  5. Extract the file (extract twice if needed).
  6. Move the extracted Kafka folder to C:\bigdata\kafka_2.12-2.8.1.
  7. Add this path as the KAFKA_HOME variable in System Environment Variables (a quick check is shown below).
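
To confirm the variable is picked up, open a new command prompt and run the quick check below (assuming you named the variable KAFKA_HOME, as the commands in this post do):

echo %KAFKA_HOME%

It should print C:\bigdata\kafka_2.12-2.8.1.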

🔹 Kafka Lifecycle Components

  • 📍 Source: Data generation point.
  • 📌 Topic: A named event stream.
  • 📨 Messages: Actual event streams.
  • 🏭 Producers: Publish events to topics.
  • 🖥️ Brokers/Servers: Store and serve event streams.
  • 📥 Consumers: Read event streams.
  • 🎯 Sink: Receives processed streams.
  • 🔧 Zookeeper: Manages and synchronizes distributed systems.

🛠️ Running Kafka: A Sample POC

🔹 Start Kafka Services (Windows Commands)

Run each command in a separate command prompt.

1️⃣ Start Zookeeper Service

%KAFKA_HOME%\bin\windows\zookeeper-server-start.bat %KAFKA_HOME%\config\zookeeper.properties

2️⃣ Start Kafka Server

%KAFKA_HOME%\bin\windows\kafka-server-start.bat %KAFKA_HOME%\config\server.properties

3️⃣ Create a Kafka Topic

%KAFKA_HOME%\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic <topic_name>

List existing topics:

%KAFKA_HOME%\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
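
Optionally, you can also inspect a topic's partition and replication details with the --describe option (shown here as an optional check for this ZooKeeper-based 2.8.x setup):

%KAFKA_HOME%\bin\windows\kafka-topics.bat --describe --zookeeper localhost:2181 --topic <topic_name>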

🔹 Test Kafka Producer and Consumer

🚀 Start Producer (in a 3rd command prompt)

%KAFKA_HOME%\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic <topic_name>

📥 Start Consumer (in a 4th command prompt)

%KAFKA_HOME%\bin\windows\kafka-console-consumer.bat --topic <topic_name> --from-beginning --bootstrap-server localhost:9092

These commands verify that messages sent by the Producer are received by the Consumer. Note that the two commands above are only for testing whether the data generated at the Producer's end is actually reaching the Consumer; we won't do this in production. Instead, we would use an IDE such as PyCharm or Visual Studio Code, or a tool such as PuTTY.

Reference information for working with Kafka using an IDE:
  • When working in an IDE such as PyCharm, refer to the following page for more information: https://kafka-python.readthedocs.io/en/master/usage.html
  • Use the following command to install the kafka-python client:
    • pip install kafka-python
  • A minimal producer sketch based on that documentation is shown below.
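
For reference, a minimal kafka-python producer sketch following the usage patterns in the documentation linked above (the topic name 'feb17' and the sample message are placeholders):

from kafka import KafkaProducer

# Connect to the local broker started earlier
producer = KafkaProducer(bootstrap_servers='localhost:9092')

# Messages are sent as raw bytes; encode strings before sending
producer.send('feb17', b'hello from kafka-python')
producer.flush()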

🖥️ Kafka Integration with IDEs

🔹 Using Kafka with PyCharm

📌 Install kafka-python (paste the command below into the terminal)

pip install kafka-python

📥 Run Kafka Consumer in PyCharm

from kafka import KafkaConsumer

# Subscribe to the 'feb17' topic on the local broker (localhost:9092 by default)
consumer = KafkaConsumer('feb17')

# Block and print each message as it arrives
for msg in consumer:
    print(msg)

📌 Kafka Consumer Output Example

  • Because the Kafka Consumer code is already running, it continuously iterates and receives the live messages generated by the Kafka Producer.
  • The consumer iterator returns ConsumerRecords, which are simple named tuples exposing basic message attributes: topic, partition, offset, key, and value, as shown below:

ConsumerRecord(topic='feb17', partition=0, offset=7, timestamp=1739765185994, timestamp_type=0, key=None, value=b'ayansh 4-4', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=10, serialized_header_size=-1)
ConsumerRecord(topic='feb17', partition=0, offset=8, timestamp=1739765197631, timestamp_type=0, key=None, value=b'mayu5-5', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=7, serialized_header_size=-1)
ConsumerRecord(topic='feb17', partition=0, offset=9, timestamp=1739765210515, timestamp_type=0, key=None, value=b'aaku 29-29', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=10, serialized_header_size=-1)
To display only values:
print(msg.value)
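
Since the value is raw bytes (note the b'...' prefix in the records above), you can decode it to text for readability, for example:

print(msg.value.decode('utf-8'))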


🔍 Troubleshooting Kafka Errors

Issue: Consumer errors in PyCharm.

Solution: install the latest kafka-python development version directly from GitHub:

pip install git+https://github.com/dpkp/kafka-python.git

🔗 Kafka Integration with Spark Structured Streaming

By now, you should have a basic understanding of Kafka. Now, let's integrate Kafka with Spark Structured Streaming.

🔹 Download Required Dependency

  • Download Spark-Kafka integration JAR from Maven Repository.
  • https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10_2.12/3.1.2
  • Copy the JAR file to Spark's jars folder (C:\bigdata\spark-3.1.2-bin-hadoop3.2\jars). An alternative that avoids copying JARs is shown below.
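
Alternatively, instead of copying the JAR manually, you can let Spark resolve the connector at launch time with the --packages flag. A sketch assuming Spark 3.1.2 built for Scala 2.12 (your_streaming_script.py is a placeholder for your own file):

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 your_streaming_script.py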

🔹 Kafka Consumer with Spark Structured Streaming

from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col, current_timestamp

spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()

# Read Kafka stream
df = spark.readStream.format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "feb8") \
  .load()

df = df.selectExpr("CAST(value AS STRING)")

# Process Kafka messages
res = (df.withColumn("name", split(col("value"),',')[0])
    .withColumn("age", split(col("value"),',')[1]))

def for_each_batch_function(df, epoch_id):
  df = df.withColumn("ts", current_timestamp())
  df.write.mode("append").format("jdbc").option("url", "jdbc:mysql://<your-db-url>") \
    .option("user", "admin").option("password", "xxxxxx").option("dbtable", "livekafkafeb8").save()
  df.show()

res.writeStream.foreachBatch(for_each_batch_function).start().awaitTermination()

Explanation of the Code

  1. Import Required Libraries

    • SparkSession: The main entry point for Spark applications.
    • split(), col(), current_timestamp(): Functions for data transformation.
  2. Create a Spark Session

    • Sets up the Spark environment to run the streaming application.
    • Runs in local mode using all available CPU cores.
  3. Read Kafka Stream

    • Reads data from a Kafka topic named "feb8".
    • Connects to Kafka running on localhost:9092.
    • Uses Spark's built-in Kafka connector to continuously receive data.
  4. Convert Kafka Messages to String

    • Kafka messages are received in binary format.
    • Converts the value column (Kafka message) to a readable string.
  5. Process Kafka Messages

    • Assumes Kafka messages are in "name,age" format.
    • Splits the value column into two separate columns:
      • name: The first value before the comma.
      • age: The second value after the comma.
  6. Define the Batch Processing Function

    • Adds a timestamp column to track when the data is processed.
    • Writes the processed data into a MySQL table (livekafkafeb8).
    • Uses append mode to continuously insert new data without overwriting old records.
    • Prints the current batch data to the console.
  7. Start Streaming and Write to MySQL

    • Uses foreachBatch() to apply the batch processing function to each micro-batch.
    • Starts the streaming job and keeps it running indefinitely.

This code enables real-time processing of Kafka messages and stores them in a MySQL database using PySpark Structured Streaming.
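
As a concrete illustration (with made-up values), if a producer sends the line below to the feb8 topic:

alice,30

the streaming job splits it into name = 'alice' and age = '30', the batch function adds a ts processing timestamp, and the row is appended to the livekafkafeb8 table in MySQL.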

📌 Key Points to Remember

  • ✅ Kafka is a powerful event streaming platform for real-time data processing.
  • ✅ Kafka topics store streams of events.
  • ✅ Spark Structured Streaming integrates with Kafka to process real-time data.
  • ✅ Ensure Zookeeper and Kafka Server are running before executing any commands.
  • ✅ Test producer and consumer to validate data flow.
  • ✅ Use appropriate dependencies when integrating Kafka with Spark.

Stay tuned for more details on Spark Structured Streaming with Kafka Producer! 🚀
