PySpark Improvements
Arrow-Native UDFs (@arrow_udf)
Spark 4.1.1 introduces the @arrow_udf decorator for scalar functions that accept and return pyarrow.Array objects directly, bypassing pandas conversion overhead entirely.
```python
import pyarrow as pa
import pyarrow.compute as pc
from pyspark.sql.functions import arrow_udf
from pyspark.sql.types import DoubleType

@arrow_udf(returnType=DoubleType())
def discount_price(prices: pa.Array, discount: pa.Array) -> pa.Array:
    return pc.subtract(prices, pc.multiply(prices, discount))

df = spark.createDataFrame([(100.0, 0.1), (200.0, 0.2)], ["price", "discount"])
df.withColumn("final_price", discount_price("price", "discount")).show()
```
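For contrast, here is the same computation written as a conventional pandas_udf. It is functionally equivalent, but each batch round-trips from Arrow to pandas and back, which is exactly the overhead @arrow_udf removes (a sketch for comparison; the name discount_price_pd is made up here):

```python
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType

@pandas_udf(DoubleType())
def discount_price_pd(prices: pd.Series, discount: pd.Series) -> pd.Series:
    # Same arithmetic, but every batch is converted Arrow -> pandas -> Arrow
    return prices - prices * discount
```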
Arrow-Native UDTFs (@arrow_udtf)

The @arrow_udtf decorator enables table functions that process entire pyarrow.RecordBatch objects at once, rather than row by row, which is dramatically faster for splitting and exploding operations.
```python
import pyarrow as pa
from pyspark.sql.functions import arrow_udtf
from pyspark.sql.types import StructType, StructField, StringType

@arrow_udtf(returnType=StructType([StructField("word", StringType())]))
class WordSplitter:
    def eval(self, batch: pa.RecordBatch) -> pa.RecordBatch:
        words = []
        for text in batch.column("text").to_pylist():
            words.extend(text.split())
        return pa.RecordBatch.from_pydict({"word": words})

spark.udtf.register("word_splitter", WordSplitter)
# Table functions are invoked in the FROM clause; the table argument
# uses the TABLE(...) syntax
spark.sql("SELECT * FROM word_splitter(TABLE(documents))").show()
```
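Note that the eval above drops out of Arrow into Python lists via to_pylist(), so the batch-at-a-time win is smaller than it could be. A minimal sketch of an alternative eval that stays vectorized with pyarrow.compute kernels (assuming whitespace splitting is close enough to text.split() for your data; decorate it with @arrow_udtf the same way as above):

```python
import pyarrow as pa
import pyarrow.compute as pc

class VectorizedWordSplitter:
    def eval(self, batch: pa.RecordBatch) -> pa.RecordBatch:
        # One vectorized kernel call splits every string; the resulting
        # list-of-strings array is then flattened into a flat word array
        words = pc.list_flatten(pc.utf8_split_whitespace(batch.column("text")))
        return pa.RecordBatch.from_arrays([words], names=["word"])
```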
Python Worker Logging

Debugging Python UDFs has historically been difficult because log output gets lost in executor stdout/stderr. Spark 4.1.1 introduces dedicated capture of UDF logs, exposed through a built-in table-valued function.
Enable:
```
spark.sql.pyspark.worker.logging.enabled=true
```

Example:
```python
import logging
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

logger = logging.getLogger("my_udf")

@udf(returnType=StringType())
def classify(value):
    logger.info(f"Processing value: {value}")
    if value > 100:
        return "high"
    return "low"

df = spark.createDataFrame([(50,), (150,)], ["val"])
df.withColumn("category", classify("val")).show()

# Query captured UDF logs
spark.sql("SELECT * FROM python_worker_logs()").show()
```
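Because the captured logs come back as an ordinary relation, they compose with regular SQL. A hypothetical query shape, filtering on the logger we created above (the column names are assumptions for illustration; check the Spark 4.1 docs for the TVF's actual schema):

```python
# "logger" and "level" are assumed column names, not confirmed API
spark.sql("""
    SELECT *
    FROM python_worker_logs()
    WHERE logger = 'my_udf' AND level = 'INFO'
""").show(truncate=False)
```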
Python Data Source API with Filter Pushdown

The Python Data Source API (for custom data sources written in Python) gains filter pushdown in Spark 4.1.1: the data source now receives query predicates from the optimizer and can apply them at the source, reducing data movement.
```python
from pyspark.sql.datasource import DataSource, DataSourceReader

class MyDataSource(DataSource):
    @classmethod
    def name(cls):
        return "my_source"

    def reader(self, schema):
        return MyReader(schema, self.options)

class MyReader(DataSourceReader):
    def __init__(self, schema, options):
        self.schema = schema
        self.options = options
        self.pushed_filters = []

    def pushFilters(self, filters):
        # Receives predicates such as [GreaterThan("id", 100)];
        # keep the ones this source can evaluate
        self.pushed_filters = [f for f in filters if type(f).__name__ == "GreaterThan"]
        # Return the filters the source cannot handle; Spark re-applies
        # those on top of the scan
        return [f for f in filters if f not in self.pushed_filters]

    def read(self, partition):
        # Apply the pushed filters while reading, so filtered-out rows
        # never leave the source
        for row in self._fetch_data(self.pushed_filters):
            yield row

spark.dataSource.register(MyDataSource)

# The predicate id > 100 reaches MyReader.pushFilters() before the scan
df = spark.read.format("my_source").load().filter("id > 100")
df.show()
```
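MyReader leaves _fetch_data undefined. The sketch below is one hypothetical implementation over an in-memory list, just to show where the pushed predicates do their work (the backing rows and the GreaterThan handling are assumptions for illustration):

```python
def _fetch_data(self, filters):
    # Hypothetical backing data; a real source would query an external system
    rows = [{"id": i, "value": f"v{i}"} for i in range(200)]
    for row in rows:
        keep = True
        for f in filters:
            # This sketch only understands GreaterThan; the attribute may
            # be a column-path tuple, so take its last component
            if type(f).__name__ == "GreaterThan":
                col = f.attribute[-1] if isinstance(f.attribute, tuple) else f.attribute
                keep = keep and row[col] > f.value
        if keep:
            # Yield tuples in the declared schema's field order
            yield (row["id"], row["value"])
```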
Python UDTFs

Python UDTFs (user-defined table functions) can emit multiple rows per input row, which makes them ideal for exploding, splitting, or generating records from a single input.
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import udtf

spark = SparkSession.builder.appName("Python UDTF").getOrCreate()

# A UDTF is a class with an eval() method; rows are emitted as tuples
@udtf(returnType="char: string")
class SplitUDTF:
    def eval(self, s: str):
        for char in s:
            yield (char,)

spark.udtf.register("split_udtf", SplitUDTF)
# Table functions are invoked in the FROM clause; LATERAL joins each
# input row to the rows the UDTF generates from it
spark.sql(
    "SELECT t.column_name, s.char "
    "FROM table_name t, LATERAL split_udtf(t.column_name) s"
).show()
```
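The decorated class can also be called directly with column arguments from the DataFrame API, skipping SQL registration entirely:

```python
from pyspark.sql.functions import lit

# Calling the @udtf-decorated class returns a DataFrame of generated rows
SplitUDTF(lit("spark")).show()
```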
Pandas 2.x Support

Spark 4.1.1 supports pandas 2.x and uses Arrow for the conversions, so data moves between pandas and Spark DataFrames quickly and with minimal copying.
```python
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import upper

spark = SparkSession.builder \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .appName("Pandas 2.x") \
    .getOrCreate()

pdf = pd.DataFrame({"id": [1, 2, 3], "value": ["a", "b", "c"]})
sdf = spark.createDataFrame(pdf)
result = sdf.withColumn("upper_value", upper(sdf.value)).toPandas()
print(result)
```
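Spark 4 also offers DataFrame.toArrow(), which returns a pyarrow.Table directly; combined with pandas 2.x's ArrowDtype, the round trip can keep Arrow-backed dtypes end to end (a sketch reusing sdf from above):

```python
import pandas as pd

# Collect the Spark DataFrame as a pyarrow.Table, then materialize it
# with Arrow-backed pandas dtypes instead of NumPy-backed ones
table = sdf.toArrow()
arrow_pdf = table.to_pandas(types_mapper=pd.ArrowDtype)
print(arrow_pdf.dtypes)
```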
PySpark UDF Unified Profiler

The unified profiler measures both the CPU time and the memory usage of PySpark UDFs, making it easier to pinpoint bottlenecks.
```python
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Enable the unified profiler: "perf" for CPU profiling, "memory" for
# memory profiling (the latter requires the memory-profiler package)
spark.conf.set("spark.sql.pyspark.udf.profiler", "perf")

@udf(returnType=IntegerType())
def square(x):
    return x * x

data = [("Alice", 1), ("Bob", 2), ("Charlie", 3)]
df = spark.createDataFrame(data, ["name", "age"])
df.withColumn("age_squared", square(df.age)).show()

# Render the collected per-UDF profiles
spark.profile.show(type="perf")
```
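The accumulated results can also be persisted or reset; a short sketch, assuming the dump and clear helpers documented alongside the unified profiler:

```python
# Write the collected profiles to a directory for offline inspection
spark.profile.dump("/tmp/udf_profiles")
# Discard everything collected so far
spark.profile.clear()
```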