Kafka REST Proxy: only one consumer can connect

I have a Kafka topic "purchases2" with 2 partitions and a Kafka REST Proxy for consumers (set up following this tutorial). When I start the first consumer for this topic, everything works:

 curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"name": "ci1", "format": "json", "auto.offset.reset": "earliest"}' http://localhost:8082/consumers/cg1

 curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["purchases2"]}' http://localhost:8082/consumers/cg1/instances/ci1/subscription

 curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" "http://localhost:8082/consumers/cg1/instances/ci1/records"

But when I start a second consumer (with a new instance ID) in the same group and try to fetch records, it does not work:

 curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"name": "ci2", "format": "json", "auto.offset.reset": "earliest"}' http://localhost:8082/consumers/cg1

 curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["purchases2"]}' http://localhost:8082/consumers/cg1/instances/ci2/subscription

 curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" "http://localhost:8082/consumers/cg1/instances/ci2/records"

The Kafka REST log then fills up with messages like "Request joining group due to: group is already rebalancing":

rest-proxy    | [2023-09-21 09:58:12,367] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: group is already rebalancing (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:15,391] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: group is already rebalancing (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:17,911] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] JoinGroup failed: The coordinator is not aware of this member. Need to re-join the group. Sent generation was Generation{generationId=-1, memberId='consumer-cg1-2-c3f649ce-934c-4e8a-a87a-d2e85ac82b54', protocol='null'} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:17,911] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Resetting generation and member id due to: encountered UNKNOWN_MEMBER_ID from JOIN_GROUP response (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:17,912] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Request joining group due to: encountered UNKNOWN_MEMBER_ID from JOIN_GROUP response (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:17,913] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Request joining group due to: rebalance failed due to 'The coordinator is not aware of this member.' (UnknownMemberIdException) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:17,915] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] (Re-)joining group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:17,919] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Request joining group due to: need to re-join with the given member-id: consumer-cg1-2-ff236f69-f902-4294-847a-8bf4940354ef (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:17,919] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:17,919] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] (Re-)joining group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:18,414] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: group is already rebalancing (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:21,439] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: group is already rebalancing (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:24,461] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: group is already rebalancing (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:27,488] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: group is already rebalancing (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:30,511] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: group is already rebalancing (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:33,535] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: group is already rebalancing (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:36,560] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: group is already rebalancing (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:38,475] WARN [Consumer clientId=consumer-cg1-1, groupId=cg1] consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:38,475] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Member consumer-cg1-1-9dc34071-0a6d-4fce-8e11-9e18c6bbc372 sending LeaveGroup request to coordinator broker:29092 (id: 2147483646 rack: null) due to consumer poll timeout has expired. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:38,477] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Resetting generation and member id due to: consumer pro-actively leaving the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:38,477] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: consumer pro-actively leaving the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:38,482] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Successfully joined group with generation Generation{generationId=38, memberId='consumer-cg1-2-ff236f69-f902-4294-847a-8bf4940354ef', protocol='range'} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:38,484] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Finished assignment for group at generation 38: {consumer-cg1-2-ff236f69-f902-4294-847a-8bf4940354ef=Assignment(partitions=[purchases2-0, purchases2-1])} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:38,495] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Successfully synced group in generation Generation{generationId=38, memberId='consumer-cg1-2-ff236f69-f902-4294-847a-8bf4940354ef', protocol='range'} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:38,498] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Notifying assignor about the new Assignment(partitions=[purchases2-0, purchases2-1]) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:38,498] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Adding newly assigned partitions: purchases2-0, purchases2-1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:38,506] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Setting offset for partition purchases2-0 to the committed offset FetchPosition{offset=11, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[broker:29092 (id: 1 rack: null)], epoch=0}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy    | [2023-09-21 09:58:38,506] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Setting offset for partition purchases2-1 to the committed offset FetchPosition{offset=22, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[broker:29092 (id: 1 rack: null)], epoch=0}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
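
To see which consumer is stuck rejoining and which one logged the warning, it can help to summarize the log per client. The following is a minimal sketch, assuming the REST Proxy output has been saved to a file with the same line layout as the excerpt above. The function name summarize_rebalance and the file name in the usage note are hypothetical.

```shell
#!/bin/sh
# Count log lines per consumer clientId and log level.
# Assumes each relevant line contains "[Consumer clientId=<id>, groupId=<group>]"
# and that the level (INFO or WARN) directly follows the timestamp's closing "]".
summarize_rebalance() {
    # $1: path to a saved REST Proxy log (hypothetical file)
    awk '
        match($0, /clientId=[^,]+/) {
            # Skip the 9 characters of "clientId=" to keep only the id.
            id = substr($0, RSTART + 9, RLENGTH - 9)
            level = "?"
            if ($0 ~ /\] WARN /) level = "WARN"
            else if ($0 ~ /\] INFO /) level = "INFO"
            count[id " " level]++
        }
        END { for (k in count) print k, count[k] }
    ' "$1" | LC_ALL=C sort
}
```

For example, `summarize_rebalance rest-proxy.log` prints one line per client and level, such as a client id followed by `INFO` and a count.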

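The rebalancing and these messages go on for roughly max.poll.interval.ms, after which the first consumer is removed from the group. During that time nobody can read data from the topic. The second consumer only connects once the first consumer has been evicted because of the timeout (max.poll.interval.ms). This repeats every time I try to start a second consumer.

The topic has 2 partitions, so normally there would be two consumers, one per partition. But for some reason I cannot get the second consumer to become active alongside the first.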
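
The WARN line in the log says the first consumer left the group because too much time passed between its poll() calls. One thing worth checking, on the assumption that the REST Proxy only polls on behalf of an instance while its records endpoint is being requested, is whether the rebalance completes sooner when both instances are read at the same time. This is a minimal sketch of that experiment, not a confirmed fix. BASE_URL, GROUP, DRY_RUN, and the function names poll_instance and poll_all are illustrative choices that are not part of the original setup.

```shell
#!/bin/sh
# Read from both consumer instances of the group concurrently.
BASE_URL="${BASE_URL:-http://localhost:8082}"
GROUP="${GROUP:-cg1}"

# Fetch records for one consumer instance ($1 = instance name).
# With DRY_RUN=1 the request line is printed instead of being sent.
poll_instance() {
    url="$BASE_URL/consumers/$GROUP/instances/$1/records"
    if [ "${DRY_RUN:-0}" = "1" ]; then
        echo "GET $url"
    else
        curl -s -X GET -H "Accept: application/vnd.kafka.json.v2+json" "$url"
    fi
}

# Start both reads in the background, then wait for both to return.
poll_all() {
    poll_instance ci1 &
    poll_instance ci2 &
    wait
}
```

Calling `poll_all` after both instances have been created and subscribed issues the two reads in parallel. Setting `DRY_RUN=1` first only prints the two request URLs, which is useful for checking the group and instance names before sending anything.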

Answer 1

I ran into this problem with Kafka REST version 7.3.0. With version 7.5.0 the problem is gone.

With Kafka REST 6.2.7 the behavior is the same, but the log shows a different message:

Attempt to heartbeat failed since group is rebalancing
