Working With Ingest Pipelines In ElasticSearch And Filebeat


What are ingest pipelines and why do you need to know about them?

Ingest pipelines are a powerful tool that Elasticsearch gives you to pre-process your documents during indexing. In fact, they cover much of Logstash's functionality by letting you configure grok filters and many other types of processors to match and modify data.

By using ingest pipelines you can, for example, easily parse your log files and put the important pieces into separate document fields. You can use grok filters to extract the date, URL, User-Agent and so on from a simple Apache access log entry.

You can also use existing Elastic ingest processors inside the pipelines, such as the famous geoip processor and the user_agent parsing one.

This way you can, for example, run a GeoIP lookup on the IP address part of your log entry and put the result inside your document at index time.
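
As a rough sketch of what this looks like (the pipeline name and field names below are placeholders, not the ones Filebeat ships), such a pipeline could be created from Kibana Dev Tools like this:

PUT _ingest/pipeline/my-geoip-example
{
  "description": "Example: enrich documents with GeoIP and User-Agent data",
  "processors": [
    {
      "geoip": {
        "field": "remote_ip",
        "target_field": "geoip",
        "ignore_missing": true
      }
    },
    {
      "user_agent": {
        "field": "agent",
        "target_field": "user_agent",
        "ignore_missing": true
      }
    }
  ]
}

Keep in mind that on Elasticsearch 6.x, geoip and user_agent are shipped as the ingest-geoip and ingest-user-agent plugins, so they have to be installed on your Elasticsearch nodes.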

Inside the pipelines you can use all of the processors Elastic provides, most of which are described here:

https://www.elastic.co/guide/en/elasticsearch/reference/master/ingest-processors.html

Some pros that make ingest pipelines a better choice for pre-processing compared to Logstash

  • You skip another layer of complexity 

    • By using the pipelines, you skip the additional layer of complexity that Logstash adds to your infrastructure.

 

  • Pipelines are much easier to debug

    • Debugging in Logstash can be a nightmare! Especially when you have a big number of processing rules, restarting Logstash (in order for your changes to apply) can take up to several minutes. I have heard of cases where it took more than an hour.
      During grok filter development you may need to restart tens or hundreds of times until you get the job done. Having to wait minutes for each restart can make your life tough.

      Pipelines, on the other hand, are a heaven for debugging compared to Logstash's slowness. Elasticsearch provides an interface where you can define your pipeline rules and test them with sample data, or take existing pipelines and test those with sample data.
      This is done through the “_ingest/pipeline/_simulate” interface inside Kibana -> Dev Tools. I’ll give examples below.

 

  • They have most of the processors Logstash gives you

    • As you know, Logstash is made by the same people who make Elasticsearch. Most of the processors you have inside Logstash are also available inside ingest pipelines (most importantly, the grok filter), and I suppose the code behind these processors is pretty much the same.
  • Integration with Filebeat

    • Filebeat supports using ingest pipelines for pre-processing. In fact, it already uses them for all existing Filebeat modules such as apache2, mysql, syslog, auditd, etc.
    • Filebeat uses its predefined module pipelines when you configure it to ingest data directly into Elasticsearch.

Modifying Filebeat Ingest Pipelines

Basically you have two choices: either change the existing module pipelines in order to fine-tune them, or create a new custom Filebeat module where you can define your own pipeline.

I’m going to look at the first choice: modifying existing pipelines.

 

Modifying existing pipeline configuration files

Under CentOS/RHEL, module configuration files are located here:

/usr/share/filebeat/module/

 

For example, if you want to edit the pipeline for Apache access logs (apache2 module), you need to edit the following file:

/usr/share/filebeat/module/apache2/access/ingest/default.json

 

Telling Filebeat to overwrite the existing pipelines

 

After you have made changes to the pipeline configuration, you need to tell Filebeat to re-import the new pipeline definitions into Elasticsearch.

You can do this in one of the following ways.

(UPDATE) The easiest way to reload a pipeline is from the command line:

filebeat setup --pipelines --modules your_module
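
For example, assuming you are using the apache2 module, the following would re-import its pipelines (the --modules flag also accepts a comma-separated list of modules):

filebeat setup --pipelines --modules apache2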

 

However, there are some more ways of reloading the pipelines:

  • 1) Delete the pipeline from Elasticsearch and restart Filebeat
    • First, check the exact name of the pipeline inside Elasticsearch. You can do this by issuing:
      GET /_ingest/pipeline

      # You will see all currently configured ingest pipelines
      # The result will be something like:
        "filebeat-6.5.4-apache2-access-default" : {
      .....
      .....
         }

       

    • After finding the exact pipeline name, you need to delete it:
      # Delete the pipeline   filebeat-6.5.4-apache2-access-default
      DELETE /_ingest/pipeline/filebeat-6.5.4-apache2-access-default
    • Restart Filebeat, and if everything is okay with your pipeline definitions, it will re-import the pipeline into Elasticsearch.

  • 2) Configure Filebeat to overwrite the pipelines on each restart
    • This is the easier method. You can simply configure Filebeat to overwrite the pipelines and be sure that each modification will propagate after a Filebeat restart.
    • In order to do that, you need to add the following setting to your Filebeat configuration:
    • vim /etc/filebeat/filebeat.yml

      filebeat.overwrite_pipelines: true

       

    • Finally, restart Filebeat.

 

 

Testing and Troubleshooting Pipelines inside Kibana (Dev Tools)

As I mentioned earlier, Elasticsearch gives us a pretty nice interface for interacting with pipelines, especially when it comes to testing and troubleshooting.

I will give some examples of how you can test and develop your pipelines by using Kibana and its Dev Tools.

Testing existing pipelines

You can easily test existing pipelines by using Kibana. For example, let’s say we need to test the Filebeat apache2 access log pipeline, which in my case is named “filebeat-6.5.4-apache2-access-default”.

We are going to use the “_ingest/pipeline/_simulate” interface for this purpose.

Request (Kibana):

POST _ingest/pipeline/filebeat-6.5.4-apache2-access-default/_simulate
{
  "docs": [
      { "_source": 
          { "message" : """192.168.1.1 - - [21/Apr/2019:17:15:25 +0300] "GET /js/jquery/colorbox/controls.png HTTP/1.1" 304 - \"http://www.example.com/js/jquery/colorbox/colorbox.css" "Mozilla/5.0 (Windows NT 5.1; rv:52.0) Gecko/20100101 Firefox/52.0\" """
          }
      }
    ]
}

Notice that when you need to escape characters with “\”, your message should be enclosed in “"""” (triple double quotes).

And the result is like:

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_type",
        "_id" : "_id",
        "_source" : {
          "message" : """192.168.1.1 - - [21/Apr/2019:17:15:25 +0300] "GET /js/jquery/colorbox/controls.png HTTP/1.1" 304 - \"http://www.example.com/js/jquery/colorbox/colorbox.css" "Mozilla/5.0 (Windows NT 5.1; rv:52.0) Gecko/20100101 Firefox/52.0\" """,
          "apache2" : {
            "access" : {
              "response_code" : "304",
              "remote_ip" : "192.168.1.1",
              "method" : "GET",
              "user_name" : "-",
              "http_version" : "1.1",
              "time" : "21/Apr/2019:17:15:25 +0300",
              "url" : "/js/jquery/colorbox/controls.png"
            }
          },
          "error" : {
            "message" : "field [source] not present as part of path [source]"
          }
        },
        "_ingest" : {
          "timestamp" : "2019-04-21T14:17:02.391Z"
        }
      }
    }
  ]
}

 

Troubleshooting or Creating Pipelines With Tests

The previous example was pretty straightforward, but it only lets you test what’s already imported.

But let’s say you want to create or modify a pipeline and play with different processors to see how it goes. For example, you want to validate whether your grok filters (or modifications) will work as you expect.

You can do this as well, and here is the easy way to do it (again by using Kibana).

By using the same interface “_ingest/pipeline/_simulate” you can also define your pipeline inside the body.

In the following example, I want to make some modifications to the existing “filebeat-6.5.4-apache2-access-default” pipeline and make sure my modifications are working as expected. 

 

First, let’s take the current pipeline configuration

That could be easily done by the following request in Kibana:

GET /_ingest/pipeline/filebeat-6.5.4-apache2-access-default

 

Now that we have our pipeline config, let’s run a test that includes the pipeline definition:

Creating a pipeline on the fly and testing it

Execute the following inside Kibana:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "Pipeline for parsing apache2 access log",
    "processors": [
      {
        "grok": {
          "ignore_missing": true,
          "field": "message",
          "patterns": [
            """%{IPORHOST:apache2.access.remote_ip} - %{DATA:apache2.access.user_name} \[%{HTTPDATE:apache2.access.time}\] "%{WORD:apache2.access.method} %{DATA:apache2.access.url} HTTP/%{NUMBER:apache2.access.http_version}" %{NUMBER:apache2.access.response_code} (?:%{NUMBER:apache2.access.body_sent.bytes}|-)( "%{DATA:apache2.access.referrer}")?( "%{DATA:apache2.access.agent}")?""",
            """%{IPORHOST:apache2.access.remote_ip} - %{DATA:apache2.access.user_name} \[%{HTTPDATE:apache2.access.time}\] "-" %{NUMBER:apache2.access.response_code} -"""
          ]
        }
      },
      {
        "remove": {
          "field": "message"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": """192.168.1.1 - - [21/Apr/2019:17:15:25 +0300] "GET /js/jquery/colorbox/controls.png HTTP/1.1" 304 - \"http://www.example.com/js/jquery/colorbox/colorbox.css" "Mozilla/5.0 (Windows NT 5.1; rv:52.0) Gecko/20100101 Firefox/52.0\" """
      }
    }
  ]
}

And you will get the result of evaluating the pipeline:

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_type",
        "_id" : "_id",
        "_source" : {
          "apache2" : {
            "access" : {
              "response_code" : "304",
              "remote_ip" : "192.168.1.1",
              "method" : "GET",
              "user_name" : "-",
              "http_version" : "1.1",
              "time" : "21/Apr/2019:17:15:25 +0300",
              "url" : "/js/jquery/colorbox/controls.png"
            }
          }
        },
        "_ingest" : {
          "timestamp" : "2019-04-21T14:28:22.969Z"
        }
      }
    }
  ]
}

 

You can also supply _simulate with multiple messages and test all of them at once.
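
For example, reusing the same pipeline name as above, you can pass several documents in the “docs” array and each of them will be run through the pipeline (the log lines below are made-up samples):

POST _ingest/pipeline/filebeat-6.5.4-apache2-access-default/_simulate
{
  "docs": [
      { "_source": 
          { "message" : """192.168.1.1 - - [21/Apr/2019:17:20:00 +0300] "GET /index.html HTTP/1.1" 200 1234 "-" "curl/7.29.0" """
          }
      },
      { "_source": 
          { "message" : """10.0.0.5 - admin [21/Apr/2019:17:21:10 +0300] "POST /login HTTP/1.1" 302 - "-" "Mozilla/5.0 (X11; Linux x86_64)" """
          }
      }
    ]
}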

 

Common Pitfalls

During my development time with Filebeat and pipelines I’ve struggled with some pitfalls that took me quite some time to understand and handle.
I will share the ones I remember and consider significant, in the hope of making someone else’s life easier :)

 

  • Updating filebeat after existing pipeline modifications

    • Beware that if you make any modifications to the existing pipelines Filebeat uses, your changes may be overwritten when you update Filebeat.
    • One good way to protect yourself from automatic updates, if you use the official Elastic repo, is to make sure the repository is disabled by default.
  • Having multiple Filebeat versions in your infrastructure

    • That’s another thing to be careful with. If your infrastructure has different Filebeat versions installed, you may end up with partially modified pipelines (if you modify the existing ones).
      The reason is that Filebeat creates/overwrites pipelines whose names always include its version. For example, the apache2 access log pipeline for Filebeat version 6.5.4 would be named:
      filebeat-6.5.4-apache2-access-default
    • But if you also have servers running, let’s say, Filebeat 6.5.5, their pipelines would be named: filebeat-6.5.5-apache2-access-default
    • This is important because if you make modifications to your pipeline, they apply only to the version in use by that specific Filebeat. So, to make sure your changes are consistent throughout your whole infrastructure, modify the pipelines for every Filebeat version in use.

 

  • Error handling in pipelines

    • This is very important, especially if you are not prepared for it: if your pipeline processing fails, the document will not be indexed in Elasticsearch, unless you have taken some measures against it.
    • There are 2 very important options when dealing with pipeline processors (a short sketch follows after this list):
    • “ignore_missing” : true
      • If you forget to add this to your processor and the field you are trying to access/process is missing, the pipeline will fail and the ingestion will be terminated. This means you will lose the whole document, which is something you don’t want most of the time.
    • “ignore_failure” : true
      • This is especially useful when using processors like geoip or date.
        The idea is not to fail if the processor fails for some reason.
        One such reason could be that you have provided the date processor with a field in an invalid date format.
        If you don’t set ignore_failure: true, the whole document you try to index will be rejected.

  • Having syntax errors inside Filebeat pipeline definition

    • When you are making changes to the existing pipeline config in Filebeat, always make sure that your pipeline can be imported by Filebeat without errors. The easiest way to check this is:
      First: Check the pipeline definition in Elasticsearch and make sure it matches your latest changes.
      Second: Check the Filebeat log (/var/log/filebeat/filebeat) for any errors during the pipeline creation/overwrite process.
  • Escaping strings in pipeline definitions

    • Escaping strings in filebeat ingest/default.json configuration files
      • If you need to escape some symbols inside your pipeline definition, for example “[ ]” or “( )” inside grok filters, the right way to do it is by using two backslashes: “\\”
      • So, for example, ‘(‘ escaped will look like ‘\\(‘
    • Escaping strings in Kibana pipeline _simulate interface
      • If you want to escape special symbols inside Kibana during your pipeline development/testing process, the right way to do it is to first enclose the whole field in three double quotes (“"""”) and then use backslash escaping inside the message.
    • Important
      • Keep in mind that escaping is different in Filebeat configs and in the Kibana _ingest/pipeline interface. So if you try to directly copy pipeline definitions from Filebeat to Kibana or vice versa, it will most probably fail.
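
To make the error handling advice above concrete, here is a minimal sketch (the field names follow the apache2 example used earlier; adjust them to your own pipeline) of date and geoip processors that will not break ingestion when a field is missing or unparsable. These snippets would go inside the "processors" array of your pipeline definition:

"processors": [
  {
    "date": {
      "field": "apache2.access.time",
      "target_field": "@timestamp",
      "formats": ["dd/MMM/yyyy:HH:mm:ss Z"],
      "ignore_failure": true
    }
  },
  {
    "geoip": {
      "field": "apache2.access.remote_ip",
      "target_field": "apache2.access.geoip",
      "ignore_missing": true
    }
  }
]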
