
I have a Kafka topic "purchases2" with 2 partitions and a Kafka REST Proxy for consumers (from this tutorial). When I run the first consumer for the topic, everything works fine:
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"name": "ci1", "format": "json", "auto.offset.reset": "earliest"}' http://localhost:8082/consumers/cg1
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["purchases2"]}' http://localhost:8082/consumers/cg1/instances/ci1/subscription
curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" "http://localhost:8082/consumers/cg1/instances/ci1/records"
But if I try to start a second consumer (with a new instance ID) and fetch records:
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"name": "ci2", "format": "json", "auto.offset.reset": "earliest"}' http://localhost:8082/consumers/cg1
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["purchases2"]}' http://localhost:8082/consumers/cg1/instances/ci2/subscription
curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" "http://localhost:8082/consumers/cg1/instances/ci2/records"
I get a lot of messages in the Kafka REST log, such as "Request joining group due to: group is already rebalancing":
rest-proxy | [2023-09-21 09:58:12,367] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: group is already rebalancing (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:15,391] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: group is already rebalancing (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:17,911] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] JoinGroup failed: The coordinator is not aware of this member. Need to re-join the group. Sent generation was Generation{generationId=-1, memberId='consumer-cg1-2-c3f649ce-934c-4e8a-a87a-d2e85ac82b54', protocol='null'} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:17,911] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Resetting generation and member id due to: encountered UNKNOWN_MEMBER_ID from JOIN_GROUP response (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:17,912] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Request joining group due to: encountered UNKNOWN_MEMBER_ID from JOIN_GROUP response (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:17,913] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Request joining group due to: rebalance failed due to 'The coordinator is not aware of this member.' (UnknownMemberIdException) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:17,915] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] (Re-)joining group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:17,919] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Request joining group due to: need to re-join with the given member-id: consumer-cg1-2-ff236f69-f902-4294-847a-8bf4940354ef (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:17,919] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:17,919] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] (Re-)joining group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:18,414] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: group is already rebalancing (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:21,439] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: group is already rebalancing (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:24,461] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: group is already rebalancing (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:27,488] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: group is already rebalancing (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:30,511] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: group is already rebalancing (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:33,535] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: group is already rebalancing (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:36,560] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: group is already rebalancing (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:38,475] WARN [Consumer clientId=consumer-cg1-1, groupId=cg1] consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:38,475] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Member consumer-cg1-1-9dc34071-0a6d-4fce-8e11-9e18c6bbc372 sending LeaveGroup request to coordinator broker:29092 (id: 2147483646 rack: null) due to consumer poll timeout has expired. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:38,477] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Resetting generation and member id due to: consumer pro-actively leaving the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:38,477] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: consumer pro-actively leaving the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:38,482] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Successfully joined group with generation Generation{generationId=38, memberId='consumer-cg1-2-ff236f69-f902-4294-847a-8bf4940354ef', protocol='range'} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:38,484] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Finished assignment for group at generation 38: {consumer-cg1-2-ff236f69-f902-4294-847a-8bf4940354ef=Assignment(partitions=[purchases2-0, purchases2-1])} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:38,495] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Successfully synced group in generation Generation{generationId=38, memberId='consumer-cg1-2-ff236f69-f902-4294-847a-8bf4940354ef', protocol='range'} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:38,498] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Notifying assignor about the new Assignment(partitions=[purchases2-0, purchases2-1]) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:38,498] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Adding newly assigned partitions: purchases2-0, purchases2-1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:38,506] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Setting offset for partition purchases2-0 to the committed offset FetchPosition{offset=11, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[broker:29092 (id: 1 rack: null)], epoch=0}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:38,506] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Setting offset for partition purchases2-1 to the committed offset FetchPosition{offset=22, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[broker:29092 (id: 1 rack: null)], epoch=0}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
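The rebalance and these messages last for roughly max.poll.interval.ms, until the first consumer is removed from the group. During that time nobody can fetch data from the topic. The second consumer joins only after the first one has been kicked out on the max.poll.interval.ms timeout, and it then gets both partitions. This repeats every time I try to start a second consumer. The topic has 2 partitions, so normally there would be two consumers, one per partition. But for some reason I cannot start the second consumer.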
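The WARN line in the log says that poll() was not called on the first consumer (consumer-cg1-1) within max.poll.interval.ms, and a Kafka consumer can only rejoin a rebalancing group from inside poll(). Assuming the REST Proxy polls the underlying consumer only while a records request for that instance is being served, one thing worth trying is to keep requesting records for both instances while the second one joins. The following is a minimal sketch under that assumption, not a confirmed fix; BASE_URL, GROUP, INSTANCES, ROUNDS, RUN_POLL and the function names are illustrative and not part of the original setup.

```shell
#!/bin/sh
# Sketch only: all variable and function names here are illustrative assumptions.
BASE_URL="${BASE_URL:-http://localhost:8082}"
GROUP="${GROUP:-cg1}"
INSTANCES="${INSTANCES:-ci1 ci2}"
ROUNDS="${ROUNDS:-10}"

# Build the records URL for one consumer instance (group, instance).
records_url() {
  printf '%s/consumers/%s/instances/%s/records' "$BASE_URL" "$1" "$2"
}

# Fetch records once for one instance (same request as in the question).
fetch_records() {
  curl -s -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
    "$(records_url "$1" "$2")"
}

# For ROUNDS rounds, request records for every instance at the same time,
# so that each instance has a records request in flight during a rebalance.
poll_all() {
  round=1
  while [ "$round" -le "$ROUNDS" ]; do
    for instance in $INSTANCES; do
      fetch_records "$GROUP" "$instance" &
    done
    wait   # let all requests of this round finish
    echo   # separate the output of consecutive rounds
    round=$((round + 1))
  done
}

# Run the loop only when explicitly requested, e.g. RUN_POLL=1 sh poll_both.sh
if [ "${RUN_POLL:-0}" = "1" ]; then
  poll_all
fi
```

Both instances must already exist and be subscribed (the POST requests shown above) before the loop is started. If the log still shows "group is already rebalancing" for the whole max.poll.interval.ms while both instances are being polled, this assumption does not explain the behaviour.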
Answer 1
This problem occurs in Kafka REST version 7.3.0 and is resolved in version 7.5.0.
With Kafka REST 6.2.7 the behaviour is the same, but the log shows a different message:
Attempt to heartbeat failed since group is rebalancing