A data team's Structured Streaming job is configured to calculate running aggregates for item sales to update a downstream marketing dashboard. The marketing team has introduced a new promotion and wants to add a field tracking how often this promotion code is used per item. A junior data engineer proposes updating the query as shown below (changes in bold):
Original query:
df.groupBy("item")
  .agg(count("item").alias("total_count"),
       mean("sale_price").alias("avg_price"))
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "/item_agg/_checkpoint")
  .start("/item_agg")
Proposed query:
df.groupBy("item")
  .agg(count("item").alias("total_count"),
       mean("sale_price").alias("avg_price"),
       count("promo_code = 'NEW_MEMBER'").alias("new_member_promo"))
  .writeStream
  .outputMode("complete")
  .option("mergeSchema", "true")
  .option("checkpointLocation", "/item_agg/_checkpoint")
  .start("/item_agg")
What additional step is required to deploy the proposed query to production?