import logging

from pyspark.sql.streaming import StreamingQueryListener


class MyListener(StreamingQueryListener):
    """Print Structured Streaming query lifecycle events to driver stdout.

    Every line is prefixed with ``LAEDE:`` so the listener's output can be
    grepped out of the driver log.
    """

    def onQueryStarted(self, queryStarted):
        """Report that a streaming query has started.

        :param queryStarted: the ``QueryStartedEvent`` delivered by Spark.
        """
        print("LAEDE: Query started: %s" % queryStarted.id)

    def onQueryProgress(self, queryProgress):
        """Report a progress update (one per completed trigger) for a query.

        :param queryProgress: the ``QueryProgressEvent`` delivered by Spark.
            The event itself only carries ``progress``; batch id and
            per-phase timings live on that ``StreamingQueryProgress`` object.
        """
        progress = queryProgress.progress
        print("LAEDE: Query made progress: %s" % progress)
        # durationMs is a mapping of execution phase -> milliseconds; it is an
        # attribute of the progress object, not of the event.
        print("LAEDE: Duration in ms: %s" % progress.durationMs)
        # Structured Streaming never invokes onBatchCompleted, so per-batch
        # timing is reported here. "triggerExecution" is the total time spent
        # on the trigger; .get() avoids a KeyError if the phase is absent.
        print(
            "LAEDE: Batch completed: %s, processing time: %s"
            % (progress.batchId, progress.durationMs.get("triggerExecution"))
        )

    def onQueryTerminated(self, queryTerminated):
        """Report that a streaming query has stopped.

        :param queryTerminated: the ``QueryTerminatedEvent`` delivered by
            Spark. Its ``exception`` is ``None`` for a clean stop and the
            error text when the query failed.
        """
        print("LAEDE: Query terminated: %s" % queryTerminated.id)
        if queryTerminated.exception is not None:
            print("LAEDE: Query failed with: %s" % queryTerminated.exception)

    def onBatchCompleted(self, batchCompleted):
        """Report a completed batch (legacy, not a StreamingQueryListener hook).

        NOTE(review): Spark's ``StreamingQueryListener`` has no
        ``onBatchCompleted`` callback, so Structured Streaming never calls
        this method; per-batch details are reported from ``onQueryProgress``.
        It is kept only so existing direct callers keep working, and it expects
        an object exposing ``batchId`` and ``batchDuration``.
        """
        print("LAEDE: Batch completed: %s, processing time: %s" % (batchCompleted.batchId, batchCompleted.batchDuration))


# Register MyListener on the session's StreamingQueryManager; it then receives
# events for every streaming query started in this SparkSession.
# NOTE(review): relies on a pre-existing global ``spark`` SparkSession (as a
# notebook or the pyspark shell provides); elsewhere this raises NameError.
my_listener = MyListener()
spark.streams.addListener(my_listener)