Tengo una canalización que lee desde RabbitMQ y la carga en Kinesis Stream y aparece el siguiente error:
¿Alguna idea de cuál podría ser la razón?
Gracias
UNKNOWN java.lang.RuntimeException: com.streamsets.pipeline.api.StageException: RABBITMQ_02 - Failed to acknowledge offset '2945': java.net.SocketException: Connection reset
at com.streamsets.datacollector.runner.production.ProductionSourceOffsetCommitterOffsetTracker.commitOffset(ProductionSourceOffsetCommitterOffsetTracker.java:80)
at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.executeRunner(ProductionPipelineRunner.java:857)
at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.runSourceLessBatch(ProductionPipelineRunner.java:823)
at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.runPollSource(ProductionPipelineRunner.java:563)
at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.run(ProductionPipelineRunner.java:383)
at com.streamsets.datacollector.runner.Pipeline.run(Pipeline.java:512)
at com.streamsets.datacollector.execution.runner.common.ProductionPipeline.run(ProductionPipeline.java:112)
at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunnable.run(ProductionPipelineRunnable.java:74)
at com.streamsets.datacollector.execution.runner.standalone.StandaloneRunner.start(StandaloneRunner.java:756)
at com.streamsets.datacollector.execution.runner.common.AsyncRunner.lambda$start$3(AsyncRunner.java:152)
at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:227)
at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:33)
at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:223)
at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:227)
at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:33)
at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:223)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at com.streamsets.datacollector.metrics.MetricSafeScheduledExecutorService$MetricsTask.run(MetricSafeScheduledExecutorService.java:100)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.streamsets.pipeline.api.StageException: RABBITMQ_02 - Failed to acknowledge offset '2945': java.net.SocketException: Connection reset
at com.streamsets.pipeline.stage.origin.rabbitmq.RabbitSource.commit(RabbitSource.java:187)
at com.streamsets.pipeline.configurablestage.DSourceOffsetCommitter.commit(DSourceOffsetCommitter.java:41)
at com.streamsets.datacollector.runner.production.ProductionSourceOffsetCommitterOffsetTracker.commitOffset(ProductionSourceOffsetCommitterOffsetTracker.java:75) ... 22 more
Respuesta1
Esto puede deberse a que RabbitMQ agotó el tiempo de espera mientras se procesaba el lote. Consulte el registro de errores de RabbitMQ.
Si este es el caso, intente reducir el tamaño del lote en el origen de RabbitMQ.