ODP 3.3.6.4-1
Structured Streaming
Real-Time Mode (RTM)
Spark 4.1.1 introduces the first official support for Real-Time Mode in Structured Streaming, enabling continuous processing with sub-second latency. For stateless workloads, p99 latencies can reach single-digit milliseconds.
Activation requires no changes to the query logic; only the trigger on the sink changes:
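```python
# Assumes an existing SparkSession named `spark` with the Kafka connector available
df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "input-topic")  # hypothetical input topic
    .load()
)

query = (
    df.writeStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "output-topic")
    .option("checkpointLocation", "/checkpoint")
    .trigger(continuous="1 second")  # enables Real-Time Mode
    .start()
)
```

Spark 4.1.1 RTM support matrix: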
| Dimension | Supported in 4.1.1 |
|---|---|
| Query types | Stateless, single-stage |
| Language | Scala |
| Sources | Kafka |
| Sinks | Kafka, Foreach |
| Operators | Stateless ops, Unions, Broadcast Stream-Static Joins |
| Output mode | Update |
| Target latency | Sub-second (p99 single-digit ms for stateless) |
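One way to apply the matrix before deploying a query is to encode it as a pre-flight check. The sketch below is plain Python and is not part of Spark: `QueryPlan` and `rtm_blockers` are hypothetical names, the allowed values are transcribed from the table, and the Operators row is not encoded, so a plan that passes can still use an unsupported operator.

```python
from dataclasses import dataclass
from typing import List

# Allowed values transcribed from the Spark 4.1.1 RTM support matrix (lowercase).
RTM_SUPPORTED = {
    "language": {"scala"},
    "source": {"kafka"},
    "sink": {"kafka", "foreach"},
    "output_mode": {"update"},
}


@dataclass
class QueryPlan:
    """Hypothetical description of a planned streaming query."""
    language: str
    source: str
    sink: str
    output_mode: str
    stateful: bool = False
    single_stage: bool = True


def rtm_blockers(plan: QueryPlan) -> List[str]:
    """Return the reasons a plan falls outside the matrix; an empty list means it fits."""
    problems = []
    for field, allowed in RTM_SUPPORTED.items():
        value = getattr(plan, field).lower()  # compare case-insensitively
        if value not in allowed:
            problems.append(f"{field}={value!r} not in {sorted(allowed)}")
    if plan.stateful:
        problems.append("stateful queries are not supported")
    if not plan.single_stage:
        problems.append("multi-stage queries are not supported")
    return problems


print(rtm_blockers(QueryPlan("scala", "kafka", "kafka", "update")))
print(rtm_blockers(QueryPlan("python", "kafka", "console", "append", stateful=True)))
```

The first call returns an empty list; the second reports the language, sink, output mode, and statefulness as blockers.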
Arbitrary Stateful Processing V2
Arbitrary Stateful Processing V2 extends Structured Streaming with flexible, user-defined stateful operations, suited to workloads such as complex event processing and stateful ML models. It also provides a State Data Source that reads the key-value pairs stored in a query's checkpoint, which is useful for debugging and testing streaming pipelines.
Example:
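```python
from typing import Iterator

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import LongType, StringType, StructField, StructType

# transformWithState requires the RocksDB state store provider
spark = (
    SparkSession.builder.appName("Stateful Processing V2")
    .config(
        "spark.sql.streaming.stateStore.providerClass",
        "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider",
    )
    .getOrCreate()
)

# Assumption: each socket line has the form "key,amount", e.g. "a,5"
streaming_df = (
    spark.readStream.format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load()
    .selectExpr("split(value, ',')[0] AS key",
                "CAST(split(value, ',')[1] AS BIGINT) AS amount")
)


class RunningTotal(StatefulProcessor):
    def init(self, handle: StatefulProcessorHandle) -> None:
        # One ValueState per key holding the running total
        schema = StructType([StructField("total", LongType(), True)])
        self.total = handle.getValueState("total", schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        # Stored total (0 for a new key) plus the amounts in this micro-batch
        total = self.total.get()[0] if self.total.exists() else 0
        for pdf in rows:
            total += int(pdf["amount"].sum())
        self.total.update((total,))
        yield pd.DataFrame({"key": [key[0]], "total": [total]})

    def close(self) -> None:
        pass


output_schema = StructType([StructField("key", StringType()), StructField("total", LongType())])

query = (
    streaming_df.groupBy("key")
    .transformWithStateInPandas(
        statefulProcessor=RunningTotal(),
        outputStructType=output_schema,
        outputMode="Update",
        timeMode="None",
    )
    .writeStream.format("console")
    .outputMode("update")
    .start()
)

query.awaitTermination()
```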
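Because the per-key update rule is ordinary code, it can be factored out and unit-tested without a cluster or a socket server. The sketch below is a plain-Python model of the running-total logic in the example above, not Spark API: `process_batch` is a hypothetical helper, a dict stands in for the state store, and each call represents one micro-batch.

```python
from typing import Dict, Iterable, List, Tuple


def process_batch(state: Dict[str, int], batch: Iterable[Tuple[str, int]]) -> List[Tuple[str, int]]:
    """Apply one micro-batch of (key, amount) pairs to per-key running totals.

    For each key: start from the stored total (0 for a new key), add the
    amounts seen in this batch, and write the result back. Only keys that
    received input are emitted, mirroring Update output mode.
    """
    touched: Dict[str, int] = {}
    for key, amount in batch:
        touched[key] = touched.get(key, state.get(key, 0)) + amount
    state.update(touched)  # persist the new totals for the next batch
    return sorted(touched.items())


state: Dict[str, int] = {}
print(process_batch(state, [("a", 5), ("b", 1), ("a", 2)]))  # [('a', 7), ('b', 1)]
print(process_batch(state, [("a", 3)]))                      # [('a', 10)]
```

Key `b` is absent from the second result because it received no new input in that batch, although its total is still held in `state`.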
Last updated on May 14, 2026