Kafka REST Proxy: only one consumer connects

I have a Kafka topic "purchases2" with 2 partitions, and Kafka REST Proxy for consumers (set up from this tutorial). When I run the first consumer for this topic, everything works:

 curl -X POST      -H "Content-Type: application/vnd.kafka.v2+json"      --data '{"name": "ci1", "format": "json", "auto.offset.reset": "earliest"}'      http://localhost:8082/consumers/cg1

 curl -X POST      -H "Content-Type: application/vnd.kafka.v2+json"      --data '{"topics":["purchases2"]}'      http://localhost:8082/consumers/cg1/instances/ci1/subscription

 curl -X GET -H "Accept: application/vnd.kafka.json.v2+json"  "http://localhost:8082/consumers/cg1/instances/ci1/records"

But if I try to run a second consumer (with a new instance ID) and fetch records:

 curl -X POST      -H "Content-Type: application/vnd.kafka.v2+json"      --data '{"name": "ci2", "format": "json", "auto.offset.reset": "earliest"}'      http://localhost:8082/consumers/cg1

 curl -X POST      -H "Content-Type: application/vnd.kafka.v2+json"      --data '{"topics":["purchases2"]}'      http://localhost:8082/consumers/cg1/instances/ci2/subscription

 curl -X GET -H "Accept: application/vnd.kafka.json.v2+json"  "http://localhost:8082/consumers/cg1/instances/ci2/records"

I get many messages in the Kafka REST Proxy log such as "Request joining group due to: group is already rebalancing":

rest-proxy    | [2023-09-21 09:58:12,367] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: group is already rebalancing (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:15,391] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: group is already rebalancing (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:17,911] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] JoinGroup failed: The coordinator is not aware of this member. Need to re-join the group. Sent generation was Generation{generationId=-1, memberId='consumer-cg1-2-c3f649ce-934c-4e8a-a87a-d2e85ac82b54', protocol='null'} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:17,911] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Resetting generation and member id due to: encountered UNKNOWN_MEMBER_ID from JOIN_GROUP response (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:17,912] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Request joining group due to: encountered UNKNOWN_MEMBER_ID from JOIN_GROUP response (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:17,913] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Request joining group due to: rebalance failed due to 'The coordinator is not aware of this member.' (UnknownMemberIdException) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:17,915] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] (Re-)joining group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:17,919] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Request joining group due to: need to re-join with the given member-id: consumer-cg1-2-ff236f69-f902-4294-847a-8bf4940354ef (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:17,919] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:17,919] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] (Re-)joining group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:18,414] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: group is already rebalancing (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:21,439] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: group is already rebalancing (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:24,461] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: group is already rebalancing (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:27,488] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: group is already rebalancing (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:30,511] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: group is already rebalancing (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:33,535] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: group is already rebalancing (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:36,560] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: group is already rebalancing (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:38,475] WARN [Consumer clientId=consumer-cg1-1, groupId=cg1] consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:38,475] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Member consumer-cg1-1-9dc34071-0a6d-4fce-8e11-9e18c6bbc372 sending LeaveGroup request to coordinator broker:29092 (id: 2147483646 rack: null) due to consumer poll timeout has expired. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:38,477] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Resetting generation and member id due to: consumer pro-actively leaving the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:38,477] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: consumer pro-actively leaving the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:38,482] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Successfully joined group with generation Generation{generationId=38, memberId='consumer-cg1-2-ff236f69-f902-4294-847a-8bf4940354ef', protocol='range'} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:38,484] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Finished assignment for group at generation 38: {consumer-cg1-2-ff236f69-f902-4294-847a-8bf4940354ef=Assignment(partitions=[purchases2-0, purchases2-1])} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:38,495] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Successfully synced group in generation Generation{generationId=38, memberId='consumer-cg1-2-ff236f69-f902-4294-847a-8bf4940354ef', protocol='range'} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:38,498] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Notifying assignor about the new Assignment(partitions=[purchases2-0, purchases2-1]) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:38,498] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Adding newly assigned partitions: purchases2-0, purchases2-1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:38,506] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Setting offset for partition purchases2-0 to the committed offset FetchPosition{offset=11, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[broker:29092 (id: 1 rack: null)], epoch=0}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:38,506] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Setting offset for partition purchases2-1 to the committed offset FetchPosition{offset=22, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[broker:29092 (id: 1 rack: null)], epoch=0}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

The rebalance and these messages last for roughly max.poll.interval.ms, until the first consumer is removed. During that time nobody can fetch data from the topic, and the second consumer connects only once the first one has been evicted on timeout (max.poll.interval.ms). This repeats every time I try to start a second consumer. The topic has 2 partitions, so it should be normal to have two consumers, one per partition, but for some reason I cannot bring up the second consumer.
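
A minimal sketch for narrowing this down, not a confirmed fix. The WARN line in the log says the time between poll() calls exceeded max.poll.interval.ms, so one thing worth checking is whether both instances are actually being polled while the group rebalances. The sketch assumes that ci1 and ci2 already exist and are subscribed as above, that the proxy listens on localhost:8082, and (my understanding, not stated in the logs) that a GET on /records is what drives the underlying consumer's poll(). The helper names records_url, poll_instance and poll_all are made up for illustration.

```shell
#!/bin/sh
# Assumed defaults taken from the commands in the question.
REST_URL="${REST_URL:-http://localhost:8082}"
GROUP="${GROUP:-cg1}"

# Build the records URL for one consumer instance (hypothetical helper).
records_url() {
  printf '%s/consumers/%s/instances/%s/records' "$REST_URL" "$GROUP" "$1"
}

# Issue a single fetch for one instance, same request as in the question.
poll_instance() {
  curl -s -X GET \
    -H "Accept: application/vnd.kafka.json.v2+json" \
    "$(records_url "$1")"
}

# Start one fetch per instance in the background, then wait for all of them.
# Responses may interleave on stdout because the requests run in parallel.
poll_all() {
  for instance in "$@"; do
    poll_instance "$instance" &
  done
  wait
}

# Usage (not run automatically; needs a running REST Proxy):
#   poll_all ci1 ci2
```

If both instances get an assignment when polled together like this, the stall is probably tied to the first instance not polling during the rebalance; if the log still looks the same, that points more toward the proxy version itself.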

Answer 1

This problem occurs in Kafka REST version 7.3.0; it is resolved in version 7.5.0.

With Kafka REST 6.2.7 the situation is the same, but the log shows different messages:

Attempt to heartbeat failed since group is rebalancing
