
Answer-first summary for fast verification
Answer: Duplicate records enqueued more than 2 hours apart may be retained and the orders table may contain duplicate records with the same `customer_id` and `order_id`.
## Explanation

This question tests understanding of Structured Streaming's watermarking and deduplication behavior.

**Key Concepts:**

1. **Watermarking**: `withWatermark("time", "2 hours")` defines a watermark that trails the maximum event time seen in the data by 2 hours. Records with an event time older than `max_event_time - 2 hours` are considered late and may be dropped.
2. **dropDuplicates() with watermark**: When `dropDuplicates()` is used with a watermark, Spark keeps deduplication state only for keys within the watermark window. State for records older than the watermark threshold may be cleaned up.
3. **trigger(once=True)**: This runs the streaming query once, processing all available data as a single batch, then stops.

**Why Option A is correct:**

- The watermark is set to 2 hours, so Spark maintains deduplication state only for keys within 2 hours of the maximum event time seen.
- If duplicate records arrive more than 2 hours apart, the state for the first record may already have been cleaned up by the time the second duplicate arrives, since it falls outside the 2-hour watermark window.
- Once that state is cleaned up, Spark no longer "remembers" that it has seen the first record, so the second duplicate is processed as a new record.
- As a result, duplicates enqueued more than 2 hours apart may both be written to the `orders` table.

**Why the other options are incorrect:**

- **B**: Records are not held for 2 hours before being deduplicated and committed; the watermark governs state cleanup, not processing delay.
- **C**: The table accumulates all records, not just the most recent 2 hours. The watermark controls state cleanup, not which records are written.
- **D**: This reverses the actual behavior: duplicates arriving more than 2 hours apart may be *retained*, not dropped, while duplicates arriving in the same batch (within the watermark window) *are* dropped by `dropDuplicates()`.
- **E**: Records arriving more than 2 hours late may be dropped, but the main issue is that duplicates arriving far apart may both be retained because the deduplication state was cleaned up.

**Important Note**: `trigger(once=True)` means each run behaves like a batch job, but the watermark logic still applies to state management for deduplication.
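The state-cleanup behavior described above can be illustrated with a small pure-Python simulation. This is not Spark code and the helper name is hypothetical; it is a minimal sketch, assuming state for a key is evicted as soon as its event time falls behind `max_event_time - 2 hours`:

```python
# Hypothetical simulation of watermark-gated deduplication state cleanup.
# It mimics how dropDuplicates() with a 2-hour watermark "forgets" keys
# whose last-seen event time falls behind max_event_time - 2 hours.

WATERMARK_HOURS = 2

def dedup_with_watermark(records):
    """records: list of (event_hour, customer_id, order_id) in arrival order.
    Returns the records that would be retained (written to the sink)."""
    state = {}                      # (customer_id, order_id) -> last event_hour
    max_event_time = float("-inf")
    retained = []
    for event_hour, customer_id, order_id in records:
        max_event_time = max(max_event_time, event_hour)
        watermark = max_event_time - WATERMARK_HOURS
        # Evict state older than the watermark (Spark does this periodically).
        state = {k: t for k, t in state.items() if t >= watermark}
        key = (customer_id, order_id)
        if key not in state:        # no state -> treated as a new record
            retained.append((event_hour, customer_id, order_id))
        state[key] = event_hour
    return retained

# Duplicate enqueued 1 hour apart: the second copy is deduplicated.
close = dedup_with_watermark([(0, "c1", "o1"), (1, "c1", "o1")])
# Duplicate enqueued 3 hours apart: state was evicted, both copies retained.
far = dedup_with_watermark([(0, "c1", "o1"), (3, "c1", "o1")])
```

Running the two scenarios shows option A's failure mode: `close` keeps only one record, while `far` keeps both copies of the same `(customer_id, order_id)` key.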
Author: Keng Suppaseth
A task orchestrator has been configured to run two hourly tasks. First, an outside system writes Parquet data to a directory mounted at /mnt/raw_orders/. After this data is written, a Databricks job containing the following code is executed:
```python
(spark.readStream
    .format("parquet")
    .load("/mnt/raw_orders/")
    .withWatermark("time", "2 hours")
    .dropDuplicates(["customer_id", "order_id"])
    .writeStream
    .trigger(once=True)
    .table("orders")
)
```
Assume that the fields customer_id and order_id serve as a composite key to uniquely identify each order, and that the time field indicates when the record was queued in the source system.
If the upstream system is known to occasionally enqueue duplicate entries for a single order hours apart, which statement is correct?
**A.** Duplicate records enqueued more than 2 hours apart may be retained and the `orders` table may contain duplicate records with the same `customer_id` and `order_id`.

**B.** All records will be held in the state store for 2 hours before being deduplicated and committed to the `orders` table.

**C.** The `orders` table will contain only the most recent 2 hours of records and no duplicates will be present.

**D.** Duplicate records arriving more than 2 hours apart will be dropped, but duplicates that arrive in the same batch may both be written to the `orders` table.

**E.** The `orders` table will not contain duplicates, but records arriving more than 2 hours late will be ignored and missing from the table.