
Explanation:
Adding a new aggregate to a Structured Streaming job changes more than the output schema. The checkpoint location stores the query's progress and, for a stateful query like this one, the aggregation state itself. Spark does not support changing the number or type of aggregates when restarting from an existing checkpoint, so reusing /item_agg/_checkpoint with the proposed query leads to a state schema mismatch on restart. The mergeSchema option only allows the sink to accept the new column; it has no effect on what is stored in the checkpoint. To avoid the conflict, a new checkpoint location must be specified (Option A). Removing mergeSchema (Option B) does not resolve the checkpoint conflict, and the option is still needed for the sink schema to evolve. Increasing shuffle partitions (Option C) is unrelated to schema changes. REFRESH TABLE (Option D) is irrelevant here: it deals with cached metadata for the output, not with the checkpoint.
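As an illustration of Option A, here is a minimal PySpark sketch of the proposed query started against a fresh checkpoint directory. The helper names `versioned_checkpoint` and `start_item_agg` and the `_checkpoint_v<N>` naming are hypothetical conventions, not part of the question or of Spark. The sketch also assumes the intent is to count rows where the promotion was used, so it uses a conditional count rather than the string expression from the proposed query.

```python
from typing import Any


def versioned_checkpoint(output_path: str, version: int) -> str:
    """Build a checkpoint path that changes whenever the stateful logic changes.

    The `_checkpoint_v<N>` suffix is a hypothetical naming convention.
    """
    return f"{output_path.rstrip('/')}/_checkpoint_v{version}"


def start_item_agg(df: Any, output_path: str = "/item_agg", version: int = 2) -> Any:
    """Start the revised aggregation with a new checkpoint location (Option A)."""
    # Imported lazily so the path helper above works without a Spark install.
    from pyspark.sql import functions as F

    return (
        df.groupBy("item")
        .agg(
            F.count("item").alias("total_count"),
            F.mean("sale_price").alias("avg_price"),
            # Assumed intent: count only rows that used the promotion.
            # count() skips NULLs, and when() without otherwise() yields
            # NULL for rows that do not match.
            F.count(F.when(F.col("promo_code") == "NEW_MEMBER", 1)).alias(
                "new_member_promo"
            ),
        )
        .writeStream
        .outputMode("complete")
        # Lets the sink accept the new column, as in the proposed query.
        .option("mergeSchema", "true")
        # New directory: no old aggregation state to conflict with.
        .option("checkpointLocation", versioned_checkpoint(output_path, version))
        .start(output_path)
    )
```

Because the new directory holds no saved offsets or state, the restarted query rebuilds its aggregates from whatever the source replays, and in complete mode each trigger rewrites the full result.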
A data team's Structured Streaming job is configured to calculate running aggregates for item sales to update a downstream marketing dashboard. The marketing team has introduced a new promotion and wants to add a field tracking how often this promotion code is used per item. A junior data engineer proposes updating the query as shown below (changes in bold):
Original query:
df.groupBy("item")
    .agg(count("item").alias("total_count"),
         mean("sale_price").alias("avg_price"))
    .writeStream
    .outputMode("complete")
    .option("checkpointLocation", "/item_agg/_checkpoint")
    .start("/item_agg")
Proposed query:
df.groupBy("item")
    .agg(count("item").alias("total_count"),
         mean("sale_price").alias("avg_price"),
         count("promo_code = 'NEW_MEMBER'").alias("new_member_promo"))
    .writeStream
    .outputMode("complete")
    .option("mergeSchema", "true")
    .option("checkpointLocation", "/item_agg/_checkpoint")
    .start("/item_agg")
What additional step is required to deploy the proposed query to production?
A. Specify a new checkpointLocation
B. Remove .option('mergeSchema', 'true') from the streaming write
C. Increase the shuffle partitions to account for additional aggregates
D. Run REFRESH TABLE delta.`/item_agg`