Introduction to Structured Streaming in PySpark — Real-Time Data Processing
At NeoMart, data flows continuously:
- Online orders every second
- Clickstream events from the website
- IoT sensor metrics from warehouses
To analyze this data in real time, we use Structured Streaming, PySpark's high-level, scalable, fault-tolerant stream processing engine.
1. Why Structured Streaming?
- Handles unbounded streams of data
- Provides the familiar DataFrame API, so streaming code reads like batch code (see the sketch after this list)
- Supports end-to-end exactly-once semantics (with replayable sources and idempotent sinks)
- Integrates with multiple sources and sinks (e.g., sockets, files, Kafka)
Story: NeoMart wants to calculate real-time sales metrics without waiting for batch jobs.
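To see how batch-like the API feels, here is a minimal sketch using Spark's built-in rate source, which generates timestamped test rows so no external service is needed; the filter is written exactly as it would be on a batch DataFrame:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("NeoMartRateDemo").getOrCreate()
# The built-in "rate" source emits (timestamp, value) rows at a fixed rate,
# which makes it convenient for local experiments.
rates = spark.readStream.format("rate").option("rowsPerSecond", 5).load()
# The result is an ordinary (but unbounded) DataFrame; filter it like batch data.
evens = rates.filter(rates.value % 2 == 0)
query = evens.writeStream.format("console").outputMode("append").start()
query.awaitTermination()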
2. Reading a Stream from Socket
Input: Each line typed into a socket (localhost:9999)
hello world
spark streaming rocks
neo mart analytics
Code:
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col
spark = SparkSession.builder.appName("NeoMartStreaming").getOrCreate()
# Read streaming data from socket
df_stream = spark.readStream.format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()
# Split incoming lines into words
words = df_stream.select(split(col("value"), " ").alias("words"))
words.printSchema()
Output Schema:
root
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
- Each line from the socket is split into an array of words
- df_stream is unbounded → continuously updated as new lines arrive
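To try the socket example locally, start a simple text server before launching the query, for example with netcat:
nc -lk 9999
Anything typed into that terminal arrives in df_stream as new rows.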
3. Aggregating Streaming Data
Code:
from pyspark.sql.functions import explode
# Explode words for counting
word_counts = words.select(explode(col("words")).alias("word")) \
    .groupBy("word").count()
# Stream output to console
query = word_counts.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()
query.awaitTermination()
Output Example:
+---------+-----+
|word     |count|
+---------+-----+
|hello    |1    |
|world    |1    |
|spark    |1    |
|streaming|1    |
|rocks    |1    |
|neo      |1    |
|mart     |1    |
|analytics|1    |
+---------+-----+
Story: NeoMart dashboard shows top words in real-time clickstreams.
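Because the query runs in complete output mode, sorting after the aggregation is permitted, so a "top words" view like the dashboard's could be sketched as follows (reusing the word_counts DataFrame from above):
from pyspark.sql.functions import desc
# Sorting a streaming DataFrame is only allowed after an aggregation
# and in complete output mode; both conditions hold for word_counts.
top_words = word_counts.orderBy(desc("count"))
query = top_words.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()
query.awaitTermination()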
4. Checkpointing
query = word_counts.writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("checkpointLocation", "/tmp/checkpoints") \
    .start()
- Stores offsets and streaming state → a restarted query recovers after failure
- Essential for stateful operations like aggregations, and required by most production sinks
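Recovery works by restarting the same query with the same checkpointLocation; Spark then resumes from the last committed offsets and state instead of reprocessing from scratch. A sketch of the pattern (the /tmp/checkpoints path is illustrative):
# Stop gracefully; offsets and aggregation state are already persisted
# in the checkpoint directory.
query.stop()
# Restarting with the same checkpoint location resumes where the query left off.
query = word_counts.writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("checkpointLocation", "/tmp/checkpoints") \
    .start()
query.awaitTermination()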
Summary
Structured Streaming allows NeoMart to:
- Ingest real-time data from multiple sources
- Apply transformations and aggregations like batch DataFrames
- Write results to sinks with exactly-once semantics
- Ensure fault tolerance with checkpointing
Next Topic → Reading Streaming Data from Kafka, S3, and Socket