
私は2つのパーティションを持つKafkaトピック「purchase2」と消費者向けのKafka RESTプロキシを持っています(このツアー)。このトピックの最初のコンシューマーを実行すると、すべて正常になります。
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"name": "ci1", "format": "json", "auto.offset.reset": "earliest"}' http://localhost:8082/consumers/cg1
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["purchases2"]}' http://localhost:8082/consumers/cg1/instances/ci1/subscription
curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" "http://localhost:8082/consumers/cg1/instances/ci1/records"
しかし、2 番目のコンシューマー (新しい ID) を実行して行を取得しようとすると、次のようになります。
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"name": "ci2", "format": "json", "auto.offset.reset": "earliest"}' http://localhost:8082/consumers/cg1
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["purchases2"]}' http://localhost:8082/consumers/cg1/instances/ci2/subscription
curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" "http://localhost:8082/consumers/cg1/instances/ci2/records"
「グループへの参加をリクエストしました。理由は、グループがすでに再調整されているためです」のようなメッセージが多数表示されます (Kafka REST ログ内)。
rest-proxy | [2023-09-21 09:58:12,367] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: group is already rebalancing (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:15,391] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: group is already rebalancing (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:17,911] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] JoinGroup failed: The coordinator is not aware of this member. Need to re-join the group. Sent generation was Generation{generationId=-1, memberId='consumer-cg1-2-c3f649ce-934c-4e8a-a87a-d2e85ac82b54', protocol='null'} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:17,911] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Resetting generation and member id due to: encountered UNKNOWN_MEMBER_ID from JOIN_GROUP response (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:17,912] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Request joining group due to: encountered UNKNOWN_MEMBER_ID from JOIN_GROUP response (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:17,913] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Request joining group due to: rebalance failed due to 'The coordinator is not aware of this member.' (UnknownMemberIdException) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:17,915] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] (Re-)joining group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:17,919] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Request joining group due to: need to re-join with the given member-id: consumer-cg1-2-ff236f69-f902-4294-847a-8bf4940354ef (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:17,919] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:17,919] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] (Re-)joining group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:18,414] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: group is already rebalancing (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:21,439] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: group is already rebalancing (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:24,461] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: group is already rebalancing (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:27,488] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: group is already rebalancing (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:30,511] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: group is already rebalancing (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:33,535] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: group is already rebalancing (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:36,560] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: group is already rebalancing (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:38,475] WARN [Consumer clientId=consumer-cg1-1, groupId=cg1] consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:38,475] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Member consumer-cg1-1-9dc34071-0a6d-4fce-8e11-9e18c6bbc372 sending LeaveGroup request to coordinator broker:29092 (id: 2147483646 rack: null) due to consumer poll timeout has expired. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:38,477] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Resetting generation and member id due to: consumer pro-actively leaving the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:38,477] INFO [Consumer clientId=consumer-cg1-1, groupId=cg1] Request joining group due to: consumer pro-actively leaving the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:38,482] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Successfully joined group with generation Generation{generationId=38, memberId='consumer-cg1-2-ff236f69-f902-4294-847a-8bf4940354ef', protocol='range'} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:38,484] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Finished assignment for group at generation 38: {consumer-cg1-2-ff236f69-f902-4294-847a-8bf4940354ef=Assignment(partitions=[purchases2-0, purchases2-1])} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:38,495] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Successfully synced group in generation Generation{generationId=38, memberId='consumer-cg1-2-ff236f69-f902-4294-847a-8bf4940354ef', protocol='range'} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:38,498] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Notifying assignor about the new Assignment(partitions=[purchases2-0, purchases2-1]) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:38,498] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Adding newly assigned partitions: purchases2-0, purchases2-1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:38,506] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Setting offset for partition purchases2-0 to the committed offset FetchPosition{offset=11, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[broker:29092 (id: 1 rack: null)], epoch=0}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2023-09-21 09:58:38,506] INFO [Consumer clientId=consumer-cg1-2, groupId=cg1] Setting offset for partition purchases2-1 to the committed offset FetchPosition{offset=22, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[broker:29092 (id: 1 rack: null)], epoch=0}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
再バランスとメッセージは max.poll.interval.ms 程度続きますが、その間に最初のコンシューマーは削除されます。その間、トピックからデータを取得することはできません。また、2 番目のコンシューマーは、最初のコンシューマーがタイムアウト (max.poll.interval.ms) で追い出された場合にのみ接続します。2 番目のコンシューマーを起動しようとするたびに、この動作が繰り返されます。トピックには 2 つのパーティションがあり、通常は各パーティションに 1 つずつ、合計 2 つのコンシューマーがあります。しかし、何らかの理由で 2 番目のコンシューマーをアクティブ化できません。
答え1
この問題は Kafka REST バージョン 7.3.0 で発生しましたが、バージョン 7.5.0 では解決されました。
Kafka REST 6.2.7 でも状況は同じですが、ログには別のメッセージが表示されます:
Attempt to heartbeat failed since group is rebalancing