Logstash tcp 輸入未傳遞到elasticsearch

Logstash tcp 輸入未傳遞到elasticsearch

在使用檔案輸入、logstash-forwarder 成功設定 ELK 並查看來自幾台伺服器的 Kibana 流中的日誌後,我嘗試設定 TCP 輸入:

tcp {
    codec => "json"
    host => "localhost"
    port => 9250
    tags => ["sensu"]
  }

發送者是 sensu,訊息確實是 JSON 格式 - 使用 tcpdump 命令檢查了這一點。

Logstash 日誌表示連線已被接受:

{:timestamp=>"2015-06-15T14:03:39.832000+1000", :message=>"Accepted connection", :client=>"127.0.0.1:38065", :server=>"localhost:9250", :level=>:debug, :file=>"logstash/inputs/tcp.rb", :line=>"146", :method=>"client_thread"}
{:timestamp=>"2015-06-15T14:03:39.962000+1000", :message=>"config LogStash::Codecs::JSONLines/@charset = \"UTF-8\"", :level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}
{:timestamp=>"2015-06-15T14:03:39.963000+1000", :message=>"config LogStash::Codecs::Line/@charset = \"UTF-8\"", :level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}

然而,這些數據似乎沒有進一步發展,並且在 Kibana 中找不到。

我盡可能停用其他輸入,然後觀察 elasticsearch 中的分片(curl 'localhost:9200/_cat/shards'),它的大小沒有增加。

根據這個連結我走在正確的軌道上,但可能只是在某個地方做了一些愚蠢的事情......提前致謝。

Logstash.conf:

input {
  file {
    path => ["/var/log/messages", "/var/log/secure", "/var/log/iptables"]
    type => "syslog"
    start_position => "end"
  }

  lumberjack {
    port => 5043
    type => "logs"
    ssl_certificate => "/etc/pki/tls/certs/logstash-forwarder.crt"
    ssl_key => "/etc/pki/tls/private/logstash-forwarder.key"
  }

  tcp {
    codec => "json"
    host => "localhost"
    port => 9250
    tags => ["sensu"]
  }

}

output {
  elasticsearch {
    host => "localhost"
    cluster => "webCluster"
  }
}

彈性搜尋.yml:

cluster.name: webCluster
node.name: "bossNode"
node.master: true
node.data: true
index.number_of_shards: 1
index.number_of_replicas: 0
network.host: localhost

答案1

經過幾天令人沮喪的日子後,我得出結論,json/json_lines 編解碼器已損壞 - 可能僅在與 tcp 輸入一起使用時。

但是,我找到了一個解決方法,使用過濾器:

filter {
  if ("sensu" in [tags]) {
    json {
      "source" => "message"
    }
  }
}

這和一些突變產生了我最初想要達到的效果。對於後代,這是我的工作logstash.conf,它結合了來自sensu的日誌和CPU/記憶體指標資料:

input {
  file {
    path => [
      "/var/log/messages"
      , "/var/log/secure"
    ]
    type => "syslog"
    start_position => "end"
  }

  file {
    path => "/var/log/iptables"
    type => "iptables"
    start_position => "end"
  }

  file {
    path => ["/var/log/httpd/access_log"
        ,"/var/log/httpd/ssl_access_log"
    ]
    type => "apache_access"
    start_position => "end"
  }

  file {
    path => [
      "/var/log/httpd/error_log"
      , "/var/log/httpd/ssl_error_log"
    ]
    type => "apache_error"
    start_position => "end"
  }

  lumberjack {
    port => 5043
    type => "logs"
    ssl_certificate => "/etc/pki/tls/certs/logstash-forwarder.crt"
    ssl_key => "/etc/pki/tls/private/logstash-forwarder.key"
  }

  tcp {
    host => "localhost"
    port => 9250
    mode => "server"
    tags => ["sensu"]
  }

}

filter {
  if ("sensu" in [tags]) {
    json {
      "source" => "message"
    }
    mutate {
      rename => { "[check][name]" => "type" }
      replace => { "host" => "%{[client][address]}" }
      split => { "[check][output]" => " " }
      add_field => { "output" => "%{[check][output][1]}" }
      remove_field => [ "[client]", "[check]", "occurrences" ]
    }
  } else if([type] == "apache_access") {
    grok {
      match => { "message" => "%{IP:client}" }
    }
  }
}

filter {
  mutate {
    convert => { "output" => "float" }
  }
}

output {
  elasticsearch {
    host => "localhost"
    cluster => "webCluser"
  }
}

與問題無關:“輸出”作為用空格分隔的多個值接收,因此是“拆分”操作。使用第二個元素,然後將其轉換為浮點數,以便 Kibana 可以很好地繪製它(這是我通過艱難的方式學到的)。

相關內容