
Answer-first summary for fast verification
Answer: return spark.readStream.table("bronze")
The function `new_records()` must return an object exposing only the records appended to the bronze table since the last run. The correct approach is an incremental streaming read, `spark.readStream.table("bronze")`, which is option A: Structured Streaming on a Delta table records its progress in a checkpoint, so each micro-batch contains only newly appended records. Options B and D, which enable the Change Data Feed (CDF) reader, are not applicable because nothing in the scenario indicates CDF is enabled on the table. Options C and E are batch reads that try to isolate the latest batch by filtering: C compares `ingest_time` against `current_timestamp()` evaluated at query time, which will never match the earlier ingestion timestamp, and E references year/month/day variables that are not in scope inside `new_records()`.
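A minimal sketch of the completed function, assuming the ambient `spark` session that Databricks notebooks provide as a global:

```python
def new_records():
    # Streaming read of the Delta table "bronze": Structured Streaming
    # tracks consumed data in the query checkpoint, so downstream
    # processing sees only records appended since the last run.
    return spark.readStream.table("bronze")
```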
Author: LeetQuiz Editorial Team
A nightly job ingests data into a Delta Lake table using the following code:
from pyspark.sql.functions import current_timestamp, input_file_name, col
from pyspark.sql.column import Column

def ingest_daily_batch(time_col: Column, year: int, month: int, day: int):
    (spark.read
        .format("parquet")
        .load(f"/mnt/daily_batch/{year}/{month}/{day}")
        .select("*",
                time_col.alias("ingest_time"),
                input_file_name().alias("source_file"))
        .write
        .mode("append")
        .saveAsTable("bronze"))
The next step in the pipeline requires a function that returns an object that can be used to manipulate new records that have not yet been processed to the next table in the pipeline.
Which code snippet completes this function definition?
def new_records():
A
return spark.readStream.table("bronze")
B
return spark.read.option("readChangeFeed", "true").table("bronze")
C
return (spark.read.table("bronze").filter(col("ingest_time") == current_timestamp()))
D
return spark.read.option("readChangeFeed","true").table("bronze")
E
return (spark.read.table("bronze").filter(col("source_file") == f"/mnt/daily_batch/{year}/{month}/{day}"))
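A sketch of how option A is used downstream. The `spark` session is passed explicitly here for self-containment (on Databricks it is a predefined global and the quiz's signature takes no arguments); the "silver" table name and checkpoint path are illustrative assumptions, not part of the question:

```python
def new_records(spark):
    # Option A: incremental streaming read of the Delta table "bronze".
    # The stream's checkpoint records what has been consumed, so each
    # micro-batch contains only rows appended since the last run.
    return spark.readStream.table("bronze")

def promote_new_records(spark):
    # Hypothetical downstream step: write only the unprocessed records
    # to the next table. trigger(availableNow=True) drains everything
    # currently pending and then stops, matching a nightly batch cadence.
    return (new_records(spark)
            .writeStream
            .option("checkpointLocation", "/mnt/checkpoints/silver")  # illustrative path
            .trigger(availableNow=True)
            .toTable("silver"))
```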