
## Answer

**D.** `trigger(processingTime="5 seconds")`
## Explanation

In Apache Spark Structured Streaming, the `trigger()` method specifies how often the streaming query should process data. There are several trigger types:

1. **ProcessingTime trigger**: processes data in micro-batches at a fixed interval
2. **Once trigger**: processes all available data once and then stops
3. **Continuous trigger**: processes data continuously with low latency (experimental)

To process data every 5 seconds using micro-batch processing, the correct syntax is:

```python
.trigger(processingTime="5 seconds")
```

**Why the other options are incorrect:**

- **A. `trigger("5 seconds")`**: This syntax is not valid in Spark Structured Streaming; the `trigger()` method requires the trigger type to be named explicitly.
- **B. `trigger()`**: Without arguments, the query uses the default trigger, which starts a new micro-batch as soon as the previous one finishes, not at a fixed interval.
- **C. `trigger(once="5 seconds")`**: The `once` trigger does not take a time parameter. It accepts only `once=True` and processes all available data in a single batch before stopping.
- **E. `trigger(continuous="5 seconds")`**: The continuous trigger enables continuous processing mode, not micro-batch processing, and its time argument is a checkpoint interval rather than a batch interval.

**Correct usage in the code:**

```python
(spark.table("sales")
    .withColumn("avg_price", col("sales") / col("units"))
    .writeStream
    .option("checkpointLocation", checkpointPath)
    .trigger(processingTime="5 seconds")
    .outputMode("complete")
    .table("new_sales"))
```

This configuration executes a micro-batch every 5 seconds, processing any new data that has arrived since the last micro-batch.
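The snippet above assumes an existing `sales` table and a pre-defined `checkpointPath`. For local experimentation, here is a minimal self-contained sketch that exercises the same 5-second trigger against Spark's built-in `rate` source; the app name, checkpoint path, and derived column are hypothetical stand-ins, and `append` output mode is used because the sketch performs no aggregation.

```python
# A minimal runnable sketch (app name, checkpoint path, and the derived
# column are assumptions). The built-in "rate" source stands in for the
# exam's "sales" table.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("trigger-demo").getOrCreate()

# The rate source emits (timestamp, value) rows at a steady pace.
stream = (spark.readStream
    .format("rate")
    .option("rowsPerSecond", 10)
    .load()
    .withColumn("doubled", col("value") * 2))    # hypothetical transformation

query = (stream.writeStream
    .format("console")                           # print each micro-batch
    .option("checkpointLocation", "/tmp/trigger-demo-ckpt")
    .trigger(processingTime="5 seconds")         # one micro-batch every 5 seconds
    .outputMode("append")                        # no aggregation, so append mode
    .start())

query.awaitTermination(timeout=30)               # let a few micro-batches run
query.stop()
```

With `rowsPerSecond` set to 10, each micro-batch should print roughly 50 new rows, which makes the 5-second cadence easy to verify on the console.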
## Question

A data engineer has configured a Structured Streaming job to read from a table, manipulate the data, and then perform a streaming write into a new table. The code block used by the data engineer is below:
```python
(spark.table("sales")
    .withColumn("avg_price", col("sales") / col("units"))
    .writeStream
    .option("checkpointLocation", checkpointPath)
    ._____
    .outputMode("complete")
    .table("new_sales"))
```
If the data engineer only wants the query to execute a micro-batch to process data every 5 seconds, which of the following lines of code should the data engineer use to fill in the blank?
A. `trigger("5 seconds")`

B. `trigger()`

C. `trigger(once="5 seconds")`

D. `trigger(processingTime="5 seconds")`

E. `trigger(continuous="5 seconds")`
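For reference, the keyword forms that PySpark's `DataStreamWriter.trigger()` accepts in Spark 3.x are sketched below. The writer is built on the built-in `rate` source so the snippet runs as-is; nothing is started, since each call only configures the writer.

```python
# Sketch of the trigger() keyword forms accepted by PySpark (Spark 3.x).
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("trigger-forms").getOrCreate()
df = spark.readStream.format("rate").load()
w = df.writeStream.format("console")

# Each call returns a configured writer; a later call overwrites an earlier one.
w.trigger(processingTime="5 seconds")  # option D: a micro-batch every 5 seconds
w.trigger(once=True)                   # cf. option C: once takes True, not a time string
w.trigger(availableNow=True)           # Spark 3.3+: like once, but split into batches
w.trigger(continuous="1 second")       # cf. option E: the string is the checkpoint
                                       # interval for continuous mode, not a batch rate
```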