Tag Archives: filebeat.yml

Logstash, File Input, CSV Filter and Elasticsearch Output

This example uses the Logstash File Input plugin, CSV Filter and Elasticsearch Output plugin: Logstash reads data from a CSV file, parses it and stores it in Elasticsearch.

Pre-Requisite

  • Logstash 5.X
  • Elasticsearch 5.X

The Logstash configuration file below is based on the sample data in the CSV file. You can modify this configuration to match the data in your own CSV file.

Logstash File Input CSV Filter Elasticsearch Output

Sample Data 

transactions-sample-data.txt

TRANSACTION_COUNT|TRANSACTION_DATE|TRANSACTION_TYPE|SERVER
18|07/24/2017|New Customer|SVR-1
9|07/25/2017|Online Customer|SVR-2
9|07/26/2017|Agents|SVR-3
12|07/24/2017|In Store|SVR-1
13|07/25/2017|New Customer|SVR-2
18|07/26/2017|Online Customer|SVR-3
21|07/24/2017|Agents|SVR-2
13|07/25/2017|In Store|SVR-3
15|07/26/2017|New Customer|SVR-4

Logstash Configuration File

Create the Logstash configuration file [logstash-installation-dir]/bin/transaction-test.conf and paste in the content below.

input {
    file {
        path => "/opt/app/facinissuesonit/transactions-sample-data.txt"
        start_position => "beginning"
    }
}
filter {
    csv {
        # Map each value to its corresponding column name by position
        columns => ["TRANSACTION_COUNT","TRANSACTION_DATE","TRANSACTION_TYPE","SERVER"]
        separator => "|"
        remove_field => ["message"]
    }
    # The date filter converts TRANSACTION_DATE to @timestamp so that charts in Kibana show events by date
    date {
        match => ["TRANSACTION_DATE", "MM/dd/yyyy"]
    }
    # Drop the header line so it is not inserted into Elasticsearch
    if [TRANSACTION_TYPE] =~ "TRANSACTION_TYPE" {
        drop {}
    }
}
output {
    elasticsearch {
        # Create an index per day based on the event date
        index => "app-transactions-%{+YYYY.MM.dd}"
        hosts => ["elasticsearver:9200"]
    }
    # Console output
    stdout {
        codec => rubydebug
        # debug => true
    }
}

Information about the configuration file:

File Input Plugin: reads data from the file and, because start_position is set to "beginning", always reads the file from the start.

CSV Filter: reads each line of the message, splits it on "|", maps the values to the corresponding columns by position, and finally removes the message field because the data is now parsed.

Date Filter: maps TRANSACTION_DATE to the @timestamp value used for indexing each document, and tells Logstash that TRANSACTION_DATE follows the pattern "MM/dd/yyyy" so the conversion to a timestamp uses the same format.

drop: removes the header line if the field value matches the column name.

Run the Logstash configuration with the command below:

 [logstash-installation-dir]/bin/logstash -f transaction-test.conf
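
If you want Logstash to keep running in the background (as mentioned in the summary below), one minimal way on Linux, assuming a standard shell with nohup available, is:

 nohup [logstash-installation-dir]/bin/logstash -f transaction-test.conf > logstash-console.log 2>&1 &

This detaches the process from the terminal and redirects the rubydebug console output to logstash-console.log (a file name chosen here only for illustration).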

To learn about configuration validation and other options for starting Logstash, follow the link Logstash Installation, Configuration and Start.

Logstash Console Output

Notice that, because of the Date filter, the @timestamp value of each document is generated from TRANSACTION_DATE. With the Elasticsearch output index name app-transactions-%{+YYYY.MM.dd}, the sample data therefore creates three indexes based on @timestamp: app-transactions-2017.07.24, app-transactions-2017.07.25 and app-transactions-2017.07.26.

{
"path" => "/opt/app/facinissuesonit/transactions-sample-data.txt",
"TRANSACTION_DATE" => "07/24/2017",
"@timestamp" => 2017-07-24T04:00:00.000Z,
"SERVER" => "SVR-1",
"@version" => "1",
"host" => "facingissuesonit.saurabh.com",
"TRANSACTION_TYPE" => "New Customer",
"TRANSACTION_COUNT" => "18"
}
{
"path" => "/opt/app/facinissuesonit/transactions-sample-data.txt",
"TRANSACTION_DATE" => "07/25/2017",
"@timestamp" => 2017-07-25T04:00:00.000Z,
"SERVER" => "SVR-2",
"@version" => "1",
"host" => "facingissuesonit.saurabh.com",
"TRANSACTION_TYPE" => "Online Customer",
"TRANSACTION_COUNT" => "9"
}
{
"path" => "/opt/app/facinissuesonit/transactions-sample-data.txt",
"TRANSACTION_DATE" => "07/26/2017",
"@timestamp" => 2017-07-26T04:00:00.000Z,
"SERVER" => "SVR-3",
"@version" => "1",
"host" => "facingissuesonit.saurabh.com",
"TRANSACTION_TYPE" => "Agents",
"TRANSACTION_COUNT" => "9"
}
{
"path" => "/opt/app/facinissuesonit/transactions-sample-data.txt",
"TRANSACTION_DATE" => "07/24/2017",
"@timestamp" => 2017-07-24T04:00:00.000Z,
"SERVER" => "SVR-1",
"@version" => "1",
"host" => "facingissuesonit.saurabh.com",
"TRANSACTION_TYPE" => "In Store",
"TRANSACTION_COUNT" => "12"
}
{
"path" => "/opt/app/facinissuesonit/transactions-sample-data.txt",
"TRANSACTION_DATE" => "07/25/2017",
"@timestamp" => 2017-07-25T04:00:00.000Z,
"SERVER" => "SVR-2",
"@version" => "1",
"host" => "facingissuesonit.saurabh.com",
"TRANSACTION_TYPE" => "New Customer",
"TRANSACTION_COUNT" => "13"
}
{
"path" => "/opt/app/facinissuesonit/transactions-sample-data.txt",
"TRANSACTION_DATE" => "07/26/2017",
"@timestamp" => 2017-07-26T04:00:00.000Z,
"SERVER" => "SVR-3",
"@version" => "1",
"host" => "facingissuesonit.saurabh.com",
"TRANSACTION_TYPE" => "Online Customer",
"TRANSACTION_COUNT" => "18"
}
{
"path" => "/opt/app/facinissuesonit/transactions-sample-data.txt",
"TRANSACTION_DATE" => "07/24/2017",
"@timestamp" => 2017-07-24T04:00:00.000Z,
"SERVER" => "SVR-2",
"@version" => "1",
"host" => "facingissuesonit.saurabh.com",
"TRANSACTION_TYPE" => "Agents",
"TRANSACTION_COUNT" => "21"
}
{
"path" => "/opt/app/facinissuesonit/transactions-sample-data.txt",
"TRANSACTION_DATE" => "07/25/2017",
"@timestamp" => 2017-07-25T04:00:00.000Z,
"SERVER" => "SVR-3",
"@version" => "1",
"host" => "facingissuesonit.saurabh.com",
"TRANSACTION_TYPE" => "In Store",
"TRANSACTION_COUNT" => "13"
}
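
As a quick check, assuming Elasticsearch is reachable on the host and port configured in the output section above, you can list the daily indexes created from the sample data with the _cat API:

 curl -XGET "http://elasticsearver:9200/_cat/indices/app-transactions-*?v"

Each of the three dates in the sample data should appear as its own app-transactions-YYYY.MM.dd index.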

Summary

The above covers the following points:

  • Reading a file with the Logstash File Input plugin.
  • How to apply the CSV filter with the "|" separator and map values to fields.
  • How to drop the header line if it exists in the CSV file.
  • Using the Date filter to derive the index timestamp value from a field and pattern.
  • Creating a dynamic index name for each day by appending a date format.
  • Starting Logstash in the background with a configuration file.

Read More

To read more on Logstash configuration, input plugins, filter plugins, output plugins, Logstash customization and related issues, follow the links Logstash Tutorial and Logstash Issues.

Hope this blog was helpful for you.

Leave your feedback to enhance this topic and make it more helpful for others.

Reference:

https://www.elastic.co/guide/en/logstash/current/plugins-filters-csv.html

Filebeat and Kafka Integration

Kafka can consume messages published by Filebeat based on the Kafka output configuration in the filebeat.yml file.

Filebeat Kafka Output Configuration

filebeat.yml requires the fields below to connect to Kafka and publish messages to the configured topic. Kafka creates topics dynamically as Filebeat requires them.

output.kafka:
  # The list of Kafka broker addresses from which to fetch the cluster metadata.
  # The cluster metadata contain the actual Kafka brokers events are published to.
  hosts: ["localhost:9092"]

  # The Kafka topic used for the produced events. The setting can be a format string.
  topic: Topic-Name

  # Authentication details. Password is required if username is set.
  #username: ''
  #password: ''
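
To verify that Filebeat events are actually reaching the topic, one quick check is to read the topic back with the console consumer shipped with Kafka (shown here for a Kafka 0.10+ distribution; older releases use --zookeeper instead of --bootstrap-server):

 [kafka-installation-dir]/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Topic-Name --from-beginning

Each line printed should be one JSON event published by Filebeat.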

For more information about Filebeat Kafka output configuration options, refer to the links below.

Let me know your thoughts on this post.

Happy Learning !!!

Read More on Kafka

Integration

Integrate Filebeat, Kafka, Logstash, Elasticsearch and Kibana

Sample filebeat.yml file for Prospectors ,Kafka Output and Logging Configuration

You can copy this file into filebeat.yml and run it after making the changes below for your environment's directory structure, then follow the steps mentioned in Filebeat Download, Installation and Start/Run.

  • Change the prospectors section for your log file directory and file name.
  • Configure the multiline pattern to match your log format; it is currently set to a generic pattern that should work with most formats.
  • Change the Kafka output section for host, port, topic and other settings if required.
  • Change the logging directory to match your machine's directory structure.

Sample filebeat.yml file

#============= Filebeat prospectors ===============

filebeat.prospectors:

# Multiple prospectors with their own shipping methods and rules can be defined here as
# per requirement; to read logs from multiple files in the same directory pattern, a glob
# pattern can be used in the path.

# Filebeat supports only two input_type values: log and stdin.

############## input_type log configuration #####################

- input_type: log

  # Paths of the files to read logs from; use a pattern if logs need to be read from multiple files.
  paths:
    - /opt/app/app1/logs/app1-debug*.log*

  # Set fields_under_root to true if you want custom fields stored at the root of the Filebeat JSON output.
  fields_under_root: true

  ### Multiline configuration for handling stack traces, objects, XML etc. When multiline is
  ### enabled with the settings below, such events are shipped as a single multiline message.

  # The regexp pattern that has to be matched. The example pattern matches all lines starting
  # with a [DEBUG, [ALERT, [TRACE, [WARNING etc. log level and can be customized
  # according to your log line format.
  #multiline.pattern: '^\[([Aa]lert|ALERT|[Tt]race|TRACE|[Dd]ebug|DEBUG|[Nn]otice|NOTICE|[Ii]nfo|INFO|[Ww]arn?(?:ing)?|WARN?(?:ING)?|[Ee]rr?(?:or)?|ERR?(?:OR)?|[Cc]rit?(?:ical)?|CRIT?(?:ICAL)?|[Ff]atal|FATAL|[Ss]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?)'

  # Default is false. Defines whether the pattern match should be negated or not.
  #multiline.negate: true

  # multiline.match defines where lines that do not match the pattern are appended.
  # Possible values are "after" or "before".
  #multiline.match: after

  # If set, everything after this number of multiline lines is ignored.
  #multiline.max_lines: 50

#========== Kafka output configuration ============================
output.kafka:
  # The enabled flag turns this output module on or off; more on this in the Filebeat modules section.
  #enabled: true

  # List all your Kafka broker hosts and ports here; they are used to fetch the cluster
  # metadata, which contains the actual brokers events are published to.
  hosts: ["kafkahost:port"]

  # The Kafka topic to which events will be published.
  topic: QC-LOGS

  # No key is set by default, but a formatted key setting can be used.
  #key: ''

  # The default partition strategy is 'hash' using the key values set; if no key is set,
  # events are distributed randomly across partitions.
  #partition.hash:

  # Default value is false. If reachable_only is enabled, events are published only to reachable Kafka brokers.
  #reachable_only: false

  # Configure alternative event field names used to compute the hash value.
  # If empty, the `output.kafka.key` setting is used. Default value is an empty list.
  #hash: []

  # If authentication is set on the Kafka broker side, the fields below are required.
  #username: ''
  #password: ''

  # Kafka broker version, so that Filebeat can check compatibility with it.
  #version: 0.8.2

  # Metadata information is required for publishing events so that Filebeat can take
  # decisions based on the status of the brokers.
  #metadata:

  # Default is a maximum of 3 retries for the selection of available brokers.
  #retry.max: 3

  # Default value is 250ms. Time to wait before the next retry.
  #retry.backoff: 250ms

  # Metadata information is refreshed every 10 minutes.
  #refresh_frequency: 10m

  # Number of workers that will run for each configured Kafka broker.
  #worker: 1

  # Default value is 3. If set to a value less than 0, Filebeat retries continuously for as long as events are not published.
  #max_retries: 3

  # Default value is 2048. Maximum number of events batched and published to Kafka in one request.
  #bulk_max_size: 2048

  # Default value is 30 seconds. Times out if no response is heard from the Kafka broker within the specified time.
  #timeout: 30s

  # Default value is 10 seconds. Maximum duration the broker waits for the number of required acknowledgements.
  #broker_timeout: 10s

  # Default value is 256 buffered messages per Kafka broker.
  #channel_buffer_size: 256

  # Default value is 0 seconds, i.e. keep-alive is disabled; if set, the network connection is kept alive for that duration.
  #keep_alive: 0

  # Default compression is gzip. Other codecs such as snappy or none can also be set.
  compression: gzip

  # Default value is 1000000 bytes. If the JSON-encoded event exceeds this maximum message size, the event is dropped.
  max_message_bytes: 1000000

  # Default value is 1 ACK for reliability. Possible values are:
  # 0 = no response, the message can be lost if an error happens
  # 1 = wait for local commit
  # -1 = wait for all replicas to commit
  #required_acks: 1

  # Waiting interval between flushes of new events.
  #flush_interval: 1s

  # The configurable ClientID used for logging, debugging, and auditing purposes.
  # Default value is "beats"; setting it can help with analysis and auditing.
  #client_id: beats

  # Configure SSL settings if required by the Kafka broker.
  #ssl.enabled: true

  # SSL configuration is optional and off by default. The list of root certificates is
  # required for HTTPS server verification.
  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

  # Default value is full. The SSL verification mode is required if SSL is configured;
  # 'none' can be used for testing, but in this mode any certificate is accepted.
  #ssl.verification_mode: full

  # List of supported/valid TLS versions. By default all TLS versions from 1.0 up to 1.2 are enabled.
  #ssl.supported_protocols: [TLSv1.0, TLSv1.1, TLSv1.2]

  # Path to the certificate used for SSL client authentication.
  #ssl.certificate: "/etc/pki/client/cert.pem"

  # Path to the client certificate key.
  #ssl.key: "/etc/pki/client/cert.key"

  # Passphrase for decrypting the certificate key; required only if the key is shipped in encrypted form.
  #ssl.key_passphrase: ''

  # Configure the encryption cipher suites to be used for SSL connections.
  #ssl.cipher_suites: []

  # Configure the encryption curve types for ECDHE based cipher suites.
  #ssl.curve_types: []

#==================== Logging ==============================

# Default log level is info; the configured level and everything above it in the hierarchy is
# recorded automatically. Available log levels are: critical, error, warning, info, debug.
logging.level: debug

# Possible values for selectors are "beat", "publish" and "service"; use "*" to enable all.
# Selectors can also be set on the command line when starting Filebeat.
logging.selectors: ["*"]

# The default value is false. If set to true, output is also sent to syslog.
logging.to_syslog: false

# The default is true: all non-zero metric readings are output on shutdown.
logging.metrics.enabled: true

# Period of the metrics log for read counts from log files; a complete report is sent when Filebeat shuts down.
logging.metrics.period: 30s

# Set this flag to true to enable logging to files; if not set, file logging is disabled.
logging.to_files: true
logging.files:
  # Path of the directory where log files are written; if not set, the default is the home directory.
  path: /tmp

  # Name of the file logs are written to.
  name: filebeat-app.log

  # The log file rotates when it reaches the maximum size and a new file is created. Default value is 10MB.
  rotateeverybytes: 10485760 # = 10MB

  # Keeps this many recent log files in the directory for rotation and removes the oldest.
  keepfiles: 7

  # Enables file logging for this level only. Available log levels are: critical, error, warning, info, debug.
  level: debug
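
After adjusting the file for your environment, one simple way to start Filebeat in the foreground with this configuration, console logging and the publish debug selector enabled (assuming you run it from the Filebeat installation directory) is:

 ./filebeat -e -c filebeat.yml -d "publish"

The -e flag logs to stderr instead of the configured file output, -c points to the configuration file and -d enables the given debug selectors.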

Sample filebeat.yml File

Integration

Complete Integration Example Filebeat, Kafka, Logstash, Elasticsearch and Kibana

Read More

To read more on Filebeat topics, sample configuration files and integration with other systems with examples, follow the links Filebeat Tutorial and Filebeat Issues. To know more about YAML, follow the link YAML Tutorials.

Leave your feedback to enhance this topic and make it more helpful for others.

Sample filebeat.yml file for Prospectors,Multiline and Logging Configuration

You can copy this file into filebeat.yml and run it after making the changes below for your environment's directory structure, then follow the steps mentioned in Filebeat Download, Installation and Start/Run.

  • Change the prospectors section for your log file directory and file name.
  • Configure the multiline pattern to match your log format; it is currently set to a generic pattern that should work with most formats.
  • Change the Kafka output section for host, port and topic name as required.
  • Change the logging directory to match your machine's directory structure.

Sample filebeat.yml file

#============= Filebeat prospectors ===============

filebeat.prospectors:

# Multiple prospectors with their own shipping methods and rules can be defined here as
# per requirement; to read logs from multiple files in the same directory pattern, a glob
# pattern can be used in the path.

# Filebeat supports only two input_type values: log and stdin.

############## input_type log configuration #####################

- input_type: log

  # Paths of the files to read logs from; use a pattern if logs need to be read from multiple files.
  paths:
    - /opt/app/app1/logs/app1-debug*.log*

  # Set fields_under_root to true if you want custom fields stored at the root of the Filebeat JSON output.
  fields_under_root: true

  ### Multiline configuration for handling stack traces, objects, XML etc. When multiline is
  ### enabled with the settings below, such events are shipped as a single multiline message.

  # The regexp pattern that has to be matched. The example pattern matches all lines starting
  # with a [DEBUG, [ALERT, [TRACE, [WARNING etc. log level and can be customized
  # according to your log line format.
  multiline.pattern: '^\[([Aa]lert|ALERT|[Tt]race|TRACE|[Dd]ebug|DEBUG|[Nn]otice|NOTICE|[Ii]nfo|INFO|[Ww]arn?(?:ing)?|WARN?(?:ING)?|[Ee]rr?(?:or)?|ERR?(?:OR)?|[Cc]rit?(?:ical)?|CRIT?(?:ICAL)?|[Ff]atal|FATAL|[Ss]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?)'

  # Default is false. Defines whether the pattern match should be negated or not.
  multiline.negate: true

  # multiline.match defines where lines that do not match the pattern are appended.
  # Possible values are "after" or "before".
  multiline.match: after

  # If set, everything after this number of multiline lines is ignored.
  #multiline.max_lines: 50

#========== Kafka output configuration ============================
output.kafka:
  # The enabled flag turns this output module on or off; more on this in the Filebeat modules section.
  #enabled: true

  # List all your Kafka broker hosts and ports here; they are used to fetch the cluster
  # metadata, which contains the actual brokers events are published to.
  hosts: ["kafkahost:port"]

  # The Kafka topic to which events will be published.
  topic: QC-LOGS

  # No key is set by default, but a formatted key setting can be used.
  #key: ''

  # The default partition strategy is 'hash' using the key values set; if no key is set,
  # events are distributed randomly across partitions.
  #partition.hash:

  # Default value is false. If reachable_only is enabled, events are published only to reachable Kafka brokers.
  #reachable_only: false

  # Configure alternative event field names used to compute the hash value.
  # If empty, the `output.kafka.key` setting is used. Default value is an empty list.
  #hash: []

  # If authentication is set on the Kafka broker side, the fields below are required.
  #username: ''
  #password: ''

  # Kafka broker version, so that Filebeat can check compatibility with it.
  #version: 0.8.2

  # Metadata information is required for publishing events so that Filebeat can take
  # decisions based on the status of the brokers.
  #metadata:

  # Default is a maximum of 3 retries for the selection of available brokers.
  #retry.max: 3

  # Default value is 250ms. Time to wait before the next retry.
  #retry.backoff: 250ms

  # Metadata information is refreshed every 10 minutes.
  #refresh_frequency: 10m

  # Number of workers that will run for each configured Kafka broker.
  #worker: 1

  # Default value is 3. If set to a value less than 0, Filebeat retries continuously for as long as events are not published.
  #max_retries: 3

  # Default value is 2048. Maximum number of events batched and published to Kafka in one request.
  #bulk_max_size: 2048

  # Default value is 30 seconds. Times out if no response is heard from the Kafka broker within the specified time.
  #timeout: 30s

  # Default value is 10 seconds. Maximum duration the broker waits for the number of required acknowledgements.
  #broker_timeout: 10s

  # Default value is 256 buffered messages per Kafka broker.
  #channel_buffer_size: 256

  # Default value is 0 seconds, i.e. keep-alive is disabled; if set, the network connection is kept alive for that duration.
  #keep_alive: 0

  # Default compression is gzip. Other codecs such as snappy or none can also be set.
  compression: gzip

  # Default value is 1000000 bytes. If the JSON-encoded event exceeds this maximum message size, the event is dropped.
  max_message_bytes: 1000000

  # Default value is 1 ACK for reliability. Possible values are:
  # 0 = no response, the message can be lost if an error happens
  # 1 = wait for local commit
  # -1 = wait for all replicas to commit
  #required_acks: 1

  # Waiting interval between flushes of new events.
  #flush_interval: 1s

  # The configurable ClientID used for logging, debugging, and auditing purposes.
  # Default value is "beats"; setting it can help with analysis and auditing.
  #client_id: beats

  # Configure SSL settings if required by the Kafka broker.
  #ssl.enabled: true

  # SSL configuration is optional and off by default. The list of root certificates is
  # required for HTTPS server verification.
  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

  # Default value is full. The SSL verification mode is required if SSL is configured;
  # 'none' can be used for testing, but in this mode any certificate is accepted.
  #ssl.verification_mode: full

  # List of supported/valid TLS versions. By default all TLS versions from 1.0 up to 1.2 are enabled.
  #ssl.supported_protocols: [TLSv1.0, TLSv1.1, TLSv1.2]

  # Path to the certificate used for SSL client authentication.
  #ssl.certificate: "/etc/pki/client/cert.pem"

  # Path to the client certificate key.
  #ssl.key: "/etc/pki/client/cert.key"

  # Passphrase for decrypting the certificate key; required only if the key is shipped in encrypted form.
  #ssl.key_passphrase: ''

  # Configure the encryption cipher suites to be used for SSL connections.
  #ssl.cipher_suites: []

  # Configure the encryption curve types for ECDHE based cipher suites.
  #ssl.curve_types: []

#==================== Logging ==============================

# Default log level is info; the configured level and everything above it in the hierarchy is
# recorded automatically. Available log levels are: critical, error, warning, info, debug.
logging.level: debug

# Possible values for selectors are "beat", "publish" and "service"; use "*" to enable all.
# Selectors can also be set on the command line when starting Filebeat.
logging.selectors: ["*"]

# The default value is false. If set to true, output is also sent to syslog.
logging.to_syslog: false

# The default is true: all non-zero metric readings are output on shutdown.
logging.metrics.enabled: true

# Period of the metrics log for read counts from log files; a complete report is sent when Filebeat shuts down.
logging.metrics.period: 30s

# Set this flag to true to enable logging to files; if not set, file logging is disabled.
logging.to_files: true
logging.files:
  # Path of the directory where log files are written; if not set, the default is the home directory.
  path: /tmp

  # Name of the file logs are written to.
  name: filebeat-app.log

  # The log file rotates when it reaches the maximum size and a new file is created. Default value is 10MB.
  rotateeverybytes: 10485760 # = 10MB

  # Keeps this many recent log files in the directory for rotation and removes the oldest.
  keepfiles: 7

  # Enables file logging for this level only. Available log levels are: critical, error, warning, info, debug.
  level: debug

Integration

Complete Integration Example Filebeat, Kafka, Logstash, Elasticsearch and Kibana

Read More

To read more on Filebeat topics, sample configuration files and integration with other systems with examples, follow the links Filebeat Tutorial and Filebeat Issues. To know more about YAML, follow the link YAML Tutorials.

Leave your feedback to enhance this topic and make it more helpful for others.

Sample filebeat.yml file for Prospectors and Logging Configuration

Filebeat.yml file  with Prospectors, Kafka Output and Logging Configuration

You can copy this file into filebeat.yml and run it after making the changes below for your environment's directory structure, then follow the steps mentioned in Filebeat Download, Installation and Start/Run.

  • Change the prospectors section for your log file directory and file name.
  • Configure the multiline pattern to match your log format; it is currently set to a generic pattern that should work with most formats.
  • Change the Kafka output section for host, port and topic name as required.
  • Change the logging directory to match your machine's directory structure.

Below is the sample file:

#============= Filebeat prospectors ===============

filebeat.prospectors:

# Multiple prospectors with their own shipping methods and rules can be defined here as
# per requirement; to read logs from multiple files in the same directory pattern, a glob
# pattern can be used in the path.

# Filebeat supports only two input_type values: log and stdin.

- input_type: log

  # Paths of the files to read logs from; use a pattern if logs need to be read from multiple files.
  paths:
    - /opt/app/app1/logs/app1-debug*.log*

  # Set fields_under_root to true if you want custom fields stored at the root of the Filebeat JSON output.
  fields_under_root: true

  ### Multiline configuration for handling stack traces, objects, XML etc. When multiline is
  ### enabled with the settings below, such events are shipped as a single multiline message.

  # The regexp pattern that has to be matched. The example pattern matches all lines starting
  # with a [DEBUG, [ALERT, [TRACE, [WARNING etc. log level and can be customized
  # according to your log line format.
  #multiline.pattern: '^\[([Aa]lert|ALERT|[Tt]race|TRACE|[Dd]ebug|DEBUG|[Nn]otice|NOTICE|[Ii]nfo|INFO|[Ww]arn?(?:ing)?|WARN?(?:ING)?|[Ee]rr?(?:or)?|ERR?(?:OR)?|[Cc]rit?(?:ical)?|CRIT?(?:ICAL)?|[Ff]atal|FATAL|[Ss]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?)'

  # Default is false. Defines whether the pattern match should be negated or not.
  #multiline.negate: true

  # multiline.match defines where lines that do not match the pattern are appended.
  # Possible values are "after" or "before".
  #multiline.match: after

  # If set, everything after this number of multiline lines is ignored.
  #multiline.max_lines: 50

#========== Kafka output configuration ============================
output.kafka:
  # The enabled flag turns this output module on or off; more on this in the Filebeat modules section.
  #enabled: true

  # List all your Kafka broker hosts and ports here; they are used to fetch the cluster
  # metadata, which contains the actual brokers events are published to.
  hosts: ["kafkahost:port"]

  # The Kafka topic to which events will be published.
  topic: QC-LOGS

  # No key is set by default, but a formatted key setting can be used.
  #key: ''

  # The default partition strategy is 'hash' using the key values set; if no key is set,
  # events are distributed randomly across partitions.
  #partition.hash:

  # Default value is false. If reachable_only is enabled, events are published only to reachable Kafka brokers.
  #reachable_only: false

  # Configure alternative event field names used to compute the hash value.
  # If empty, the `output.kafka.key` setting is used. Default value is an empty list.
  #hash: []

  # If authentication is set on the Kafka broker side, the fields below are required.
  #username: ''
  #password: ''

  # Kafka broker version, so that Filebeat can check compatibility with it.
  #version: 0.8.2

  # Metadata information is required for publishing events so that Filebeat can take
  # decisions based on the status of the brokers.
  #metadata:

  # Default is a maximum of 3 retries for the selection of available brokers.
  #retry.max: 3

  # Default value is 250ms. Time to wait before the next retry.
  #retry.backoff: 250ms

  # Metadata information is refreshed every 10 minutes.
  #refresh_frequency: 10m

  # Number of workers that will run for each configured Kafka broker.
  #worker: 1

  # Default value is 3. If set to a value less than 0, Filebeat retries continuously for as long as events are not published.
  #max_retries: 3

  # Default value is 2048. Maximum number of events batched and published to Kafka in one request.
  #bulk_max_size: 2048

  # Default value is 30 seconds. Times out if no response is heard from the Kafka broker within the specified time.
  #timeout: 30s

  # Default value is 10 seconds. Maximum duration the broker waits for the number of required acknowledgements.
  #broker_timeout: 10s

  # Default value is 256 buffered messages per Kafka broker.
  #channel_buffer_size: 256

  # Default value is 0 seconds, i.e. keep-alive is disabled; if set, the network connection is kept alive for that duration.
  #keep_alive: 0

  # Default compression is gzip. Other codecs such as snappy or none can also be set.
  compression: gzip

  # Default value is 1000000 bytes. If the JSON-encoded event exceeds this maximum message size, the event is dropped.
  max_message_bytes: 1000000

  # Default value is 1 ACK for reliability. Possible values are:
  # 0 = no response, the message can be lost if an error happens
  # 1 = wait for local commit
  # -1 = wait for all replicas to commit
  #required_acks: 1

  # Waiting interval between flushes of new events.
  #flush_interval: 1s

  # The configurable ClientID used for logging, debugging, and auditing purposes.
  # Default value is "beats"; setting it can help with analysis and auditing.
  #client_id: beats

  # Configure SSL settings if required by the Kafka broker.
  #ssl.enabled: true

  # SSL configuration is optional and off by default. The list of root certificates is
  # required for HTTPS server verification.
  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

  # Default value is full. The SSL verification mode is required if SSL is configured;
  # 'none' can be used for testing, but in this mode any certificate is accepted.
  #ssl.verification_mode: full

  # List of supported/valid TLS versions. By default all TLS versions from 1.0 up to 1.2 are enabled.
  #ssl.supported_protocols: [TLSv1.0, TLSv1.1, TLSv1.2]

  # Path to the certificate used for SSL client authentication.
  #ssl.certificate: "/etc/pki/client/cert.pem"

  # Path to the client certificate key.
  #ssl.key: "/etc/pki/client/cert.key"

  # Passphrase for decrypting the certificate key; required only if the key is shipped in encrypted form.
  #ssl.key_passphrase: ''

  # Configure the encryption cipher suites to be used for SSL connections.
  #ssl.cipher_suites: []

  # Configure the encryption curve types for ECDHE based cipher suites.
  #ssl.curve_types: []

#==================== Logging ==============================

# Default log level is info; the configured level and everything above it in the hierarchy is
# recorded automatically. Available log levels are: critical, error, warning, info, debug.
logging.level: debug

# Possible values for selectors are "beat", "publish" and "service"; use "*" to enable all.
# Selectors can also be set on the command line when starting Filebeat.
logging.selectors: ["*"]

# The default value is false. If set to true, output is also sent to syslog.
logging.to_syslog: false

# The default is true: all non-zero metric readings are output on shutdown.
logging.metrics.enabled: true

# Period of the metrics log for read counts from log files; a complete report is sent when Filebeat shuts down.
logging.metrics.period: 30s

# Set this flag to true to enable logging to files; if not set, file logging is disabled.
logging.to_files: true
logging.files:
  # Path of the directory where log files are written; if not set, the default is the home directory.
  path: /tmp

  # Name of the file logs are written to.
  name: filebeat-app.log

  # The log file rotates when it reaches the maximum size and a new file is created. Default value is 10MB.
  rotateeverybytes: 10485760 # = 10MB

  # Keeps this many recent log files in the directory for rotation and removes the oldest.
  keepfiles: 7

  # Enables file logging for this level only. Available log levels are: critical, error, warning, info, debug.
  level: debug

Integration

Complete Integration Example Filebeat, Kafka, Logstash, Elasticsearch and Kibana

Read More

To read more on Filebeat topics, sample configuration files and integration with other systems with examples, follow the links Filebeat Tutorial and Filebeat Issues. To know more about YAML, follow the link YAML Tutorials.

Leave your feedback to enhance this topic and make it more helpful for others.