Kafka Integrated with Spark Structured Streaming
🚀 Apache Kafka
Apache Kafka is an open-source, distributed event streaming platform that stores and processes large volumes of real-time data. It is used to build real-time data pipelines and applications that react to data streams as they arrive.
🔥 Event Streaming
Event streaming is the digital equivalent of the human body's central nervous system. It is the technological foundation for the "always-on" world, where businesses are increasingly software-defined and automated.
Technically, event streaming involves capturing data in real-time from event sources like databases, sensors, mobile devices, cloud services, and applications. It then stores, processes, and routes these events to different destinations as needed, ensuring continuous data flow.
⚡ Kafka as an Event Streaming Platform
Kafka combines three key capabilities:
- 📤 Publishing and subscribing to streams of events.
- 💾 Storing streams of events durably and reliably.
- ⏳ Processing streams of events in real-time or retrospectively.
Kafka is highly scalable, fault-tolerant, and can be deployed on-premises or in the cloud. Users can self-manage Kafka or opt for fully managed services.
📌 More Information on Kafka
🔹 Examples of Kafka Use Cases
- 📊 Tracking user activity data to monitor website usage in real-time.
- 🛒 Feeding an application that tracks product sales in real-time.
🔹 Kafka Installation and Setup
📥 Download and Installation
- Open the Apache Kafka archive: https://archive.apache.org/dist/kafka/2.8.1/ (you can also find it by searching "Download Kafka index" and clicking the Apache Kafka Archive link). Recommended version: Kafka 2.8.1.
- Download the tgz file (kafka_2.12-2.8.1.tgz).
- Extract the file (it may need to be extracted twice, since it is a .tar.gz archive).
- Move the extracted Kafka folder to C:\bigdata\kafka_2.12-2.8.1.
- Add this path to the System Environment Variables (the commands below assume a KAFKA_HOME variable pointing to this folder).
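For example (an illustration based on the path above, not an original step), you can set the KAFKA_HOME variable from a command prompt; note that setx only takes effect in newly opened command prompts:
setx KAFKA_HOME "C:\bigdata\kafka_2.12-2.8.1"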
🔹 Kafka Lifecycle Components
- 📍 Source: Data generation point.
- 📌 Topic: A named event stream.
- 📨 Messages: Actual event streams.
- 🏭 Producers: Publish events to topics.
- 🖥️ Brokers/Servers: Store and serve event streams.
- 📥 Consumers: Read event streams.
- 🎯 Sink: Receives processed streams.
- 🔧 Zookeeper: Manages and synchronizes distributed systems.
🛠️ Running Kafka: A Sample POC
🔹 Start Kafka Services (Windows Commands)
Run each command in a separate command prompt.
1️⃣ Start Zookeeper Service
%KAFKA_HOME%\bin\windows\zookeeper-server-start.bat %KAFKA_HOME%\config\zookeeper.properties
2️⃣ Start Kafka Server
%KAFKA_HOME%\bin\windows\kafka-server-start.bat %KAFKA_HOME%\config\server.properties
3️⃣ Create a Kafka Topic
%KAFKA_HOME%\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic <topic_name>
%KAFKA_HOME%\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
(Lists the names of the existing topics.)
🔹 Test Kafka Producer and Consumer
🚀 Start Producer (in a 3rd command prompt)
%KAFKA_HOME%\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic <topic_name>
📥 Start Consumer (in a 4th command prompt)
%KAFKA_HOME%\bin\windows\kafka-console-consumer.bat --topic <topic_name> --from-beginning --bootstrap-server localhost:9092
These commands verify that messages sent by the Producer are received by the Consumer. Note that these two commands are only for testing whether the data generated at the Producer end is actually reaching the Consumer; we would not do this in production. Instead, we would work from an IDE such as PyCharm or Visual Studio Code, or use a tool such as PuTTY.
- When working with Kafka from PyCharm, refer to the following website for more information: https://kafka-python.readthedocs.io/en/master/usage.html
- Use the following command to install the Kafka client library for Python:
- pip install kafka-python
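Since the Spark job later in this post expects comma-separated "name,age" messages, here is a minimal kafka-python producer sketch (my own illustration, assuming a local broker at localhost:9092 and the topic name feb8 used later):
from kafka import KafkaProducer
import time

# Connect to the local Kafka broker (assumed to be running on localhost:9092)
producer = KafkaProducer(bootstrap_servers='localhost:9092')

# Publish a few sample "name,age" messages to the (assumed) topic 'feb8'
for name, age in [('alice', 30), ('bob', 25)]:
    message = f"{name},{age}".encode('utf-8')  # kafka-python expects bytes for the value
    producer.send('feb8', value=message)
    time.sleep(1)

producer.flush()  # make sure all buffered messages are actually delivered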
🖥️ Kafka Integration with IDEs
🔹 Using Kafka with PyCharm
📌 Install kafka-python (paste the following command in the PyCharm terminal)
pip install kafka-python
📥 Run Kafka Consumer in PyCharm
from kafka import KafkaConsumer

# Subscribe to the 'feb17' topic on the default broker (localhost:9092)
consumer = KafkaConsumer('feb17')

# Iterate indefinitely, printing each message as it arrives
for msg in consumer:
    print(msg)
📌 Kafka Consumer Output Example
- Because the Kafka consumer code above runs in a continuous loop, it keeps receiving the live messages generated by the Kafka producer.
- The consumer iterator returns ConsumerRecords, which are simple named tuples exposing basic message attributes: topic, partition, offset, key, and value. For example, to print only the message payload:
print(msg.value)
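Since the value attribute arrives as raw bytes, you will usually want to decode it. Continuing from the consumer snippet above, a small sketch (assuming the messages are UTF-8 text):
for msg in consumer:
    # Each ConsumerRecord exposes topic, partition, offset, key, and value
    print(msg.topic, msg.partition, msg.offset, msg.key)
    print(msg.value.decode('utf-8'))  # decode the raw bytes into a string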
🔍 Troubleshooting Kafka Errors
Issue: The Kafka consumer code raises errors in PyCharm.
Solution: Install the latest kafka-python build directly from GitHub:
pip install git+https://github.com/dpkp/kafka-python.git
🔗 Kafka Integration with Spark Structured Streaming
By now, you should have a basic understanding of Kafka. Now, let's integrate Kafka with Spark Structured Streaming.
🔹 Download Required Dependency
- Download the Spark-Kafka integration JAR from the Maven Repository: https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10_2.12/3.1.2
- Copy the JAR file to Spark's jars folder (C:\bigdata\spark-3.1.2-bin-hadoop3.2\jars).
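Alternatively (a variation not covered in the steps above), Spark can resolve the connector automatically at submit time with the --packages flag, so the JAR does not have to be copied by hand; your_streaming_script.py is a placeholder for your own script:
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 your_streaming_script.py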
🔹 Kafka Consumer with Spark Structured Streaming
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col, current_timestamp

# Create the Spark session (local mode, using all available CPU cores)
spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()

# Read the Kafka stream from topic 'feb8' on the local broker
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "feb8") \
    .load()

# Kafka delivers the message payload as binary; cast it to a readable string
df = df.selectExpr("CAST(value AS STRING)")

# Process Kafka messages: split "name,age" into separate columns
res = (df.withColumn("name", split(col("value"), ',')[0])
         .withColumn("age", split(col("value"), ',')[1]))

def for_each_batch_function(df, epoch_id):
    # Add a timestamp column to track when each micro-batch is processed
    df = df.withColumn("ts", current_timestamp())
    # Append the batch to the MySQL table 'livekafkafeb8'
    # (the MySQL JDBC driver must be available on Spark's classpath)
    df.write.mode("append").format("jdbc").option("url", "jdbc:mysql://<your-db-url>") \
        .option("user", "admin").option("password", "xxxxxx").option("dbtable", "livekafkafeb8").save()
    df.show()

# Apply the batch function to every micro-batch and keep the stream running
res.writeStream.foreachBatch(for_each_batch_function).start().awaitTermination()
Explanation of the Code
Import Required Libraries
- SparkSession: The main entry point for Spark applications.
- split(), col(), current_timestamp(): Functions for data transformation.
Create a Spark Session
- Sets up the Spark environment to run the streaming application.
- Runs in local mode using all available CPU cores.
Read Kafka Stream
- Reads data from a Kafka topic named "feb8".
- Connects to Kafka running on localhost:9092.
- Uses Spark's built-in Kafka connector to continuously receive data.
Convert Kafka Messages to String
- Kafka messages are received in binary format.
- Converts the value column (the Kafka message) into a readable string.
Process Kafka Messages
- Assumes Kafka messages are in "name,age" format.
- Splits the value column into two separate columns:
  - name: The first value before the comma.
  - age: The second value after the comma.
Define the Batch Processing Function
- Adds a timestamp column to track when the data is processed.
- Writes the processed data into a MySQL table (livekafkafeb8).
- Uses append mode to continuously insert new data without overwriting old records.
- Prints the current batch data to the console.
Start Streaming and Write to MySQL
- Uses foreachBatch() to apply the batch processing function to each micro-batch.
- Starts the streaming job and keeps it running indefinitely.
This code enables real-time processing of Kafka messages and stores them in a MySQL database using PySpark Structured Streaming.
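If you only want to verify the stream before wiring up MySQL, a minimal variation (my own sketch, not part of the original walkthrough) is to replace the foreachBatch sink in the last line with Spark's built-in console sink, which simply prints each micro-batch:
res.writeStream.format("console").outputMode("append").start().awaitTermination()
This is a convenient way to confirm that the Kafka topic, the split logic, and the streaming query are wired up correctly before adding the JDBC write.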
📌 Key Points to Remember
- ✅ Kafka is a powerful event streaming platform for real-time data processing.
- ✅ Kafka topics store streams of events.
- ✅ Spark Structured Streaming integrates with Kafka to process real-time data.
- ✅ Ensure Zookeeper and Kafka Server are running before executing any commands.
- ✅ Test producer and consumer to validate data flow.
- ✅ Use appropriate dependencies when integrating Kafka with Spark.
Stay tuned for more details on Spark Structured Streaming with Kafka Producer! 🚀