Proxy REST Kafka: conecte apenas um consumidor

Proxy REST Kafka: conecte apenas um consumidor

Eu tenho o tópico Kafka "purchase2" com 2 partições e Kafka REST Proxy para consumidores (deeste toturial). Quando executo o primeiro consumidor deste tópico, está tudo certo:

 curl -X POST      -H "Content-Type: application/vnd.kafka.v2+json"      --data '{"name": "ci1", "format": "json", "auto.offset.reset": "earliest"}'      http://localhost:8082/consumers/cg1

 curl -X POST      -H "Content-Type: application/vnd.kafka.v2+json"      --data '{"topics":["purchases2"]}'      http://localhost:8082/consumers/cg1/instances/ci1/subscription

 curl -X GET -H "Accept: application/vnd.kafka.json.v2+json"  "http://localhost:8082/consumers/cg1/instances/ci1/records"

mas se eu tentar executar o segundo consumidor (novo ID) e buscar linhas:

 curl -X POST      -H "Content-Type: application/vnd.kafka.v2+json"      --data '{"name": "ci2", "format": "json", "auto.offset.reset": "earliest"}'      http://localhost:8082/consumers/cg1

 curl -X POST      -H "Content-Type: application/vnd.kafka.v2+json"      --data '{"topics":["purchases2"]}'      http://localhost:8082/consumers/cg1/instances/ci2/subscription

 curl -X GET -H "Accept: application/vnd.kafka.json.v2+json"  "http://localhost:8082/consumers/cg1/instances/ci2/records"

Recebo muitas mensagens (no log REST do Kafka) como: "Solicitar adesão ao grupo devido a: o grupo já está reequilibrando":

rest-proxy    | [2023-09-21 09:58:12,367] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: group is already rebalancing (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:15,391] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: group is already rebalancing (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:17,911] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] JoinGroup failed: The coordinator is not aware of this member. Need to re-join the group. Sent generation was Generation{generationId=-1, memberId='consumer-cg1-2-c3f649ce-934c-4e8a-a87a-d2e85ac82b54', protocol='null'} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:17,911] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Resetting generation and member id due to: encountered UNKNOWN_MEMBER_ID from JOIN_GROUP response (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:17,912] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Request joining group due to: encountered UNKNOWN_MEMBER_ID from JOIN_GROUP response (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:17,913] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Request joining group due to: rebalance failed due to 'The coordinator is not aware of this member.' (UnknownMemberIdException) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:17,915] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] (Re-)joining group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:17,919] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Request joining group due to: need to re-join with the given member-id: consumer-cg1-2-ff236f69-f902-4294-847a-8bf4940354ef (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:17,919] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:17,919] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] (Re-)joining group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:18,414] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: group is already rebalancing (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:21,439] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: group is already rebalancing (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:24,461] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: group is already rebalancing (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:27,488] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: group is already rebalancing (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:30,511] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: group is already rebalancing (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:33,535] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: group is already rebalancing (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:36,560] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: group is already rebalancing (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:38,475] WARN [Consumer clientId=consumer-cg1-1, groupId=cg1] consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:38,475] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Member consumer-cg1-1-9dc34071-0a6d-4fce-8e11-9e18c6bbc372 sending LeaveGroup request to coordinator broker:29092 (id: 2147483646 rack: null) due to consumer poll timeout has expired. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:38,477] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Resetting generation and member id due to: consumer pro-actively leaving the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:38,477] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: consumer pro-actively leaving the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:38,482] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Successfully joined group with generation Generation{generationId=38, memberId='consumer-cg1-2-ff236f69-f902-4294-847a-8bf4940354ef', protocol='range'} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:38,484] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Finished assignment for group at generation 38: {consumer-cg1-2-ff236f69-f902-4294-847a-8bf4940354ef=Assignment(partitions=[purchases2-0, purchases2-1])} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:38,495] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Successfully synced group in generation Generation{generationId=38, memberId='consumer-cg1-2-ff236f69-f902-4294-847a-8bf4940354ef', protocol='range'} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:38,498] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Notifying assignor about the new Assignment(partitions=[purchases2-0, purchases2-1]) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:38,498] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Adding newly assigned partitions: purchases2-0, purchases2-1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:38,506] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Setting offset for partition purchases2-0 to the committed offset FetchPosition{offset=11, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[broker:29092 (id: 1 rack: null)], epoch=0}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:38,506] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Setting offset for partition purchases2-1 to the committed offset FetchPosition{offset=22, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[broker:29092 (id: 1 rack: null)], epoch=0}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

O reequilíbrio e as mensagens duram cerca de max.poll.interval.ms , enquanto o primeiro consumidor será excluído. Durante esse tempo, ninguém pode obter dados do tópico. E o segundo consumidor se conecta somente quando o primeiro consumidor for expulso por tempo limite (max.poll.interval.ms). Isso se repete toda vez que tento iniciar qualquer segundo consumidor. O tópico possui 2 partições e é normal ter dois consumidores, um para cada parcela. Mas por algum motivo não consigo ativar o segundo consumidor.

Responder1

Este problema na versão Kafka REST: 7.3.0, na versão 7.5.0 o problema foi resolvido.

Para Kafka REST 6.2.7 a mesma situação, mas outras mensagens no log:

Attempt to heartbeat failed since group is rebalancing

informação relacionada