
Answer-first summary for fast verification
Answer: `return spark.readStream.table("bronze")`
The most efficient and idiomatic way to handle incremental data processing in Databricks is via **Structured Streaming**.

* **Why Option B is correct:** When you use `spark.readStream.table("table_name")` on a Delta table, Spark identifies the current snapshot and then incrementally processes only the new rows committed after that snapshot. Combined with a checkpoint on the `.writeStream` side, progress is tracked automatically, ensuring that each nightly run only picks up records it hasn't seen before.
* **Why the other options are incorrect:**
  * **Option A:** Performing a batch read of the **Change Data Feed (CDF)** requires specifying a `startingVersion` or `startingTimestamp`. Without these options, a batch CDF read will fail. Even with them, you would have to track the last-processed version yourself, whereas Structured Streaming manages offsets automatically.
  * **Option C:** The `.load()` method in the streaming API expects a file path. Providing a table name will result in an error; `.table()` is the correct method for registered tables.
  * **Option D:** `current_timestamp()` is evaluated only once when the query is planned, so the filter provides no incremental semantics and will likely return zero rows.
  * **Option E:** Manual partition filtering is brittle and error-prone. It cannot easily account for late-arriving data in older partitions or ensure that every record is processed exactly once without complex manual state management.
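To make the correct pattern concrete, here is a minimal sketch of how the streaming read pairs with a checkpointed write for a nightly job. The names `silver_path` and `checkpoint_path`, and the `process_nightly` helper, are hypothetical placeholders, not part of the question:

```python
def new_records(spark):
    # Streaming read of the registered Delta table: Spark remembers which
    # table version it has consumed, so each run sees only newly committed rows.
    return spark.readStream.table("bronze")


def process_nightly(spark, silver_path, checkpoint_path):
    # trigger(availableNow=True) processes everything committed since the
    # last checkpointed offset, then stops -- a good fit for a nightly batch job.
    return (
        new_records(spark)
        .writeStream
        .option("checkpointLocation", checkpoint_path)  # offset tracking lives here
        .trigger(availableNow=True)
        .format("delta")
        .start(silver_path)
    )
```

The checkpoint location is what gives the pipeline exactly-once semantics across runs; deleting it would cause the next run to reprocess the whole table.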
Author: LeetQuiz Editorial Team
A Spark ETL pipeline processes data nightly. One stage requires identifying new records within a Delta Lake table named `bronze` that have not yet been processed downstream. This table is partitioned by `year`, `month`, and `day`.

Which of the following designs for the `new_records` function effectively returns a Spark DataFrame containing only the unprocessed data from the `bronze` table?
**A.** `return spark.read.option("readChangeFeed", "true").table("bronze")`

**B.** `return spark.readStream.table("bronze")`

**C.** `return spark.readStream.load("bronze")`

**D.** `return spark.read.table("bronze").filter(col("ingest_time") == current_timestamp())`

**E.** `return spark.read.table("bronze").filter(col("source_file") == f"/mnt/daily_batch/{year}/{month}/{day}")`
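For contrast, a batch Change Data Feed read in the style of Option A would only work with an explicit starting point, and the caller would have to persist that version between runs. A hedged sketch (the `cdf_since` name and manual version bookkeeping are illustrative assumptions, not the question's API):

```python
def cdf_since(spark, last_version):
    # A batch CDF read must say where to start; omitting startingVersion /
    # startingTimestamp (as Option A does) causes the read to fail.
    return (
        spark.read
        .option("readChangeFeed", "true")
        .option("startingVersion", last_version)  # must be tracked manually across runs
        .table("bronze")
    )
```

This works, but it shifts offset management onto the caller, which is exactly the bookkeeping that `spark.readStream.table("bronze")` with a checkpoint handles for free.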