2025-05-22 01:38:01,089 INFO streaming.MicroBatchExecution: Streaming query made progress: { "id" : "ae8842dd-00ce-4e61-baa0-7a77048f3731", "runId" : "dec9516d-8d21-41ef-9b36-444ba79d5d15", "name" : null, "timestamp" : "2025-05-22T08:38:01.087Z", "batchId" : 1, "numInputRows" : 0, "inputRowsPerSecond" : 0.0, "processedRowsPerSecond" : 0.0, "durationMs" : { "latestOffset" : 2, "triggerExecution" : 2 }, "eventTime" : { "watermark" : "1970-01-01T00:00:00.000Z" }, "stateOperators" : [ ], "sources" : [ { "description" : "KafkaV2[Subscribe[sex]]", "startOffset" : { "sex" : { "0" : 1 } }, "endOffset" : { "sex" : { "0" : 1 } }, "latestOffset" : { "sex" : { "0" : 1 } }, "numInputRows" : 0, "inputRowsPerSecond" : 0.0, "processedRowsPerSecond" : 0.0, "metrics" : { "avgOffsetsBehindLatest" : "0.0", "maxOffsetsBehindLatest" : "0", "minOffsetsBehindLatest" : "0" } } ], "sink" : { "description" : "ForeachWriterTable(org.apache.spark.sql.execution.python.PythonForeachWriter@51ccfecc,Right(org.apache.spark.sql.execution.streaming.sources.ForeachWriterTable$$$Lambda$1622/728218895@50775f2f))", "numOutputRows" : -1 } }
时间: 2025-05-30 17:11:54 浏览: 28
### Spark Structured Streaming 日志分析
#### Kafka 数据源的日志记录
当使用 `Kafka` 作为数据源时,Spark Structured Streaming 的日志主要集中在以下几个方面:
- **偏移量管理**:每批次处理完成后,框架会自动提交消费的偏移量至指定位置(如 Kafka 或 Checkpoint)。这可以通过配置选项 `.option("enable.auto.commit", "true/false")` 控制[^1]。
- **分区分配**:日志中会显示当前任务所分配的具体 Kafka 分区及其对应的起始和结束偏移量。这些信息通常以 DEBUG 级别打印,便于排查数据丢失或重复问题。
```scala
val kafkaSourceDF: DataFrame = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "node1:9092")
.option("subscribe", "iotTopic")
.option("startingOffsets", "earliest")
.load()
```
此代码片段展示了如何加载 Kafka 数据源并设置初始偏移量为最早的消息。
---
#### ForeachWriter 的日志行为
`ForeachWriter` 是一种灵活的自定义 Sink 实现方式,允许开发者控制每一行数据的写入逻辑。其日志内容主要包括以下几点:
- **初始化阶段**:每当启动一个新的微批处理作业时,`open()` 方法会被调用一次。此时会在日志中记录资源初始化的状态,例如数据库连接是否成功建立。
- **逐条处理阶段**:在 `process(row)` 中执行具体业务逻辑期间,可能会生成 INFO 或 TRACE 级别的日志消息,帮助追踪单条记录的流向。
- **关闭阶段**:无论处理过程是否异常终止,在最后都会调用 `close(onError)` 来释放外部依赖(比如关闭文件句柄或者断开网络链接),这部分操作也会被详细记录下来以便后续审计[^4]。
下面是一个简单的例子展示如何利用 ForeachWriter 将结果保存到 HDFS:
```java
class MyCustomWriter extends ForeachWriter<Row> {
private BufferedWriter writer;
@Override
public boolean open(long partitionId, long version) {
try {
Path path = new Path("/path/to/output/" + UUID.randomUUID());
FileSystem fs = FileSystem.get(new Configuration());
OutputStream os = fs.create(path);
this.writer = new BufferedWriter(new OutputStreamWriter(os));
return true;
} catch (IOException e) {
System.err.println(e.getMessage());
return false;
}
}
@Override
public void process(Row row) {
String outputLine = row.toString();
try {
writer.write(outputLine);
writer.newLine();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void close(Throwable errorOrNull) {
if(writer != null){
try{
writer.close();
}catch(IOException ex){
// Handle exception appropriately.
}
}
}
}
```
上述 Java 版本实现了基本功能的同时也体现了良好的错误恢复机制设计思路[^4]。
---
#### 综合日志分析建议
为了更高效地监控整个流程中的健康状况以及定位潜在瓶颈所在之处,可以从如下几个维度入手进行深入剖析:
1. 设置合适的日志级别(推荐生产环境采用 WARN/ERROR 而开发调试环境下则可适当放宽至 INFO/TRACE);
2. 定期审查 checkpoint 文件夹下的元数据结构变化趋势图谱;
3. 结合实际应用场景需求调整 batch interval 和 parallelism 参数值大小关系从而达到最优性能平衡点。
---
阅读全文
相关推荐


















