RABBITMQ_02 - Failed to acknowledge offset error

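I have a pipeline that reads from RabbitMQ and loads the data into a Kinesis stream, but it fails with the error below.

Any idea what might be causing it?

Thanks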

UNKNOWN java.lang.RuntimeException: com.streamsets.pipeline.api.StageException: RABBITMQ_02 - Failed to acknowledge offset '2945': java.net.SocketException: Connection reset
  at com.streamsets.datacollector.runner.production.ProductionSourceOffsetCommitterOffsetTracker.commitOffset(ProductionSourceOffsetCommitterOffsetTracker.java:80)
  at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.executeRunner(ProductionPipelineRunner.java:857)
  at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.runSourceLessBatch(ProductionPipelineRunner.java:823)
  at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.runPollSource(ProductionPipelineRunner.java:563)
  at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.run(ProductionPipelineRunner.java:383)
  at com.streamsets.datacollector.runner.Pipeline.run(Pipeline.java:512)
  at com.streamsets.datacollector.execution.runner.common.ProductionPipeline.run(ProductionPipeline.java:112)
  at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunnable.run(ProductionPipelineRunnable.java:74)
  at com.streamsets.datacollector.execution.runner.standalone.StandaloneRunner.start(StandaloneRunner.java:756)
  at com.streamsets.datacollector.execution.runner.common.AsyncRunner.lambda$start$3(AsyncRunner.java:152)
  at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:227)
  at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:33)
  at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:223)
  at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:227)
  at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:33)
  at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:223)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
  at com.streamsets.datacollector.metrics.MetricSafeScheduledExecutorService$MetricsTask.run(MetricSafeScheduledExecutorService.java:100)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
Caused by: com.streamsets.pipeline.api.StageException: RABBITMQ_02 - Failed to acknowledge offset '2945': java.net.SocketException: Connection reset
  at com.streamsets.pipeline.stage.origin.rabbitmq.RabbitSource.commit(RabbitSource.java:187)
  at com.streamsets.pipeline.configurablestage.DSourceOffsetCommitter.commit(DSourceOffsetCommitter.java:41)
  at com.streamsets.datacollector.runner.production.ProductionSourceOffsetCommitterOffsetTracker.commitOffset(ProductionSourceOffsetCommitterOffsetTracker.java:75)
  ... 22 more

Answer 1

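This is probably happening because RabbitMQ times out while the batch is still being processed: the `Connection reset` on acknowledge suggests the connection was already closed by the time the pipeline tried to ack offset '2945'. Check the RabbitMQ error log to confirm.

If that is the case, try reducing the batch size in the RabbitMQ origin so that each batch is processed and acknowledged sooner.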
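To make the batch-size reasoning concrete, here is a rough back-of-the-envelope sketch. It is not part of StreamSets or RabbitMQ: the function names, the 60-second timeout, and the 50 ms per-record cost are all hypothetical placeholders. Which timeout actually applies in your setup (heartbeat, ack timeout, or an idle timeout on a load balancer) is something only the RabbitMQ logs and your configuration can tell you.

```python
def max_safe_batch_size(timeout_ms: int, per_record_ms: int, safety_percent: int = 50) -> int:
    """Estimate the largest batch that finishes within a fraction of the timeout.

    All inputs are assumptions you measure or look up yourself:
    timeout_ms      - the timeout you suspect is closing the connection
    per_record_ms   - average end-to-end processing time per record
    safety_percent  - share of the timeout you are willing to use (1-100)
    """
    if timeout_ms <= 0 or per_record_ms <= 0:
        raise ValueError("timeout_ms and per_record_ms must be positive")
    if not 0 < safety_percent <= 100:
        raise ValueError("safety_percent must be between 1 and 100")
    budget_ms = timeout_ms * safety_percent // 100
    # Never return less than 1: a batch has to contain at least one record.
    return max(1, budget_ms // per_record_ms)


def batch_fits(batch_size: int, timeout_ms: int, per_record_ms: int) -> bool:
    """True if the estimated batch processing time stays below the timeout."""
    return batch_size * per_record_ms < timeout_ms


if __name__ == "__main__":
    # Hypothetical numbers: 60 s timeout, 50 ms per record.
    print(batch_fits(2000, 60_000, 50))
    print(max_safe_batch_size(60_000, 50))
```

If the estimate says your current batch size cannot finish in time, lowering it in the origin (or raising the relevant timeout, if you control the broker) is the first thing to try.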
