¿Entrada Filebeat Kafka con SASL?

¿Entrada Filebeat Kafka con SASL?

Estoy intentando que filebeat consuma mensajes de Kafka usando la entrada de Kafka. No puedo autenticarme con SASL por algún motivo y no estoy seguro de por qué. La documentación tanto para Kafka como para Filebeat falta un poco cuando se intenta utilizarla con SASL.

Mi configuración de filebeat es la siguiente:

filebeat.config:
  modules:
    path: ${path.config}/modules.d/*.yml
    reload.enabled: false

filebeat.inputs:
- type: kafka
  hosts: 'the.kafka.server.com:9092'
  topics: 'my_topic'
  group_id: 'my_group'
  ssl.enabled: yes
  username: "$ConnectionString"
  password: "org.apache.kafka.common.security.plain.PlainLoginModule required username='my_username' password='my_password';"

processors:
- add_cloud_metadata: ~
- add_docker_metadata: ~

output.console:
  pretty: true

La salida muestra

INFO    input/input.go:114      Starting input of type: kafka; ID: 14409252276502564738
INFO    kafka/log.go:53 kafka message: Initializing new client
INFO    kafka/log.go:53 client/metadata fetching metadata for all topics from broker the.kafka.server.com:9092

INFO    crawler/crawler.go:106  Loading and starting Inputs completed. Enabled inputs: 1
INFO    cfgfile/reload.go:171   Config reloader started
INFO    cfgfile/reload.go:226   Loading of config files completed.
INFO    kafka/log.go:53 kafka message: Successful SASL handshake. Available mechanisms: %!(EXTRA []string=[PLAIN OAUTHBEARER])
INFO    kafka/log.go:53 Failed to read response while authenticating with SASL to broker the.kafka.server.com:9092: EOF

INFO    kafka/log.go:53 Closed connection to broker the.kafka.server.com:9092

INFO    kafka/log.go:53 client/metadata got error from broker -1 while fetching metadata: EOF

No estoy seguro de lo que está pasando aquí. También intenté agregar compression: nonelo que no ayudó y verifiqué con openssl que el certificado del servidor se puede verificar. ¿Qué estoy haciendo mal aquí? El servidor Kafka en cuestión es un servidor Kafka alojado en la nube y no puedo ver la configuración del servidor. Me dieron la "cadena de conexión" de la interfaz de usuario de la nube de Kafka.

Respuesta1

Encontré el problema: la $ConnectionStringsintaxis no funciona con clústeres Kafka en la nube confluentes. La sintaxis correcta es la siguiente:

filebeat.inputs:
- type: kafka
  hosts: 'the.kafka.server.com:9092'
  topics: 'my_topic'
  group_id: 'my_group'
  ssl.enabled: yes
  username: <API KEY>
  password: <API SECRET>

Eso es suficiente para que se conecte y consuma.

información relacionada