
Explanation:
The most efficient and idiomatic way to handle incremental data processing in Databricks is via Structured Streaming.
B (correct): When you call spark.readStream.table("table_name") on a Delta table, Spark processes the table's current snapshot on the first run and then incrementally processes only the rows committed after that snapshot. Combined with .writeStream and a checkpoint location, progress is tracked automatically, so each nightly run picks up only records it has not seen before.
C: The .load() method in the streaming API expects a file path, not a table name. Passing a table name will result in an error; .table() is the correct method for registered tables.
D: current_timestamp() is evaluated only once, when the query is planned. The filter therefore provides no incremental semantics and will likely return zero rows.
A: A batch Change Data Feed (CDF) read requires startingVersion or startingTimestamp. Without one of these parameters, a batch CDF read will fail. Furthermore, Structured Streaming is simpler because it manages offsets automatically.
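The streaming approach can be sketched as follows. This is a minimal illustration, not a definitive implementation: the run_nightly helper, the checkpoint path, and the target table name are hypothetical and do not come from the question, and the spark session is passed in explicitly so the functions stay self-contained. It assumes a runtime where trigger(availableNow=True) is supported (PySpark 3.3 or later).

```python
def new_records(spark):
    """Return a streaming DataFrame over the bronze Delta table (option B)."""
    return spark.readStream.table("bronze")


def run_nightly(spark, checkpoint_path, target_table):
    """Process whatever is new in bronze since the last run, then stop.

    checkpoint_path and target_table are hypothetical names chosen for
    this sketch. The checkpoint is what records how far the stream has
    read, so the same path must be reused on every nightly run.
    """
    query = (
        new_records(spark)
        .writeStream
        .option("checkpointLocation", checkpoint_path)
        # Process all data available at start, then terminate (batch-like run).
        .trigger(availableNow=True)
        .toTable(target_table)
    )
    query.awaitTermination()
    return query
```

A nightly job would call, for example, run_nightly(spark, "/mnt/checkpoints/bronze_to_silver", "silver"), where both arguments are placeholders. Reusing the same checkpoint location is what makes the second and later runs incremental.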
A Spark ETL pipeline processes data nightly. One stage requires identifying new records within a Delta Lake table named bronze that have not yet been processed downstream. This table is partitioned by year, month, and day.
Which of the following designs for the new_records function effectively returns a Spark DataFrame containing only the unprocessed data from the bronze table?
A
return spark.read.option("readChangeFeed", "true").table("bronze")
B
return spark.readStream.table("bronze")
C
return spark.readStream.load("bronze")
D
return spark.read.table("bronze").filter(col("ingest_time") == current_timestamp())
E
return spark.read.table("bronze").filter(col("source_file") == f"/mnt/daily_batch/{year}/{month}/{day}")