
Ich versuche, Filebeat dazu zu bringen, Nachrichten von Kafka mithilfe des Kafka-Inputs zu verarbeiten. Ich kann mich aus irgendeinem Grund nicht mit SASL authentifizieren und bin mir nicht sicher, warum das so ist. Die Dokumentation für Kafka und Filebeat ist beim Versuch, es mit SASL zu verwenden, etwas mangelhaft.
Meine Filebeat-Konfiguration ist wie folgt:
filebeat.config:
modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: false
filebeat.inputs:
- type: kafka
hosts: 'the.kafka.server.com:9092'
topics: 'my_topic'
group_id: 'my_group'
ssl.enabled: yes
username: "$ConnectionString"
password: "org.apache.kafka.common.security.plain.PlainLoginModule required username='my_username' password='my_password';"
processors:
- add_cloud_metadata: ~
- add_docker_metadata: ~
output.console:
pretty: true
Die Ausgabe zeigt
INFO input/input.go:114 Starting input of type: kafka; ID: 14409252276502564738
INFO kafka/log.go:53 kafka message: Initializing new client
INFO kafka/log.go:53 client/metadata fetching metadata for all topics from broker the.kafka.server.com:9092
INFO crawler/crawler.go:106 Loading and starting Inputs completed. Enabled inputs: 1
INFO cfgfile/reload.go:171 Config reloader started
INFO cfgfile/reload.go:226 Loading of config files completed.
INFO kafka/log.go:53 kafka message: Successful SASL handshake. Available mechanisms: %!(EXTRA []string=[PLAIN OAUTHBEARER])
INFO kafka/log.go:53 Failed to read response while authenticating with SASL to broker the.kafka.server.com:9092: EOF
INFO kafka/log.go:53 Closed connection to broker the.kafka.server.com:9092
INFO kafka/log.go:53 client/metadata got error from broker -1 while fetching metadata: EOF
Ich bin mir nicht sicher, was hier passiert. Ich habe auch versucht, hinzuzufügen, compression: none
was nicht geholfen hat, und mit OpenSSL überprüft, ob das Serverzertifikat überprüft werden kann. Was mache ich hier falsch? Der fragliche Kafka-Server ist ein in der Cloud gehosteter Kafka-Server und ich kann die Serverkonfiguration nicht sehen. Ich habe die „Verbindungszeichenfolge“ von der Cloud-Benutzeroberfläche von Kafka erhalten.
Antwort1
Ich habe das Problem gefunden, die $ConnectionString
Syntax funktioniert nicht mit konfluenten Cloud-Kafka-Clustern. Die korrekte Syntax lautet wie folgt:
filebeat.inputs:
- type: kafka
hosts: 'the.kafka.server.com:9092'
topics: 'my_topic'
group_id: 'my_group'
ssl.enabled: yes
username: <API KEY>
password: <API SECRET>
Das reicht aus, um eine Verbindung herzustellen und zu konsumieren