Elasticstack (ELK), Suricata and pfSense Firewall – Part 3: Logstash Pipeline Additions – Suricata Alerts

Introduction

In previous parts we configured Elasticstack (Logstash, Elasticsearch and Kibana) on an Ubuntu server instance, and the Filebeat log shipper on a pfSense firewall, to ship Suricata IDPS logs to the Elasticstack instance. In this part of the series we will look in more depth at the Logstash service, its pipeline, and its capabilities to enhance and enrich the data coming from our Suricata IDPS, enabling richer and more advanced visualizations in Kibana.

Logstash Pipelines

At the simplest level Logstash takes data from a source, extracts and transforms that data, and then sends it to a destination. To do this Logstash uses a ‘pipeline’. This pipeline has three main elements: inputs, which receive data from a source; filters, which transform the data; and outputs, which send data to a destination. A pipeline requires an input and an output; filters are optional. Each element of the pipeline is provided by a Logstash ‘plugin’.
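In configuration terms, every pipeline follows the same shape. Here is a bare skeleton (not a working configuration) showing the three elements:

input {
  # one or more input plugins, e.g. tcp, udp, beats
}
filter {
  # zero or more filter plugins, e.g. json, date, geoip
}
output {
  # one or more output plugins, e.g. elasticsearch
}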

Transforming Suricata pfSense data

In this section we will build up the filter step-by-step to introduce various filters and their application in order to enrich the data being received from our pfSense Suricata instance.

The starting state (from part 2)

In Part 2 of this series we configured a very basic pipeline using two configuration files: one containing the input portion of the pipeline and the other containing the output portion. A Logstash pipeline or configuration may be split across multiple files (which Logstash concatenates in lexical order by filename) or kept in a single file. In this series I use the former method; inputs and outputs change infrequently, whereas filters may change often.
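With the naming convention used in this series, the configuration directory will end up containing the following files, which Logstash reads in this order:

ls /etc/logstash/conf.d
01-inputs.conf  10-pfsense-filter.conf  30-outputs.conf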

Recapping from Part 2 we have the following Logstash pipeline in place – note there are no filters:

Inputs –> (no filters) –> Outputs

Inputs: 01-inputs.conf
#tcp syslogs stream via 5140
input {
  tcp {
    type => "syslog"
    port => 5140
  }
}
#udp syslogs stream via 5140
input {
  udp {
    type => "syslog"
    port => 5140
  }
}

# Elastic Beats input
input {
  beats {
    port => 5044
  }
}
Outputs: 30-outputs.conf
output {
  elasticsearch {
    hosts => ["localhost"]
    index => "logstash-%{+YYYY.MM.dd}"
  }
  # stdout { codec => rubydebug }
}

The starting point – A basic filter and how to test and debug.

Change to the Logstash configuration directory:

cd /etc/logstash/conf.d

Create and edit a file which will contain the pfSense filter:

sudo nano 10-pfsense-filter.conf

Paste the following into the file and save it:

#input { stdin { } }

filter {
   if [type] == "suricataIDPS" {
      json {
         source => "message"
      }
      date {
         match => [ "timestamp", "ISO8601" ]
      }
   }
}

#output { stdout { codec => rubydebug } }

Before we proceed any further, you will notice that there are commented-out ‘input’ and ‘output’ statements in this file. When developing a filter pipeline it is often a good idea to test it and check that the transformations you expect are occurring (and that there are no errors). If the two lines containing the input and output statements are uncommented, and the conditional lines (the line containing if [type] == "suricataIDPS" and its matching closing brace) are commented out (the reason for this will become clear shortly), you can run Logstash at the command line using the following command:

/usr/share/logstash/bin/logstash -f ./10-pfsense-filter.conf

This allows you to check the configuration for errors, and also to paste sample log records into the console and see the output that Logstash generates.
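For reference, this is what the test variant of 10-pfsense-filter.conf looks like with the input and output lines uncommented and the type conditional commented out:

input { stdin { } }

filter {
#   if [type] == "suricataIDPS" {
      json {
         source => "message"
      }
      date {
         match => [ "timestamp", "ISO8601" ]
      }
#   }
}

output { stdout { codec => rubydebug } }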

I will be using this method to illustrate the changes that occur to the record that would be output to Elasticsearch as we add to the filter configuration. The ‘test’ data can be taken from one of two places: if you already have data in Elasticsearch and appearing in Kibana, copy the contents of the ‘message’ field from a record; otherwise, access the eve.json file on the pfSense instance (refer to Part 1) and copy a log line from that.

Explanation of the Filter

The filter part we have in the 10-pfsense-filter.conf file is as follows:

filter {
   if [type] == "suricataIDPS" {
      json {
         source => "message"
      }
      date {
         match => [ "timestamp", "ISO8601" ]
      }
   }
}

if: The ‘if’ statement evaluates to ‘true’ if the type field equals “suricataIDPS”. This field is added by Filebeat on the pfSense device before the event is sent to the Logstash instance, based upon our configuration of Filebeat (refer to Part 1). This is why, when testing on the command line, we need to comment the conditional out: the field does not exist in the raw JSON log line we paste in. The reason for having this check is that our Elasticstack instance may be consuming logs from multiple sources, each of which may need its own set of filters to transform the data. Without such checks Logstash passes each record through EVERY filter statement in the configuration, which means the outcome (and the data sent to Elasticsearch) may not be as expected.
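For example, if the same Logstash instance were also transforming the syslog data arriving via the inputs in 01-inputs.conf, each source would be fenced off in its own conditional. This is an illustrative sketch only; syslog filtering is not covered in this part:

filter {
   if [type] == "suricataIDPS" {
      # Suricata-specific filters (json, date, geoip, ...)
   }
   if [type] == "syslog" {
      # syslog-specific filters would go here
   }
}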

json: The Logstash json filter processes the record field referred to in the ‘source’ parameter as JSON data and creates a field in the record for each JSON field it is able to extract. In the test output you can see that fields such as src_ip and signature have been extracted from the log message and put into their own fields.

date: The Logstash date filter is used to extract dates and times from fields, and then uses that date or time-stamp as the Logstash time-stamp for the event. This is important because you typically want your records to show the time-stamp of when the actual event occurred. If this filter is not used, Logstash uses the time-stamp of when it processes the log record, which is likely to differ significantly from the actual event time. The match parameter tells the filter to use the timestamp field and to expect it to be in the ISO8601 date-time format. Note that the timestamp field was created from the JSON data by the json filter above.
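For example, a Suricata eve.json record carries a timestamp field like the following (the value here is invented for illustration); the date filter parses it and writes it to the event’s @timestamp field:

"timestamp": "2018-03-17T08:14:59.123456+0000"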

Adding IP geo-location data for Suricata ALERTS

To allow us to create map-based visualizations in Kibana in a later part of this series, the records need to contain geo-location data. Logstash has a filter plugin which adds this but, first, we need to make the MaxMind GeoIP database available for the filter plugin to use.

Install MaxMind Geo-IP PPA and configure auto-updates

A PPA is a ‘personal package archive’; MaxMind provides its libraries and software via an official PPA.

Install the PPA as follows:

sudo add-apt-repository ppa:maxmind/ppa
sudo apt-get update

Install the geoipupdate application as follows:

sudo apt install geoipupdate

When the PPA and the geoipupdate application are installed, edit the /etc/GeoIP.conf file for the free GeoLite2 databases as detailed in the MaxMind documentation (note: the config and executable paths differ from those given there), or as follows:

sudo nano /etc/GeoIP.conf

Update the ProductIds configuration line to:

ProductIds GeoLite2-City GeoLite2-Country GeoLite-Legacy-IPv6-City GeoLite-Legacy-IPv6-Country 506 517 533

Save the file and run:

sudo geoipupdate

The GeoIP databases are downloaded to /usr/share/GeoIP; check that the GeoLite2-City and GeoLite2-Country .mmdb files are present there.

To automate the update process use crontab. Copy the following into a file (named ‘cronfile’ here):

# top of crontab

28 7 * * 5 /usr/bin/geoipupdate
# end of crontab

This crontab entry runs every Friday (day 5 in cron’s numbering) at 07:28. To apply the crontab file run:

sudo crontab cronfile
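You can confirm the entry is in place by listing the root crontab:

sudo crontab -l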

The GeoIP databases are in

/usr/share/GeoIP

and will be referred to in the filter which we will now configure.

Update Filter to add GeoIP fields

Edit your 10-pfsense-filter.conf file as shown here. The newly added part is the if [event_type] == "alert" block:
filter {
    if [type] == "suricataIDPS" {
        json {
            source => "message"
        }
        date {
            match => [ "timestamp", "ISO8601" ]
        }

# For Suricata alert events set the geoip data based upon the source address

        if [event_type] == "alert" {
            if [src_ip] {
                geoip {
                    source => "src_ip"
                    target => "geoip"
                    database => "/usr/share/GeoIP/GeoLite2-City.mmdb"
                }
                mutate {
                    convert => [ "[geoip][coordinates]", "float" ]
                }
            }
            else if ![geoip][ip] {
                if [dest_ip] {
                    geoip {
                        source => "dest_ip"
                        target => "geoip"
                        database => "/usr/share/GeoIP/GeoLite2-City.mmdb"
                    }
                    mutate {
                        convert => [ "[geoip][coordinates]", "float" ]
                    }
                }
            }
        }
    }
}


Explanation of the Filter additions

if [event_type] == "alert": This if-block is processed when the event_type field (as extracted by the json filter) is "alert". For an alert you are interested in the location of the source of the attempt (at least when viewing from the context of your external ‘WAN’ interface).

if [src_ip]: This if-block is processed if the src_ip field is present.

geoip: The Logstash geoip filter adds geo-location data to the record. In this case it uses src_ip as the IP address to be looked up, and the geo-location data is created under the geoip field. The previously installed MaxMind GeoIP database is used to determine the geo-location of the IP address.

mutate: The Logstash mutate filter is used for general manipulation of fields. In this case it is converting, or forcing, the datatype of the geoip.coordinates field to be a float data type. This is required for the Kibana visualizations.

The remainder of the filter configuration checks whether the geoip.ip field exists (which it should if src_ip was present and the geoip filter added its fields). If it does not, the filter falls back to using the dest_ip field to calculate the geo-location data.

Below you can see the additional fields added by Logstash:
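As an illustration only (the values below are invented, and the exact set of fields depends on what MaxMind knows about the address), the geoip filter adds a nested structure along these lines:

"geoip": {
    "ip": "203.0.113.5",
    "country_name": "Australia",
    "city_name": "Sydney",
    "latitude": -33.87,
    "longitude": 151.21,
    "location": [ 151.21, -33.87 ]
}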

Adding Further Signature Information for Suricata Alerts

To provide some more ‘human readable’ information on the alerts and also links to the signature information (where available) we are going to add a couple of fields.

Edit your 10-pfsense-filter.conf file as shown here. The newly added part is the pair of signature if-blocks at the end of the alert section:

filter {
    if [type] == "suricataIDPS" {
        json {
            source => "message"
        }
        date {
            match => [ "timestamp", "ISO8601" ]
        }

# For Suricata alert events set the geoip data based upon the source address

        if [event_type] == "alert" {
            if [src_ip] {
                geoip {
                    source => "src_ip"
                    target => "geoip"
                    database => "/usr/share/GeoIP/GeoLite2-City.mmdb"
                }
                mutate {
                    convert => [ "[geoip][coordinates]", "float" ]
                }
            }
            else if ![geoip][ip] {
                if [dest_ip] {
                    geoip {
                        source => "dest_ip"
                        target => "geoip"
                        database => "/usr/share/GeoIP/GeoLite2-City.mmdb"
                    }
                    mutate {
                        convert => [ "[geoip][coordinates]", "float" ]
                    }
                }
            }

# Add additional fields related to the signature

            if [alert][signature] =~ /^ET/ {
                mutate {
                    add_tag => [ "ET-Sig" ]
                    add_field => [ "ids_rule_type", "Emerging Threats" ]
                    add_field => [ "Signature_Info", "http://doc.emergingthreats.net/bin/view/Main/%{[alert][signature_id]}" ]
                }
            }
            if [alert][signature] =~ /^SURICATA/ {
                mutate {
                    add_tag => [ "SURICATA-Sig" ]
                    add_field => [ "ids_rule_type", "Suricata" ]
                }
            }
        }
    }
}

Explanation of the Filter additions

if blocks: These check the contents of the alert.signature field to determine whether the rule is an Emerging Threats or a Suricata signature. Note that the matches use regular expressions anchored to the start of the signature name.

mutate: The Logstash mutate filter is used for general manipulation of fields. In this case it is adding a tag to the tags field and up to two new fields to the record. If the Signature_Info field is added, the contents of the alert.signature_id field are used to build a full URL to the Emerging Threats signature information.

Below you can see the additional fields added by Logstash:
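For an Emerging Threats alert the new fields look along these lines (the signature_id here is invented for illustration):

"tags": [ "ET-Sig" ],
"ids_rule_type": "Emerging Threats",
"Signature_Info": "http://doc.emergingthreats.net/bin/view/Main/2010935"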

Adding FQDNs via reverse IP lookup and adding TCP/UDP service names

We are going to add FQDN fields for both the source and destination IP addresses, using reverse DNS lookups via the Logstash dns filter. We are also going to use the Logstash translate filter to look up the destination TCP/UDP port number in a CSV file and add a field containing the service name.

Preparation for the translate filter

The translate filter is a ‘community supported’ filter and is not installed as part of the standard Logstash package, even though it is fully documented on the Logstash site. Installation of the filter is straightforward via the following command:

sudo /usr/share/logstash/bin/logstash-plugin install logstash-filter-translate
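You can verify the plugin installed correctly by listing the installed plugins:

sudo /usr/share/logstash/bin/logstash-plugin list | grep translate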

In order to use the translate filter there needs to be a dictionary file against which the filter performs lookups. The file can be in CSV, JSON or YAML format. In this instance we are going to use a CSV file. Create a dictionary file by copying the contents of the service-names-port-numbers file (linked at the end of this post) into:

/usr/share/logstash/dictionary/service-names-port-numbers.csv
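The dictionary directory is unlikely to exist by default, so create it first and then create the file:

sudo mkdir -p /usr/share/logstash/dictionary
sudo nano /usr/share/logstash/dictionary/service-names-port-numbers.csv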

Here is a sample of the file. Note that there are only two fields, and the first line gives the field names. The first field is used as the lookup value and the second field is the data returned by the filter.

Port Number,Service Name
1,tcpmux
2,compressnet
3,compressnet
5,rje
7,echo
9,discard
11,systat
13,daytime
17,qotd
18,msp
19,chargen
20,ftp-data
21,ftp

Additions to the filter configuration

Edit your 10-pfsense-filter.conf file as shown here. The newly added parts are the mutate and dns blocks which add the FQDN fields, and the translate block at the end:

filter {
    if [type] == "suricataIDPS" {
        json {
            source => "message"
        }
        date {
            match => [ "timestamp", "ISO8601" ]
        }

# For Suricata alert events set the geoip data based upon the source address

        if [event_type] == "alert" {
            if [src_ip] {
                geoip {
                    source => "src_ip"
                    target => "geoip"
                    database => "/usr/share/GeoIP/GeoLite2-City.mmdb"
                }
                mutate {
                    convert => [ "[geoip][coordinates]", "float" ]
                }
            }
            else if ![geoip][ip] {
                if [dest_ip] {
                    geoip {
                        source => "dest_ip"
                        target => "geoip"
                        database => "/usr/share/GeoIP/GeoLite2-City.mmdb"
                    }
                    mutate {
                        convert => [ "[geoip][coordinates]", "float" ]
                    }
                }
            }

# Add additional fields related to the signature

            if [alert][signature] =~ /^ET/ {
                mutate {
                    add_tag => [ "ET-Sig" ]
                    add_field => [ "ids_rule_type", "Emerging Threats" ]
                    add_field => [ "Signature_Info", "http://doc.emergingthreats.net/bin/view/Main/%{[alert][signature_id]}" ]
                }
            }
            if [alert][signature] =~ /^SURICATA/ {
                mutate {
                    add_tag => [ "SURICATA-Sig" ]
                    add_field => [ "ids_rule_type", "Suricata" ]
                }
            }
        }

# Add FQDN via reverse DNS lookup

        mutate {
            add_field => { "src_FQDN" => "%{src_ip}" }
            add_field => { "dest_FQDN" => "%{dest_ip}" }
        }

# Bug here means that a separate dns filter is needed for each field
# requiring a reverse lookup.
# https://github.com/logstash-plugins/logstash-filter-dns/issues/5

        dns {
            action => "replace"
            reverse => [ "src_FQDN" ]
        }
        dns {
            action => "replace"
            reverse => [ "dest_FQDN" ]
        }

# Add TCP/UDP Service names
# Install community maintained translate filter:
#   sudo /usr/share/logstash/bin/logstash-plugin install logstash-filter-translate

        translate {
            dictionary_path => '/usr/share/logstash/dictionary/service-names-port-numbers.csv'
            field => "dest_port"
            destination => "dest_port_serviceName"
        }
    }
}

Explanation of the Filter additions

mutate: The Logstash mutate filter is used for general manipulation of fields. We are using it to add two new fields, src_FQDN and dest_FQDN, containing the existing contents of src_ip and dest_ip (IP addresses). This is needed because of the way the dns filter works.

dns: The Logstash dns filter performs either a forward or reverse DNS lookup. In this instance we are performing a reverse DNS lookup on the contents of the fields created by the mutate filter in the step above. The dns filter replaces the contents of the field with its result if it succeeds in resolving the address, or leaves the field as-is if it does not. This means our data will contain an FQDN in these fields where available, and the IP address otherwise.
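One note on performance: reverse lookups are comparatively slow and can stall the pipeline if your DNS server is unresponsive. The dns filter supports optional caching parameters that are worth considering for each dns block; a sketch follows (the sizes and TTLs here are arbitrary and should be tuned to your environment):

dns {
    action => "replace"
    reverse => [ "src_FQDN" ]
    hit_cache_size => 4096      # cache successful lookups
    hit_cache_ttl => 900        # seconds to keep a successful result
    failed_cache_size => 512    # cache failed lookups too
    failed_cache_ttl => 900
}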

translate: The Logstash translate filter is a general search-and-replace tool that uses a dictionary to insert or replace values. In this configuration we are performing a lookup on the dest_port field against the CSV file created earlier. The result of the dictionary lookup (where available) is then written to a new field, dest_port_serviceName. For example, a dest_port of 443 would yield a dest_port_serviceName of ‘https’.

Below you can see the additional fields added by Logstash:
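For example (values invented for illustration):

"src_FQDN": "host-203-0-113-5.example.net",
"dest_FQDN": "192.0.2.10",
"dest_port_serviceName": "https"

Once you are happy with the filter file, check that the whole pipeline parses cleanly and then restart Logstash to apply it:

sudo /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/ --config.test_and_exit
sudo systemctl restart logstash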

Conclusion

By expanding the Logstash filter configuration we have added additional fields to the JSON log data being sent from our pfSense Suricata instance by Filebeat. These fields will be used in the next post in the series to create a Suricata alerts dashboard containing visualizations of the data.


Final Files

Here are links to the final filter configuration and the TCP/UDP port CSV file:

service-names-port-numbers

10-pfsense-filter