
Answer-first summary for fast verification
Answer: Specify a new checkpointLocation
When a Structured Streaming query is modified to add a new aggregation column, the schema of its state and output changes. The checkpoint location stores metadata about the query, including that schema, so restarting the modified query against the old checkpoint raises a schema-mismatch error. Even with `mergeSchema` enabled, the streaming engine checks the existing checkpoint's schema on restart and detects the incompatible change: `mergeSchema` applies to the Delta sink, not to the checkpointed state. The required step is therefore to specify a new checkpoint location (Option A). Removing `mergeSchema` (Option B) would not resolve the checkpoint conflict, and the option is in fact needed so the Delta sink can accept the new column. Increasing shuffle partitions (Option C) affects parallelism, not schema compatibility. `REFRESH TABLE` (Option D) is irrelevant here, as the output is written to a path, not a registered table.
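For reference, a deployment following the accepted answer might look like the sketch below. The new checkpoint path `/item_agg/_checkpoint_v2` is an assumed name, not from the question. Note also that `count("promo_code = 'NEW_MEMBER'")` in the proposed query would be parsed as a (non-existent) column name rather than a predicate; a conditional count is normally written with `count(when(...))`:

```python
# Sketch only: assumes an active SparkSession and a streaming DataFrame `df`.
# `/item_agg/_checkpoint_v2` is a hypothetical new checkpoint path chosen to
# avoid the state stored under the old checkpoint.
from pyspark.sql.functions import count, mean, when, col

(df.groupBy("item")
   .agg(count("item").alias("total_count"),
        mean("sale_price").alias("avg_price"),
        # count() ignores NULLs, so this counts only NEW_MEMBER rows
        count(when(col("promo_code") == "NEW_MEMBER", 1))
            .alias("new_member_promo"))
   .writeStream
   .outputMode("complete")
   .option("mergeSchema", "true")  # lets the Delta sink add the new column
   .option("checkpointLocation", "/item_agg/_checkpoint_v2")  # NEW location
   .start("/item_agg"))
```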
Author: LeetQuiz Editorial Team
A data team's Structured Streaming job is configured to calculate running aggregates for item sales to update a downstream marketing dashboard. The marketing team has introduced a new promotion and wants to add a field tracking how often this promotion code is used per item. A junior data engineer proposes updating the query as shown below (the changes are the added promo-code aggregate and the mergeSchema option):
Original query:
df.groupBy("item")
  .agg(count("item").alias("total_count"),
       mean("sale_price").alias("avg_price"))
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "/item_agg/_checkpoint")
  .start("/item_agg")
Proposed query:
df.groupBy("item")
  .agg(count("item").alias("total_count"),
       mean("sale_price").alias("avg_price"),
       count("promo_code = 'NEW_MEMBER'").alias("new_member_promo"))
  .writeStream
  .outputMode("complete")
  .option("mergeSchema", "true")
  .option("checkpointLocation", "/item_agg/_checkpoint")
  .start("/item_agg")
What additional step is required to deploy the proposed query to production?
A
Specify a new checkpointLocation
B
Remove .option('mergeSchema', 'true') from the streaming write
C
Increase the shuffle partitions to account for additional aggregates
D
Run REFRESH TABLE delta.`/item_agg`