
Answer-first summary for fast verification
Answer:

```python
(spark.readStream.load(rawSalesLocation)
    .writeStream
    .option("checkpointLocation", checkpointPath)
    .outputMode("append")
    .table("newSales"))
```
## Explanation

**Option E** is the correct answer because it represents a streaming hop from raw data to a Bronze table. Here's why:

### Key Characteristics of a Streaming Hop:

1. **Reads from a raw data source** - uses `spark.readStream.load(rawSalesLocation)` to read streaming data from raw files
2. **Streaming write** - uses `.writeStream` to write as a streaming job
3. **Checkpointing** - includes `.option("checkpointLocation", checkpointPath)` for fault tolerance
4. **Append mode** - uses `.outputMode("append")`, which is appropriate for streaming writes to Bronze tables

### Why the Other Options Are Incorrect:

**Options A, B, C**: These all read from `spark.table("sales")`, which means they are reading from an existing table (likely a Bronze or Silver table), not from raw data. They perform transformations on existing tables, not the initial ingestion from raw data.

**Option D**: This uses batch processing (`spark.read.load`), not streaming, and does not include checkpointing, which is essential for streaming jobs.

### Bronze Table Context:

In the Databricks medallion architecture:

- **Bronze tables** store raw, unprocessed data as it arrives from source systems
- **Streaming hops** to Bronze tables typically involve reading from raw data sources (files, Kafka, etc.) and writing to Delta tables with minimal or no transformation
- The checkpoint location enables fault tolerance and exactly-once processing semantics

Option E correctly demonstrates the pattern for streaming ingestion from raw data files into a Bronze table.
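The three criteria above can be encoded as a small pure-Python check (a toy illustration of the decision rules, not Spark code): a query counts as a streaming hop from raw data to Bronze only if it reads a stream from the raw location, writes with `.writeStream`, and sets a checkpoint location.

```python
def is_streaming_bronze_hop(query: str) -> bool:
    """Toy string-based check of the criteria above; not Spark code.
    Returns True only if the query reads a stream from the raw
    location, writes as a stream, and configures a checkpoint."""
    reads_raw_stream = "readStream.load(rawSalesLocation)" in query
    streaming_write = ".writeStream" in query
    checkpointed = "checkpointLocation" in query
    return reads_raw_stream and streaming_write and checkpointed

# Option D: batch read, batch write, no checkpoint
option_d = '(spark.read.load(rawSalesLocation).write.mode("append").table("newSales"))'
# Option E: streaming read from raw files, streaming write, checkpointed
option_e = ('(spark.readStream.load(rawSalesLocation).writeStream'
            '.option("checkpointLocation", checkpointPath)'
            '.outputMode("append").table("newSales"))')

print(is_streaming_bronze_hop(option_d))  # False
print(is_streaming_bronze_hop(option_e))  # True
```

Options A through C fail the first criterion the same way Option D does: `spark.table("sales")` never appears as a raw-location streaming read.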
Author: Keng Suppaseth
Which of the following queries is performing a streaming hop from raw data to a Bronze table?
A
```python
(spark.table("sales")
    .groupBy("store")
    .agg(sum("sales"))
    .writeStream
    .option("checkpointLocation", checkpointPath)
    .outputMode("complete")
    .table("newSales"))
```
B
```python
(spark.table("sales")
    .filter(col("units") > 0)
    .writeStream
    .option("checkpointLocation", checkpointPath)
    .outputMode("append")
    .table("newSales"))
```
C
```python
(spark.table("sales")
    .withColumn("avgPrice", col("sales") / col("units"))
    .writeStream
    .option("checkpointLocation", checkpointPath)
    .outputMode("append")
    .table("newSales"))
```
D
```python
(spark.read.load(rawSalesLocation)
    .write
    .mode("append")
    .table("newSales"))
```
E
```python
(spark.readStream.load(rawSalesLocation)
    .writeStream
    .option("checkpointLocation", checkpointPath)
    .outputMode("append")
    .table("newSales"))
```