RABBITMQ_02 - Не удалось подтвердить смещение ERROR

RABBITMQ_02 - Не удалось подтвердить смещение ERROR

У меня есть конвейер, который считывает данные из RabbitMQ и загружает их в поток Kinesis, и я получаю следующую ошибку:

есть идеи, в чем может быть причина?

Спасибо

UNKNOWN java.lang.RuntimeException: com.streamsets.pipeline.api.StageException: RABBITMQ_02 - Failed to acknowledge offset '2945': java.net.SocketException: Connection reset
  at com.streamsets.datacollector.runner.production.ProductionSourceOffsetCommitterOffsetTracker.commitOffset(ProductionSourceOffsetCommitterOffsetTracker.java:80)
  at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.executeRunner(ProductionPipelineRunner.java:857)
  at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.runSourceLessBatch(ProductionPipelineRunner.java:823)
  at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.runPollSource(ProductionPipelineRunner.java:563)
  at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.run(ProductionPipelineRunner.java:383)
  at com.streamsets.datacollector.runner.Pipeline.run(Pipeline.java:512)
  at com.streamsets.datacollector.execution.runner.common.ProductionPipeline.run(ProductionPipeline.java:112)
  at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunnable.run(ProductionPipelineRunnable.java:74)
  at com.streamsets.datacollector.execution.runner.standalone.StandaloneRunner.start(StandaloneRunner.java:756)
  at com.streamsets.datacollector.execution.runner.common.AsyncRunner.lambda$start$3(AsyncRunner.java:152)
  at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:227)
  at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:33)
  at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:223)
  at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:227)
  at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:33)
  at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:223)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
  at com.streamsets.datacollector.metrics.MetricSafeScheduledExecutorService$MetricsTask.run(MetricSafeScheduledExecutorService.java:100)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
Caused by: com.streamsets.pipeline.api.StageException: RABBITMQ_02 - Failed to acknowledge offset '2945': java.net.SocketException: Connection reset
  at com.streamsets.pipeline.stage.origin.rabbitmq.RabbitSource.commit(RabbitSource.java:187)
  at com.streamsets.pipeline.configurablestage.DSourceOffsetCommitter.commit(DSourceOffsetCommitter.java:41)
  at com.streamsets.datacollector.runner.production.ProductionSourceOffsetCommitterOffsetTracker.commitOffset(ProductionSourceOffsetCommitterOffsetTracker.java:75) ... 22 more

решение1

Это может происходить из-за того, что RabbitMQ истечет время ожидания во время обработки пакета. Проверьте журнал ошибок RabbitMQ.

Если это так, попробуйте уменьшить размер пакета в источнике RabbitMQ.

Связанный контент