Category Archives: Logstash

Elasticsearch Ingest Node Vs Logstash Vs Filebeat


Elasticsearch Ingest Node

An ingest node pre-processes documents before the actual indexing happens. It intercepts bulk and index requests, applies transformations, and then passes the documents back to the index or bulk APIs.
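
As a rough illustration of this intercept-and-transform flow, the sketch below builds the JSON body for a hypothetical ingest pipeline and the URL that routes a document through it. The pipeline id `app-logs`, the index name, the document type `log`, the field names and the host are all assumptions made for illustration; the `_ingest/pipeline` endpoint, the `set` and `lowercase` processors and the `pipeline` query parameter belong to the Elasticsearch ingest API.

```python
import json

ES_HOST = "http://localhost:9200"   # assumed local node
PIPELINE_ID = "app-logs"            # hypothetical pipeline id


def build_pipeline_body():
    """Body for: PUT {ES_HOST}/_ingest/pipeline/{PIPELINE_ID}."""
    return {
        "description": "Example: tag and normalise documents before indexing",
        "processors": [
            # Each processor works on one document (event) at a time.
            {"set": {"field": "environment", "value": "test"}},
            {"lowercase": {"field": "loglevel"}},
        ],
    }


def index_url(index, doc_type="log"):
    """URL that sends an index request through the pipeline (5.x style path)."""
    return f"{ES_HOST}/{index}/{doc_type}?pipeline={PIPELINE_ID}"


if __name__ == "__main__":
    print(json.dumps(build_pipeline_body(), indent=2))
    print(index_url("app1-logs"))
```

The body would be sent once to register the pipeline; after that, any index or bulk request that names the pipeline is transformed before the document is stored.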

Logstash

Logstash is a server-side data processing pipeline that ingests data from multiple sources simultaneously, transforms it, and then sends it to different outputs such as Elasticsearch, Kafka queues and databases.

Filebeat

Filebeat is a lightweight log shipper that reads log lines from thousands of log files and forwards them to a centralized system: to Kafka topics for further processing by Logstash, directly to Logstash, or directly to Elasticsearch.

There is overlap in functionality between Elasticsearch Ingest Node, Logstash and Filebeat. Each has its own strengths and weaknesses, depending on its architecture and area of use. You can also integrate Filebeat, Logstash and Elasticsearch Ingest Node with minor configuration to optimize performance and data analysis.

Below are some key points comparing Elasticsearch Ingest Node, Logstash and Filebeat.

Data In and Out

  • Elasticsearch Ingest Node: Because the ingest node runs as a pipeline within the Elasticsearch indexing flow, data has to be pushed to it through bulk or index requests. The configured pipeline processors then process the documents before they are written to Elasticsearch.
  • Logstash: Supports a wide variety of input and output plugins. It can act as a middle server that accepts data pushed from clients over TCP, UDP and HTTP, as well as data from Filebeat, message queues and databases. It parses and processes data for a variety of outputs, e.g. Elasticsearch, message queues like Kafka and RabbitMQ, or S3 and HDFS for long-term data analysis.
  • Filebeat: Built specifically to ship log file data to Kafka, Logstash or Elasticsearch.

Queuing

  • Elasticsearch Ingest Node: Has no built-in queuing mechanism in pipeline processing. If the data nodes are not able to accept data, the ingest node stops accepting data as well.
  • Logstash: Provides a persistent queue feature that stores data on disk.
  • Filebeat: Provides a queuing mechanism without data loss.

Back-pressure

  • Elasticsearch Ingest Node: Clients pushing data to the ingest node need to handle back-pressure themselves by queuing data in case Elasticsearch is unreachable or unable to accept data for an extended period; otherwise data is lost.
  • Logstash: Provides at-least-once delivery guarantees and buffers data locally through ingestion spikes.
  • Filebeat: Designed so that not a single log line is lost if an output system such as Kafka, Logstash or Elasticsearch is unavailable.

Data Processing

  • Elasticsearch Ingest Node: Comes with around 20 different processors, covering the functionality of the most commonly used Logstash plugins. It has some limitations: a pipeline can only work in the context of a single event, and processors are generally not able to call out to other systems or read data from disk. It also lacks the filters available in Beats and Logstash.
  • Logstash: Has a larger selection of plugins to choose from, including plugins that add or transform content based on lookups in configuration files, Elasticsearch, Beats or relational databases. Logstash supports filtering out and dropping events based on configurable criteria.
  • Filebeat: Beats support filtering out and dropping events based on configurable criteria.

Configuration

  • Elasticsearch Ingest Node: Each document can only be processed by a single pipeline when passing through the ingest node.
  • Logstash: Supports defining multiple logically separate pipelines through conditional control flow, to handle complex and multiple data formats. Measuring and optimizing pipeline performance is easier in Logstash, which supports monitoring and offers a pipeline viewer UI that helps resolve potential issues quickly.
  • Filebeat: Needs only minor configuration to read, ship and filter data, but is limited in parsing.

Specialization

  • Elasticsearch Ingest Node: The ingest node pipeline processes data just before it is indexed in Elasticsearch.
  • Logstash: A middle server that parses, processes and filters data from multiple input plugins and sends the processed data to output plugins.
  • Filebeat: Specific to reading and shipping logs from different servers to a central location on Elasticsearch or Kafka, with parsing done through Logstash if required.

Integration

  • Elasticsearch Ingest Node: Can accept data from Filebeat, Logstash, etc.
  • Logstash: Supports sending data to an ingest pipeline.
  • Filebeat: Can send data to Logstash, Elasticsearch Ingest Node or Kafka.

Performance

  • For the performance of each tool in different cases, see the linked post: Elasticsearch Ingest Node, Logstash and Filebeat Performance comparison.


Log4j2 JSON Configuration for Java Logging Severity Levels, Formatting and Appenders


In the previous post you read about Log4j2 XML configuration of appenders, formatters and loggers for console and file logging. It also explained RollingFile appenders and their management.

Below is the Log4j2 JSON configuration equivalent to that XML configuration, together with the dependencies required for JSON. For more detail and the configuration steps, follow the previous post on Log4j2 XML configuration.

You have to add the Jackson JSON dependency below to your classpath or pom.xml.

pom.xml

<!-- basic Log4j2 dependency -->
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-api</artifactId>
    <version>2.6.1</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.6.1</version>
</dependency>
<!-- Asynchronous logging for multithreaded env -->
<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.3.4</version>
</dependency>

<!-- Jackson JSON Processor -->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.4.1</version>
</dependency>

Create a file named log4j2.json in your root folder or resources folder.

{
   "configuration": {
       "status": "info",
       "monitorInterval": "60",
       "name": "FacingIssuesOnIT",
      "properties": {
         "property": [
            {
               "name": "filename",
               "value": "target/FacingIssueOnIT.log"
            },
            {
               "name": "log-path",
               "value": "C:/logs/"
            }
         ]
      },
      "appenders": {
         "Console": {
            "PatternLayout": {
               "pattern": "%d{yyyyMMdd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
            },
            "name": "STDOUT",
            "target": "SYSTEM_OUT"
         },
         "File": {
            "PatternLayout": {
               "pattern": "%d %p %c{1.} [%t] %m%n"
            },
            "name": "file",
            "fileName": "${filename}"
         },
         "RollingFile": {
            "PatternLayout": {
               "pattern": "%d{yyyyMMdd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
            },
            "Policies": {
               "SizeBasedTriggeringPolicy": {
                  "size": "100 MB"
               },
               "TimeBasedTriggeringPolicy": {
                  "interval": "1",
                  "modulate": "true"
               }
            },
            "DefaultRolloverStrategy": {
               "Delete": {
                  "IfFileName": {
                     "glob": "*/FacingIssueOnIT-*.log.gz"
                  },
                  "IfLastModified": {
                     "age": "1h"
                  },
                  "basePath": "${log-path}",
                  "maxDepth": "2"
               }
            },
            "name": "RollingFile",
            "fileName": "${log-path}/FacingIssueOnIT.log",
            "filePattern": "${log-path}/$${date:yyyy-MM-dd}/FacingIssueOnIT-%d{yyyy-MM-dd}-%i.log.gz"
         }
      },
      "Loggers": {
         "Logger": [
            {
               "name": "com.logging",
               "level": "debug"
            },
            {
               "appender-ref": {
                  "ref": "RollingFile",
                  "level": "debug"
               },
               "name": "root",
               "level": "debug",
               "additivity": "false"
            }
         ],
         "Root": {
            "AppenderRef": [
               {
                  "ref": "file",
                  "level": "error"
               },
               {
                  "ref": "STDOUT",
                  "level": "debug"
               },
               {
                  "ref": "RollingFile"
               }
            ],
            "level": "trace"
         }
      }

   }
}

How to Configure Filebeat, Kafka, Logstash Input , Elasticsearch Output and Kibana Dashboard


The Filebeat, Kafka, Logstash, Elasticsearch and Kibana integration is used in big organizations where applications are deployed in production on hundreds or thousands of servers scattered around different locations, and the data from these servers needs to be analyzed in real time.

This integration helps mostly with log-level analysis, tracking issues and data anomalies, and alerting on particular events where accountability matters.

Using these technologies provides a scalable architecture in which the systems are decoupled from each other and can each be enhanced individually.

Why These Technologies?

Filebeat:

  • Lightweight agent for shipping logs.
  • Forwards and centralizes files and logs.
  • Robust (does not miss a single beat).

Kafka:

  • Open source, distributed stream processing and message broker platform.
  • Processes stream data or transaction logs in real time.
  • Fault-tolerant, high-throughput, low-latency platform for handling real-time data feeds.

Logstash:

  • Open source, server-side data processing pipeline that accepts data from different sources simultaneously.
  • Parses, formats and transforms data and sends it to different outputs.

Elasticsearch:

  • Open source, distributed and cross-platform.
  • Built on top of Lucene, which provides full-text search with NRT (near real time) search results.
  • Supports RESTful search through the Elasticsearch REST API.

Kibana:

  • Open source.
  • Provides a window to view Elasticsearch data in the form of different charts and dashboards.
  • Provides an easy way to search and operate on data with respect to time intervals.
  • Can easily be embedded in any web application through embedded dashboards.

How the Data Flow Works

In this integration Filebeat is installed on every server where your application is deployed. It reads the latest log changes on these servers and ships them to the Kafka topic configured for the application.

Logstash subscribes to the log lines from the Kafka topic, parses them, makes the relevant changes (formatting, excluding and including fields) and then sends the processed data to Elasticsearch indexes, which act as the central location for data from the different servers.

Kibana is linked to the Elasticsearch indexes and helps you analyze the data through searches, charts and dashboards.

FKLEK Integration

Design Architecture

In the architecture configured below, the application is deployed on three servers, and each server has a current log file named App1.log. Our goal is to read real-time data from these servers and analyze it.

FKLEK Arch Integration

Steps for Installation, Configuration and Start

Here we first install Kafka and Elasticsearch and run them individually; the rest of the tools are installed and run in sequence to test the data flow. Initially, install everything on the same machine and test with sample data using the steps below. The end of this post describes the changes you need to make for your own servers.

  • Kafka Installation, Configuration and Start
  • Elasticsearch Installation, Configuration and Start
  • Filebeat Installation, Configuration and Start
  • Logstash Installation, Configuration and Start
  • Kibana Installation, Start and Display

Prerequisites

The Filebeat, Logstash, Elasticsearch and Kibana versions should be compatible with each other; it is best to use the latest releases from https://www.elastic.co/downloads.

  • Java 8+
  • Linux Server
  • Filebeat 5.XX
  • Kafka 0.10.XX (Scala 2.11 build)
  • Logstash 5.XX
  • Elasticsearch 5.XX
  • Kibana 5.XX

Note: Make sure JDK 8 is installed and the JAVA_HOME environment variable points to the JDK 8 home directory on every machine where you install Elasticsearch, Logstash, Kibana and Kafka.

Windows: My Computer -> right click -> Properties -> Advanced System Settings -> System Variables

Java_Home
Set JAVA_HOME

Linux: Go to your home directory (or the sudo user's home directory) and add the line below, for example to your shell profile.

export JAVA_HOME=/opt/app/facingissuesonit/jdk1.8.0_66

Sample Data

For testing we will use the sample log lines below, which contain regular log lines as well as a stack trace; the grok parsing in this example is designed for them. For real-time testing with actual data you can point to your server log files, but you have to modify the grok pattern in the Logstash configuration accordingly.

2013-02-28 09:57:56,662 WARN  CreateSomethingActivationKey - WhateverException for User 49-123-345678 {{rid,US8cFAp5eZgAABwUItEAAAAI_dev01_443}{realsid,60A9772A136B9912B6FF0C3627A47090.dev1-a}}
2013-02-28 09:57:56,663 INFO  LMLogger - ERR1700 - u:null failures: 0  - Technical error {{rid,US8cFAp5eZgAABwUItEAAAAI_dev01_443}{realsid,60A9772A136B9912B6FF0C3627A47090.dev1-a}}
2013-02-28 09:57:56,668 ERROR SomeCallLogger - ESS10005 Cpc portalservices: Exception caught while writing log messege to MEA Call:  {}
java.sql.SQLSyntaxErrorException: ORA-00942: table or view does not exist

	at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:445)
	at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:396)
2013-02-28 10:04:35,723 INFO  EntryFilter - Fresh on request /portalservices/foobarwhatever {{rid,US8dogp5eZgAABwXPGEAAAAL_dev01_443}{realsid,56BA2AD41D9BB28AFCEEEFF927EE61C2.dev1-a}}

Create an App1.log file on the same machine where Filebeat will be installed and copy the log lines above into it.

Kafka Installation, Configuration and Start

Download the latest version of Kafka from the link below and use the command to extract and install it on a Linux server; on Windows, just unzip the downloaded file.

Download Link : https://kafka.apache.org/downloads

tar -zxvf kafka_2.11-0.10.0.0.tgz

For more configuration and start options, follow Setup Kafka Cluster for Single Server/Broker.

After downloading and extracting the archive you will have the files and directory structure below.

ls -l
drwxr-xr-x  3 facingissuesonit Saurabh   4096 Apr  3 05:18 bin
drwxr-xr-x  2 facingissuesonit Saurabh   4096 May  8 11:05 config
drwxr-xr-x 74 facingissuesonit Saurabh   4096 May 27 20:00 kafka-logs
drwxr-xr-x  2 facingissuesonit Saurabh   4096 Apr  3 05:17 libs
-rw-r--r--  1 facingissuesonit Saurabh  28824 Apr  3 05:17 LICENSE
drwxr-xr-x  2 facingissuesonit Saurabh 487424 May 27 20:00 logs
-rw-r--r--  1 facingissuesonit Saurabh    336 Apr  3 05:18 NOTICE
drwxr-xr-x  2 facingissuesonit Saurabh   4096 Apr  3 05:17 site-docs

For more details about these files, configuration options and other integration options, follow the Kafka Tutorial.

Make the changes below in config/zookeeper.properties and config/server.properties.

config/zookeeper.properties:

clientPort=2181

config/server.properties:

broker.id=0
listeners=PLAINTEXT://:9092
log.dir=/kafka-logs
zookeeper.connect=localhost:2181

Kafka is now configured and ready to run. Use the commands below to start ZooKeeper and the Kafka server as background processes.

screen -d -m bin/zookeeper-server-start.sh config/zookeeper.properties
screen -d -m bin/kafka-server-start.sh config/server.properties

To verify that Kafka was installed successfully, check for the running Kafka process on Linux with "ps -ef|grep kafka", or follow the producer and consumer steps for a topic in Setup Kafka Cluster for Single Server/Broker.

Elasticsearch Installation, Configuration and Start

Download the latest version of Elasticsearch from the link below and use the command to extract and install it on a Linux server; on Windows, just unzip the downloaded file.

Download Link : https://www.elastic.co/downloads/elasticsearch

tar -zxvf elasticsearch-5.4.0.tar.gz

It will show the files and directory structure below for Elasticsearch.

drwxr-xr-x  2 facingissuesonit Saurabh   4096 Apr 25 19:20 bin
drwxr-xr-x  3 facingissuesonit Saurabh   4096 May 13 17:27 config
drwxr-xr-x  3 facingissuesonit Saurabh   4096 Apr 24 15:56 data
drwxr-xr-x  2 facingissuesonit Saurabh   4096 Apr 17 10:55 lib
-rw-r--r--  1 facingissuesonit Saurabh  11358 Apr 17 10:50 LICENSE.txt
drwxr-xr-x  2 facingissuesonit Saurabh   4096 May 28 05:00 logs
drwxr-xr-x 12 facingissuesonit Saurabh   4096 Apr 17 10:55 modules
-rw-r--r--  1 facingissuesonit Saurabh 194187 Apr 17 10:55 NOTICE.txt
drwxr-xr-x  2 facingissuesonit Saurabh   4096 Apr 17 10:55 plugins
-rw-r--r--  1 facingissuesonit Saurabh   9540 Apr 17 10:50 README.textile

Before starting Elasticsearch, make some basic changes in the config/elasticsearch.yml file for the cluster and node name. You can choose them based on your application or organization name.

cluster.name: FACING-ISSUE-IN-IT
node.name: TEST-NODE-1
#network.host: 0.0.0.0
http.port: 9200

Elasticsearch is now configured and it is time to start it. Use the command below from the Elasticsearch home directory to run it in the background.

screen -d -m bin/elasticsearch

To check that Elasticsearch started successfully, open the URL below in a browser to see the cluster status. You will get a result like the one below.

http://localhost:9200/_cluster/health?pretty

or, if network.host is configured:

http://elasticserverIp:9200/_cluster/health?pretty

Result :

{
  "cluster_name" : "FACING-ISSUE-IN-IT",
  "status" : "green",
  "timed_out" : false,
  "number_of_nodes" : 1,
  "number_of_data_nodes" : 1,
  "active_primary_shards" : 0,
  "active_shards" : 0,
  "relocating_shards" : 0,
  "initializing_shards" : 0,
  "unassigned_shards" : 0,
  "delayed_unassigned_shards" : 0,
  "number_of_pending_tasks" : 0,
  "number_of_in_flight_fetch" : 0,
  "task_max_waiting_in_queue_millis" : 0,
  "active_shards_percent_as_number" : 100.0
}
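
The same check can be scripted instead of done in a browser. The sketch below is only an illustration: the function names are made up for this example, the base URL assumes the local single-node setup used in this post, and treating "yellow" as acceptable is a design choice for a one-node test cluster, where replica shards cannot be assigned.

```python
import json
from urllib.request import urlopen


def cluster_is_usable(health):
    """True when the cluster reports green or yellow and the call did not time out."""
    return (health.get("status") in ("green", "yellow")
            and not health.get("timed_out", False))


def fetch_health(base_url="http://localhost:9200"):
    """Call the cluster health endpoint; this needs a running Elasticsearch node."""
    with urlopen(f"{base_url}/_cluster/health", timeout=5) as resp:
        return json.load(resp)


if __name__ == "__main__":
    # Offline demonstration using the fields from the result shown above.
    sample = {"cluster_name": "FACING-ISSUE-IN-IT", "status": "green", "timed_out": False}
    print(cluster_is_usable(sample))
```

With a node running, `cluster_is_usable(fetch_health())` could be used as a simple readiness check before starting Filebeat and Logstash.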

Filebeat Installation, Configuration and Start

Download the latest version of Filebeat from the link below and use the command to extract and install it on a Linux server; on Windows, just unzip the downloaded file.

Download Link : https://www.elastic.co/downloads/beats/filebeat

tar -zxvf filebeat-<version>.tar.gz

For more configuration and start options, follow Filebeat Download, Installation and Start/Run.

After downloading and extracting the archive you will have the files and directory structure below.

ls -l
-rwxr-xr-x 1 facingissuesonit Saurabh 14908742 Jan 11 14:11 filebeat
-rw-r--r-- 1 facingissuesonit Saurabh    31964 Jan 11 14:11 filebeat.full.yml
-rw-r--r-- 1 facingissuesonit Saurabh     3040 Jan 11 14:11 filebeat.template-es2x.json
-rw-r--r-- 1 facingissuesonit Saurabh     2397 Jan 11 14:11 filebeat.template.json
-rw-r--r-- 1 facingissuesonit Saurabh     4196 Jan 11 14:11 filebeat.yml
-rw-r--r-- 1 facingissuesonit Saurabh      811 Jan 11 14:10 README.md
drwxr-xr-x 2 facingissuesonit Saurabh     4096 Jan 11 14:11 scripts

For more details about these files, configuration options and other integration options, follow the Filebeat Tutorial.

Now that Filebeat is installed, make the changes below in the filebeat.full.yml file.

  • Inside the prospectors section, change paths to your log file location:
paths:
  - /opt/app/facingissuesonit/App1.log
  • Comment out the default Elasticsearch output properties:
#output.elasticsearch:
  #hosts: ["localhost:9200"]
  • Configure the multiline options so that stack trace lines, which do not start with a date, are treated as part of the preceding log line and shipped as a single event:
multiline.pattern: ^\d
multiline.negate: true
multiline.match: after

To learn more about Filebeat multiline configuration, follow Filebeat Multiline Configuration Changes for Object, StackTrace and XML.

  • Inside the Kafka output section, update the hosts and topic properties. If Kafka runs on the same machine use localhost; otherwise use the IP of the Kafka machine.
output.kafka:
  hosts: ["localhost:9092"]
  topic: APP-1-TOPIC
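
To make the effect of the multiline settings above more concrete, here is a small Python sketch that mimics the grouping rule for `negate: true` with `match: after`: any line that does not match the pattern is appended to the event before it. This is an illustration of the rule only, not Filebeat's implementation, and the function name is hypothetical.

```python
import re


def group_multiline(lines, pattern=r"^\d"):
    """Group raw lines into events: a line NOT matching the pattern
    (negate: true) is appended after the previous event (match: after)."""
    starts_event = re.compile(pattern)
    events = []
    for line in lines:
        if starts_event.search(line) or not events:
            events.append(line)          # a new event begins with a digit (the date)
        else:
            events[-1] += "\n" + line    # continuation, e.g. a stack trace line
    return events


if __name__ == "__main__":
    raw = [
        "2013-02-28 09:57:56,668 ERROR SomeCallLogger - ESS10005 ...",
        "java.sql.SQLSyntaxErrorException: ORA-00942: table or view does not exist",
        "",
        "\tat oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:445)",
        "2013-02-28 10:04:35,723 INFO  EntryFilter - Fresh on request ...",
    ]
    for event in group_multiline(raw):
        print(repr(event))
```

The empty line and the tab-indented `at ...` lines do not start with a digit, so they end up inside the ERROR event, joined by newline characters.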

For more on logging configuration, follow the link Filebeat, Logging Configuration.

Filebeat is now configured and ready to start with the command below. It will continuously read the App1.log file from the configured prospector and publish log line events to Kafka. The topic APP-1-TOPIC will also be created in Kafka if it does not exist, provided topic auto-creation is enabled on the broker (the Kafka default).

./filebeat -e -c filebeat.full.yml -d "publish"

On the console it will display output like the below for the sample lines.

2017/05/28 00:24:27.991828 client.go:184: DBG  Publish: {
  "@timestamp": "2017-05-28T00:24:22.991Z",
  "beat": {
    "hostname": "sg02870",
    "name": "sg02870",
    "version": "5.1.2"
  },
  "input_type": "log",
  "message": "2013-02-28 09:57:56,662 WARN  CreateSomethingActivationKey - WhateverException for User 49-123-345678 {{rid,US8cFAp5eZgAABwUItEAAAAI_dev01_443}{realsid,60A9772A136B9912B6FF0C3627A47090.dev1-a}}",
  "offset": 194,
  "source": "/opt/app/facingissuesonit/App1.log",
  "type": "log"
}
2017/05/28 00:24:27.991907 client.go:184: DBG  Publish: {
  "@timestamp": "2017-05-28T00:24:22.991Z",
  "beat": {
    "hostname": "sg02870",
    "name": "sg02870",
    "version": "5.1.2"
  },
  "input_type": "log",
  "message": "2013-02-28 09:57:56,663 INFO  LMLogger - ERR1700 - u:null failures: 0  - Technical error {{rid,US8cFAp5eZgAABwUItEAAAAI_dev01_443}{realsid,60A9772A136B9912B6FF0C3627A47090.dev1-a}}",
  "offset": 375,
  "source": "/opt/app/facingissuesonit/App1.log",
  "type": "log"
}
2017/05/28 00:24:27.991984 client.go:184: DBG  Publish: {
  "@timestamp": "2017-05-28T00:24:22.991Z",
  "beat": {
    "hostname": "sg02870",
    "name": "sg02870",
    "version": "5.1.2"
  },
  "input_type": "log",
  "message": "2013-02-28 09:57:56,668 ERROR SomeCallLogger - ESS10005 Cpc portalservices: Exception caught while writing log messege to MEA Call:  {}\njava.sql.SQLSyntaxErrorException: ORA-00942: table or view does not exist\n\n\tat oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:445)\n\tat oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:396)",
  "offset": 718,
  "source": "/opt/app/facingissuesonit/App1.log",
  "type": "log"
}
2017/05/28 00:24:27.991984 client.go:184: DBG  Publish: {
  "@timestamp": "2017-05-28T00:24:22.992Z",
  "beat": {
    "hostname": "sg02870",
    "name": "sg02870",
    "version": "5.1.2"
  },
  "input_type": "log",
  "message": "2013-02-28 10:04:35,723 INFO  EntryFilter - Fresh on request /portalservices/foobarwhatever {{rid,US8dogp5eZgAABwXPGEAAAAL_dev01_443}{realsid,56BA2AD41D9BB28AFCEEEFF927EE61C2.dev1-a}}",
  "offset": 902,
  "source": "/opt/app/facingissuesonit/App1.log",
  "type": "log"
}

As the Filebeat debug output above shows, the third published event contains a multiline message with the exception stack trace. Each event carries the following fields:

@timestamp: timestamp at which the data was shipped.

beat.hostname: name of the Filebeat machine from which the data is shipped.

beat.version: version of Filebeat installed on the server, which helps with compatibility checks on the receiving end.

message: the log line from the log file, or the combined multiline log lines.

offset: byte offset of the event within the source file.

source: name of the file from which the logs were read.

Now it is time to check whether the data was published to the Kafka topic. Go to the directory below, where you will see files such as xyz.index and xyz.log that hold the offset index and the messages. Kafka names the directory after the topic and partition number, so partition 0 of the topic looks like this:

{Kafka_home}/kafka-logs/APP-1-TOPIC-0
          00000000000000000000.log
          00000000000000000000.index

Now your server log lines are in the Kafka topic, ready to be read and parsed by Logstash and sent to Elasticsearch for analysis and search.

Logstash Installation, Configuration and Start

Download the latest version of Logstash from the link below and use the command to extract and install it on a Linux server; on Windows, just unzip the downloaded file.

Download Link : https://www.elastic.co/downloads/logstash

tar -zxvf logstash-5.4.0.tar.gz

It will show the file and directory structure below.

drwxr-xr-x 2 facingissuesonit Saurabh   4096 Apr 20 11:27 bin
-rw-r--r-- 1 facingissuesonit Saurabh 111569 Mar 22 23:49 CHANGELOG.md
drwxr-xr-x 2 facingissuesonit Saurabh   4096 Apr 20 11:27 config
-rw-r--r-- 1 facingissuesonit Saurabh   2249 Mar 22 23:49 CONTRIBUTORS
drwxr-xr-x 3 facingissuesonit Saurabh   4096 Apr 20 12:07 data
-rw-r--r-- 1 facingissuesonit Saurabh   3945 Mar 22 23:55 Gemfile
-rw-r--r-- 1 facingissuesonit Saurabh  21544 Mar 22 23:49 Gemfile.jruby-1.9.lock
drwxr-xr-x 5 facingissuesonit Saurabh   4096 Apr 20 11:27 lib
-rw-r--r-- 1 facingissuesonit Saurabh    589 Mar 22 23:49 LICENSE
drwxr-xr-x 2 facingissuesonit Saurabh   4096 May 21 00:00 logs
drwxr-xr-x 4 facingissuesonit Saurabh   4096 Apr 20 11:27 logstash-core
drwxr-xr-x 3 facingissuesonit Saurabh   4096 Apr 20 11:27 logstash-core-event-java
drwxr-xr-x 3 facingissuesonit Saurabh   4096 Apr 20 11:27 logstash-core-plugin-api
drwxr-xr-x 3 facingissuesonit Saurabh   4096 Apr 20 11:27 logstash-core-queue-jruby
-rw-r--r-- 1 facingissuesonit Saurabh  28114 Mar 22 23:56 NOTICE.TXT
drwxr-xr-x 4 facingissuesonit Saurabh   4096 Apr 20 11:27 vendor

Before starting Logstash, create a configuration file that takes input data from Kafka, parses it into the respective fields and sends it to Elasticsearch. Create the file logstash-app1.conf in the Logstash bin directory with the content below.

bin/logstash-app1.conf

input {
     kafka {
            bootstrap_servers => 'localhost:9092'
            topics => ["APP-1-TOPIC"]
            codec => json {}
          }
}
filter {
    # parse log line
    grok {
        match => {"message" => "\A%{TIMESTAMP_ISO8601:timestamp}\s+%{LOGLEVEL:loglevel}\s+(?<logger>(?:[a-zA-Z0-9-]+\.)*[A-Za-z0-9$]+)\s+(-\s+)?(?=(?<msgnr>[A-Z]+[0-9]{4,5}))*%{DATA:message}({({[^}]+},?\s*)*})?\s*$(?<stacktrace>(?m:.*))?" }
    }

    #Remove unused fields
    #mutate { remove_field =>["beat","@version" ]}
}
output {
    #Output result sent to elasticsearch and dynamically create array
    elasticsearch {
        index  => "app1-logs-%{+YYYY.MM.dd}"
        hosts => ["localhost:9200"]
        sniffing => false
  	}

     #Sysout logs
     stdout
       {
         codec => rubydebug
       }
}
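
To get a feel for what the grok filter above extracts, the following Python sketch approximates it with the standard `re` module. It is a simplified stand-in, not a translation of the grok pattern: the timestamp and log level expressions are hand-written assumptions that fit the sample data, the message number is found in a second step instead of with a lookahead, and the stack trace is taken as everything after the first line. All names in the code are hypothetical.

```python
import re

# First line of an event: timestamp, level, logger, optional "- ", message,
# and an optional trailing {{key,value}...} block that is not captured.
LINE = re.compile(
    r"^(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})\s+"
    r"(?P<loglevel>[A-Z]+)\s+"
    r"(?P<logger>(?:[a-zA-Z0-9-]+\.)*[A-Za-z0-9$]+)\s+"
    r"(?:-\s+)?"
    r"(?P<message>.*?)"
    r"(?:\{(?:\{[^}]+\},?\s*)*\})?\s*$"
)
# Message numbers such as ERR1700 or ESS10005 at the start of the message.
MSGNR = re.compile(r"[A-Z]+[0-9]{4,5}")


def parse_event(event):
    """Return a dict of extracted fields, or None if the first line does not match."""
    first, newline, rest = event.partition("\n")
    match = LINE.match(first)
    if match is None:
        return None
    fields = match.groupdict()
    number = MSGNR.match(fields["message"])
    if number:
        fields["msgnr"] = number.group(0)
    if rest:
        fields["stacktrace"] = newline + rest   # keep the leading newline
    return fields


if __name__ == "__main__":
    sample = ("2013-02-28 09:57:56,663 INFO  LMLogger - ERR1700 - u:null failures: 0  "
              "- Technical error {{rid,US8cFAp5eZgAABwUItEAAAAI_dev01_443}"
              "{realsid,60A9772A136B9912B6FF0C3627A47090.dev1-a}}")
    print(parse_event(sample))
```

Prototyping a pattern this way on a few real log lines can help before moving it into the Logstash grok filter, where the grok debugger tools remain the authoritative check.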

To test your configuration file, use the command below from the Logstash bin directory.


./logstash -t -f logstash-app1.conf

If the command above reports that the configuration is OK, run the command below to start reading and parsing data from the Kafka topic.


./logstash -f logstash-app1.conf

To design your own grok pattern for your log line format, the tools below help you build a pattern incrementally and also provide some sample grok patterns for logs.

http://grokdebug.herokuapp.com and http://grokconstructor.appspot.com/

The Logstash console will show the parsed data as below. You can remove unused fields before storing the data in Elasticsearch by uncommenting the mutate section in the configuration file.

{
    "@timestamp" => 2017-05-28T23:47:42.160Z,
        "offset" => 194,
      "loglevel" => "WARN",
        "logger" => "CreateSomethingActivationKey",
          "beat" => {
        "hostname" => "zlp0287k",
            "name" => "zlp0287k",
         "version" => "5.1.2"
    },
    "input_type" => "log",
      "@version" => "1",
        "source" => "/opt/app/facingissuesonit/App1.log",
       "message" => [
        [0] "2013-02-28 09:57:56,662 WARN  CreateSomethingActivationKey - WhateverException for User 49-123-345678 {{rid,US8cFAp5eZgAABwUItEAAAAI_dev01_443}{realsid,60A9772A136B9912B6FF0C3627A47090.dev1-a}}",
        [1] "WhateverException for User 49-123-345678 "
    ],
          "type" => "log",
     "timestamp" => "2013-02-28 09:57:56,662"
}
{
         "msgnr" => "ERR1700",
    "@timestamp" => 2017-05-28T23:47:42.160Z,
        "offset" => 375,
      "loglevel" => "INFO",
        "logger" => "LMLogger",
          "beat" => {
        "hostname" => "zlp0287k",
            "name" => "zlp0287k",
         "version" => "5.1.2"
    },
    "input_type" => "log",
      "@version" => "1",
        "source" => "/opt/app/facingissuesonit/App1.log",
       "message" => [
        [0] "2013-02-28 09:57:56,663 INFO  LMLogger - ERR1700 - u:null failures: 0  - Technical error {{rid,US8cFAp5eZgAABwUItEAAAAI_dev01_443}{realsid,60A9772A136B9912B6FF0C3627A47090.dev1-a}}",
        [1] "ERR1700 - u:null failures: 0  - Technical error "
    ],
          "type" => "log",
     "timestamp" => "2013-02-28 09:57:56,663"
}
{
        "offset" => 718,
        "logger" => "SomeCallLogger",
    "input_type" => "log",

       "message" => [
        [0] "2013-02-28 09:57:56,668 ERROR SomeCallLogger - ESS10005 Cpc portalservices: Exception caught while writing log messege to MEA Call:  {}\njava.sql.SQLSyntaxErrorException: ORA-00942: table or view does not exist\n\n\tat oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:445)\n\tat oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:396)",
        [1] "ESS10005 Cpc portalservices: Exception caught while writing log messege to MEA Call:  "
    ],
          "type" => "log",
         "msgnr" => "ESS10005",
    "@timestamp" => 2017-05-28T23:47:42.160Z,
    "stacktrace" => "\njava.sql.SQLSyntaxErrorException: ORA-00942: table or view does not exist\n\n\tat oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:445)\n\tat oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:396)",
      "loglevel" => "ERROR",
          "beat" => {
        "hostname" => "zlp0287k",
            "name" => "zlp0287k",
         "version" => "5.1.2"
    },
      "@version" => "1",
     "timestamp" => "2013-02-28 09:57:56,668"
}
{
    "@timestamp" => 2017-05-28T23:47:42.160Z,
        "offset" => 903,
      "loglevel" => "INFO",
        "logger" => "EntryFilter",
          "beat" => {
        "hostname" => "zlp0287k",
            "name" => "zlp0287k",
         "version" => "5.1.2"
    },
    "input_type" => "log",
      "@version" => "1",

       "message" => [
        [0] "2013-02-28 10:04:35,723 INFO  EntryFilter - Fresh on request /portalservices/foobarwhatever {{rid,US8dogp5eZgAABwXPGEAAAAL_dev01_443}{realsid,56BA2AD41D9BB28AFCEEEFF927EE61C2.dev1-a}}\n",
        [1] "Fresh on request /portalservices/foobarwhatever "
    ],
          "type" => "log",
     "timestamp" => "2013-02-28 10:04:35,723"
}

To verify on the Elasticsearch side that your data was sent successfully, open the URL http://localhost:9200/_cat/indices in your browser. It will display the created index with the current date.

yellow open app1-logs-2017.05.28                             Qjs6XWiFQw2zsiVs9Ks6sw 5 1         4     0  47.3kb  47.3kb
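
The index name carries the current date rather than the 2013 date from the log lines because the date part of `app1-logs-%{+YYYY.MM.dd}` is filled from the event's @timestamp field, which in the console output above holds the 2017 processing time. A minimal Python sketch of that naming rule follows; the function name is hypothetical, and the conversion to UTC reflects how Logstash formats @timestamp.

```python
from datetime import datetime, timezone


def daily_index(event_time, prefix="app1-logs"):
    """Mimic index => "app1-logs-%{+YYYY.MM.dd}" for a timezone-aware @timestamp."""
    utc = event_time.astimezone(timezone.utc)
    return f"{prefix}-{utc:%Y.%m.%d}"


if __name__ == "__main__":
    stamp = datetime(2017, 5, 28, 23, 47, 42, tzinfo=timezone.utc)
    print(daily_index(stamp))
```

One consequence of this rule is that events processed on different days land in different indexes, which is why the Kibana index pattern later in this post uses a wildcard.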

Kibana Installation, Configuration and Start

Download the latest version of Kibana from the link below and use the command to extract and install it on a Linux server; on Windows, just unzip the downloaded file.

Download Link : https://www.elastic.co/downloads/kibana

tar -zxvf kibana-5.4.0.tar.gz

It will show the files and directory structure below for Kibana.

ls -l
drwxr-xr-x   2 facingissuesonit Saurabh   4096 May 22 14:23 bin
drwxr-xr-x   2 facingissuesonit Saurabh   4096 Apr 25 18:58 config
drwxr-xr-x   2 facingissuesonit Saurabh   4096 Apr 25 11:54 data
-rw-r--r--   1 facingissuesonit Saurabh    562 Apr 17 12:04 LICENSE.txt
drwxr-xr-x   6 facingissuesonit Saurabh   4096 Apr 17 12:04 node
drwxr-xr-x 485 facingissuesonit Saurabh  20480 Apr 17 12:04 node_modules
-rw-r--r--   1 facingissuesonit Saurabh 660429 Apr 17 12:04 NOTICE.txt
drwxr-xr-x   3 facingissuesonit Saurabh   4096 Apr 17 12:04 optimize
-rw-r--r--   1 facingissuesonit Saurabh    702 Apr 17 12:04 package.json
drwxr-xr-x   2 facingissuesonit Saurabh   4096 May 22 12:29 plugins
-rw-r--r--   1 facingissuesonit Saurabh   4909 Apr 17 12:04 README.txt
drwxr-xr-x  10 facingissuesonit Saurabh   4096 Apr 17 12:04 src
drwxr-xr-x   3 facingissuesonit Saurabh   4096 Apr 17 12:04 ui_framework
drwxr-xr-x   2 facingissuesonit Saurabh   4096 Apr 17 12:04 webpackShims

Before starting Kibana, make some basic changes in the config/kibana.yml file: uncomment the properties below and set them as follows.

server.port: 5601
server.host: localhost
elasticsearch.url: "http://localhost:9200"

The Kibana configuration is now ready, so it is time to start Kibana. Use the command below to run Kibana in the background.

screen -d -m  /bin/kibana

Kibana takes some time to start. To test it, open the URL below in a browser.

http://localhost:5601/

To check this data in Kibana, open the above URL in a browser, go to the Management tab in the left-side menu -> Index Pattern -> click Add New.

Enter the index name or pattern and the time field name as in the screen below, and click the Create button.

Kibana index setting
Index Pattern Settings

Now go to the Discover tab and select the index pattern app1-log*. It will display the data as below.

kibana discover data

Now make the changes below according to your application's specifications.

Filebeat :

  • Update the prospector path to point to the current log file in your log directory.
  • Move Kafka to a different machine, because Kafka will be the single location that receives shipped data from the different servers. In the Kafka output section of the filebeat.full.yml file, replace localhost in the hosts property with the IP of the Kafka server.
  • Copy the same Filebeat setup to all servers where your application is deployed and whose logs need to be read.
  • Start the Filebeat instance on each server.

Elasticsearch :

  • Uncomment the network.host property in the elasticsearch.yml file so that Elasticsearch can be accessed by IP address.

Logstash:

  • Update localhost in the input section of the logstash-app1.conf file with the Kafka machine IP.
  • Change the grok pattern in the filter section according to your log format. The following sites help with designing a pattern incrementally: http://grokdebug.herokuapp.com and http://grokconstructor.appspot.com/
  • Update localhost in the elasticsearch output section with the IP if Elasticsearch moves to a different machine.

Kibana:

  • Update localhost in the elasticsearch.url property of the kibana.yml file with the IP if Kibana is on a different machine.

Conclusion :

This tutorial covered the following points :

  • Installation of Filebeat, Kafka, Logstash, Elasticsearch and Kibana.
  • Filebeat is configured to ship logs to the Kafka message broker.
  • Logstash is configured to read log lines from the Kafka topic, parse them and ship them to Elasticsearch.
  • Kibana shows this Elasticsearch information in the form of charts and dashboards for users to analyze.

Read More

To read more on Filebeat, Kafka and Elasticsearch configurations, follow the links. For Logstash configuration, input plugins, filter plugins, output plugins, Logstash customization and related issues, follow Logstash Tutorial and Logstash Issues.

Hope this blog was helpful for you.

Leave your feedback to improve this topic and make it more helpful for others.

Reference  :

 https://www.elastic.co/products

 

Logstash , JDBC Input configuration tutorial with sql_last_value and tracking_column as numeric or timestamp


The Logstash JDBC input plug-in works like an adapter that sends your database details to Elasticsearch, so that they can be used for full-text search, queries and analysis, and shown as charts and dashboards in Kibana.

In the example below, I will explain how to create a Logstash configuration file using the JDBC input plug-in for an Oracle database, with output to Elasticsearch.

Logstash JDBC Input configuration for Elasticsearch Output

Pre-requisite:

Sample Data:

The sample data below is from the defect_detail table, where defect_id is a numeric value that increments continuously in ascending order.

defect_id  owned_by  severity    status            summary                      application  created_by  creation_date    modified_by  modified_date   assigned_to
530812     Ramesh    Severity 3  Cancelled         Customer call 5 time         TEST-APP     Saurabh     7/3/2017 15:44   Gaurav       8/19/2017 6:22  Development
530828     Neha      Severity 1  Cancelled         Dealer Code Buyer on behalf  TEST-APP-5   Rajan       7/3/2017 16:20   Nilam        8/17/2017 9:29  Development
540829     Ramesh    Severity 1  Retest Completed  Client Not want  Bulk call   TEST-APP-4   Rajiv       7/24/2017 11:29  Raghav       8/5/2017 20:00  IST

Configuration File :

The configuration file below is set up to read data from an Oracle database. It executes the query every 15 minutes and reads only the records whose defect_id is greater than the value from the last run. Always use ORDER BY on the column that tracks the last run value, as configured here for the numeric defect_id column.
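
The incremental read can be sketched outside Logstash. The Python example below is only an illustration of the idea, not how the plug-in is implemented: it uses an in-memory SQLite table in place of Oracle, and the helper name run_query is hypothetical.

```python
import sqlite3

# In-memory SQLite stands in for the Oracle database (an assumption for this sketch).
conn = sqlite3.connect(":memory:")
conn.execute("create table defect_detail (defect_id integer, status text)")
conn.executemany(
    "insert into defect_detail values (?, ?)",
    [(530812, "Cancelled"), (530828, "Cancelled")],
)


def run_query(conn, sql_last_value):
    """Fetch rows newer than sql_last_value and return them with the new tracked value."""
    rows = conn.execute(
        "select defect_id, status from defect_detail "
        "where defect_id > :sql_last_value order by defect_id",
        {"sql_last_value": sql_last_value},
    ).fetchall()
    # Because of ORDER BY, the last row holds the highest defect_id seen so far.
    if rows:
        sql_last_value = rows[-1][0]
    return rows, sql_last_value


# First scheduled run: nothing is tracked yet, so start from 0.
first_rows, last_value = run_query(conn, 0)

# A new defect arrives before the next scheduled run.
conn.execute("insert into defect_detail values (540829, 'Retest Completed')")
second_rows, last_value = run_query(conn, last_value)

print(len(first_rows), len(second_rows), last_value)
```

The second run returns only the new row, which is why the tracked column has to increase steadily and the query has to be ordered by it.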

If you are using another database such as MySQL, SQL Server or DB2, change jdbc_driver_library, jdbc_driver_class and jdbc_connection_string according to that database. Every database has its own query format, so update the query accordingly.

Copy the content below and create a file in the bin directory as /bin/logstash-jdbc-defect.conf

input {
    jdbc {
        # Path to the downloaded JDBC driver jar to add to the class path
        jdbc_driver_library => "../jar/ojdbc6.jar"
        # Oracle driver class
        jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
        # Oracle JDBC connection string: jdbc:oracle:thin:@hostname:PORT/SERVICE
        jdbc_connection_string => "jdbc:oracle:thin:@hostname:1521/service"
        # User and password to connect to the database
        jdbc_user => "username"
        jdbc_password => "password"
        # Use this instead when the password should be read from a file
        #jdbc_password_filepath => "/opt/app/password-path-location"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "50000"
        # Cron expression for how frequently to execute the query in the database
        schedule => "*/15 * * * *"
        # Use this if the query is big and you want to store it in a separate file
        #statement_filepath => "../query/remedy-tickets-details.sql"
        # Inline query; to fetch only records added after the last run, compare with :sql_last_value, which can be numeric or timestamp
        statement => "select defect_id,owned_by,severity,status,summary,application,created_by,creation_date,modified_by,modified_date,assigned_to from defect_detail where defect_id>:sql_last_value order by defect_id"
        # Configuration for using the last run value
        # Note: clean_run => true ignores the saved value when Logstash starts; set it to false to resume from the saved value after a restart
        clean_run => true
        use_column_value => true
        tracking_column => "defect_id"
        # By default Logstash treats sql_last_value as numeric; if it is a timestamp, configure it explicitly
        #tracking_column_type => "timestamp"
        record_last_run => true
        # This file keeps a record of sql_last_value so that the next query run can use the last run value
        last_run_metadata_path => "logstash_jdbc_last_run_t_data.txt"
        # Type of the data coming from the database
        type => "t-data"
        # Configure the timezone according to the database location
        #jdbc_default_timezone => "UTC"
    }
}
filter {
    # Convert creation_date to a string so that the date filter below can parse it
    mutate {
        convert => [ "creation_date", "string" ]
    }
    # The date filter maps the creation_date column to the Elasticsearch @timestamp.
    # creation_date is in the format "MM/dd/yyyy HH:mm" and in timezone America/New_York,
    # so it is adjusted accordingly when stored in Elasticsearch in UTC
    date {
        match => ["creation_date","MM/dd/yyyy HH:mm"]
        timezone => "America/New_York"
    }
}
output {
    # Output to Elasticsearch
    elasticsearch {
        index => "defect-data-%{+YYYY.MM}"
        hosts => ["elasticsearch-server:9200"]
        document_type => "t-type"
        # Use document_id as the Elasticsearch id if you want to prevent duplicate records in Elasticsearch
        document_id => "%{defect_id}"
    }
    # Output to console
    stdout { codec => rubydebug }
}

I have tried to give descriptive information in the comments for each property in the configuration file. If you need more in-depth information, drop a comment or send an email and we will discuss it in detail.

Date Filter : This filter maps CREATION_DATE to the @timestamp value of each document in the index. It declares that CREATION_DATE has the pattern “MM/dd/yyyy HH:mm”, so the conversion to a timestamp follows that pattern.

Execution :

 [logstash-installation-dir]/bin/logstash -f logstash-jdbc-defect.conf

To learn about validating the configuration and starting Logstash with other options, follow the link Logstash Installation, Configuration and Start.

Logstash Console Output

Notice that, because of the date filter, the @timestamp value is generated from the value of CREATION_DATE. With the index name defect-data-%{+YYYY.MM} in the Elasticsearch output configuration, an index is created for every month based on the @timestamp value, such as defect-data-2017.07 for the sample data. If the data in your database changes and the defect id increases, you will see the new defects on your console every 15 minutes, as set up in the configuration file.

Result :

select defect_id,owned_by,severity,status,summary,application,created_by,creation_date,modified_by,modified_date,assigned_to from defect_detail where defect_id>:sql_last_value order by defect_id"
#
{
               "severity" => "Severity 3",
                "summary" => "Customer call 5 time but no response",
               "owned_by" => "Ramesh",
          "creation_date" => "7/3/2017 15:44",
          "modified_date" => "8/19/2017 6:22",
                   "type" => "t-data",
             "created_by" => "Saurabh",
             "@timestamp" => 2017-07-03T19:44:00.000Z,
            "modified_by" => "Gaurav",
               "@version" => "1",
              "defect_id" => 530812,
            "application" => "TEST-APP",
                 "status" => "Cancelled",
            "assigned_to" => "Development"
}
{
               "severity" => "Severity 1",
                "summary" => "Dealer Code Buyer on behalf",
               "owned_by" => "Neha",
          "creation_date" => "7/3/2017 16:20",
          "modified_date" => "8/17/2017 9:29",
                   "type" => "t-data",
             "created_by" => "Rajan",
             "@timestamp" => 2017-07-03T20:20:00.000Z,
            "modified_by" => "Nilam",
               "@version" => "1",
              "defect_id" => 530828,
            "application" => "TEST-APP5",
                 "status" => "Cancelled",
            "assigned_to" => "Development"
}
{
               "severity" => "Severity 1",
                "summary" => "Client Not want  Bulk call",
               "owned_by" => "Ramesh",
          "creation_date" => "7/24/2017 11:29",
          "modified_date" => "8/5/2017 20:00",
                   "type" => "t-data",
             "created_by" => "Rajiv",
             "@timestamp" => 2017-07-24T15:29:00.000Z,
            "modified_by" => "Raghav",
               "@version" => "1",
              "defect_id" => 540829,
            "application" => "TEST-APP4",
                 "status" => "Retest Complete",
            "assigned_to" => "IST - Integrated System Test"
}

Summary

The above covers the following points:

  • Logstash JDBC input from an Oracle database.
  • JDBC input changes for sql_last_value as numeric or timestamp.
  • Reading the password and a multi-line query from separate files.
  • Date filter to get the index timestamp value based on a field and pattern.
  • Dynamic index name for each month by appending a date format.
  • Prevention of duplicate record inserts in Elasticsearch.
  • Starting Logstash in the background for a configuration file.
  • Sending Logstash output to Elasticsearch and the console.

Read More

To read more on Logstash Configuration,Input Plugins, Filter Plugins, Output Plugins, Logstash Customization and related issues follow Logstash Tutorial and Logstash Issues.


Reference  :

 https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html

 

Logstash , JDBC Input with sql_last_value as numeric or timestamp Example


The Logstash JDBC input plug-in works like an adapter that sends your database details to Elasticsearch, so that they can be used for full-text search, queries and analysis, and shown as charts and dashboards in Kibana.

In the example below, I will explain how to create a Logstash configuration file using the JDBC input plug-in for an Oracle database, with output to Elasticsearch.

Pre-requisite:

  • Logstash 5.xx installed
  • Elasticsearch 5.xx installed
  • Java 7/8 Installed

Sample Data:

The sample data below is from the defect_detail table, where defect_id is a numeric value that increments continuously in ascending order.

defect_id  owned_by  severity    status            summary                      application  created_by  creation_date    modified_by  modified_date   assigned_to
530812     Ramesh    Severity 3  Cancelled         Customer call 5 time         TEST-APP     Saurabh     7/3/2017 15:44   Gaurav       8/19/2017 6:22  Development
530828     Neha      Severity 1  Cancelled         Dealer Code Buyer on behalf  TEST-APP-5   Rajan       7/3/2017 16:20   Nilam        8/17/2017 9:29  Development
540829     Ramesh    Severity 1  Retest Completed  Client Not want  Bulk call   TEST-APP-4   Rajiv       7/24/2017 11:29  Raghav       8/5/2017 20:00  IST

Configuration File :

The configuration file below is set up to read data from an Oracle database. It executes the query every 15 minutes and reads only the records whose defect_id is greater than the value from the last run. Always use ORDER BY on the column that tracks the last run value, as configured here for the numeric defect_id column.

If you are using another database such as MySQL, SQL Server or DB2, change jdbc_driver_library, jdbc_driver_class and jdbc_connection_string according to that database. Every database has its own query format, so update the query accordingly.

Copy the content below and create a file in the bin directory as /bin/logstash-jdbc-defect.conf

input {
    jdbc {
        # Path to the downloaded JDBC driver jar to add to the class path
        jdbc_driver_library => "../jar/ojdbc6.jar"
        # Oracle driver class
        jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
        # Oracle JDBC connection string: jdbc:oracle:thin:@hostname:PORT/SERVICE
        jdbc_connection_string => "jdbc:oracle:thin:@hostname:1521/service"
        # User and password to connect to the database
        jdbc_user => "username"
        jdbc_password => "password"
        # Use this instead when the password should be read from a file
        #jdbc_password_filepath => "/opt/app/password-path-location"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "50000"
        # Cron expression for how frequently to execute the query in the database
        schedule => "*/15 * * * *"
        # Use this if the query is big and you want to store it in a separate file
        #statement_filepath => "../query/remedy-tickets-details.sql"
        # Inline query; to fetch only records added after the last run, compare with :sql_last_value, which can be numeric or timestamp
        statement => "select defect_id,owned_by,severity,status,summary,application,created_by,creation_date,modified_by,modified_date,assigned_to from defect_detail where defect_id>:sql_last_value order by defect_id"
        # Configuration for using the last run value
        # Note: clean_run => true ignores the saved value when Logstash starts; set it to false to resume from the saved value after a restart
        clean_run => true
        use_column_value => true
        tracking_column => "defect_id"
        # By default Logstash treats sql_last_value as numeric; if it is a timestamp, configure it explicitly
        #tracking_column_type => "timestamp"
        record_last_run => true
        # This file keeps a record of sql_last_value so that the next query run can use the last run value
        last_run_metadata_path => "logstash_jdbc_last_run_t_data.txt"
        # Type of the data coming from the database
        type => "t-data"
        # Configure the timezone according to the database location
        #jdbc_default_timezone => "UTC"
    }
}
filter {
    # Convert creation_date to a string so that the date filter below can parse it
    mutate {
        convert => [ "creation_date", "string" ]
    }
    # The date filter maps the creation_date column to the Elasticsearch @timestamp.
    # creation_date is in the format "MM/dd/yyyy HH:mm" and in timezone America/New_York,
    # so it is adjusted accordingly when stored in Elasticsearch in UTC
    date {
        match => ["creation_date","MM/dd/yyyy HH:mm"]
        timezone => "America/New_York"
    }
}
output {
    # Output to Elasticsearch
    elasticsearch {
        index => "defect-data-%{+YYYY.MM}"
        hosts => ["elasticsearch-server:9200"]
        document_type => "t-type"
        # Use document_id as the Elasticsearch id if you want to prevent duplicate records in Elasticsearch
        document_id => "%{defect_id}"
    }
    # Output to console
    stdout { codec => rubydebug }
}

I have tried to give descriptive information in the comments for each property in the configuration file. If you need more in-depth information, drop a comment or send an email and we will discuss it in detail.

Date Filter : This filter maps CREATION_DATE to the @timestamp value of each document in the index. It declares that CREATION_DATE has the pattern “MM/dd/yyyy HH:mm”, so the conversion to a timestamp follows that pattern.
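
As a rough illustration of that conversion, the Python sketch below turns a CREATION_DATE string into a UTC timestamp. It is not what Logstash runs internally. It assumes a fixed UTC-4 offset, which is the America/New_York daylight-time offset that applies to the July sample dates, and the helper name to_utc_timestamp is hypothetical.

```python
from datetime import datetime, timedelta, timezone

# Assumption: the sample dates are in July, when America/New_York is on
# daylight time (UTC-4). A real conversion would use full timezone rules.
EDT = timezone(timedelta(hours=-4))


def to_utc_timestamp(creation_date):
    """Parse a month/day/year hour:minute string as local time and render it in UTC."""
    local = datetime.strptime(creation_date, "%m/%d/%Y %H:%M").replace(tzinfo=EDT)
    return local.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.000Z")


print(to_utc_timestamp("7/3/2017 15:44"))
```

The local time 15:44 becomes 19:44 in UTC, which is why the @timestamp values in the console output are four hours ahead of creation_date.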

Execution :

 [logstash-installation-dir]/bin/logstash -f logstash-jdbc-defect.conf

To learn about validating the configuration and starting Logstash with other options, follow the link Logstash Installation, Configuration and Start.

Logstash Console Output

Notice that, because of the date filter, the @timestamp value is generated from the value of CREATION_DATE. With the index name defect-data-%{+YYYY.MM} in the Elasticsearch output configuration, an index is created for every month based on the @timestamp value, such as defect-data-2017.07 for the sample data. If the data in your database changes and the defect id increases, you will see the new defects on your console every 15 minutes, as set up in the configuration file.

Result :

select defect_id,owned_by,severity,status,summary,application,created_by,creation_date,modified_by,modified_date,assigned_to from defect_detail where defect_id>:sql_last_value order by defect_id"
#
{
               "severity" => "Severity 3",
                "summary" => "Customer call 5 time but no response",
               "owned_by" => "Ramesh",
          "creation_date" => "7/3/2017 15:44",
          "modified_date" => "8/19/2017 6:22",
                   "type" => "t-data",
             "created_by" => "Saurabh",
             "@timestamp" => 2017-07-03T19:44:00.000Z,
            "modified_by" => "Gaurav",
               "@version" => "1",
              "defect_id" => 530812,
            "application" => "TEST-APP",
                 "status" => "Cancelled",
            "assigned_to" => "Development"
}
{
               "severity" => "Severity 1",
                "summary" => "Dealer Code Buyer on behalf",
               "owned_by" => "Neha",
          "creation_date" => "7/3/2017 16:20",
          "modified_date" => "8/17/2017 9:29",
                   "type" => "t-data",
             "created_by" => "Rajan",
             "@timestamp" => 2017-07-03T20:20:00.000Z,
            "modified_by" => "Nilam",
               "@version" => "1",
              "defect_id" => 530828,
            "application" => "TEST-APP5",
                 "status" => "Cancelled",
            "assigned_to" => "Development"
}
{
               "severity" => "Severity 1",
                "summary" => "Client Not want  Bulk call",
               "owned_by" => "Ramesh",
          "creation_date" => "7/24/2017 11:29",
          "modified_date" => "8/5/2017 20:00",
                   "type" => "t-data",
             "created_by" => "Rajiv",
             "@timestamp" => 2017-07-24T15:29:00.000Z,
            "modified_by" => "Raghav",
               "@version" => "1",
              "defect_id" => 540829,
            "application" => "TEST-APP4",
                 "status" => "Retest Complete",
            "assigned_to" => "IST - Integrated System Test"
}

Summary

The above covers the following points:

  • Logstash JDBC input from an Oracle database.
  • JDBC input changes for sql_last_value as numeric or timestamp.
  • Reading the password and a multi-line query from separate files.
  • Date filter to get the index timestamp value based on a field and pattern.
  • Dynamic index name for each month by appending a date format.
  • Prevention of duplicate record inserts in Elasticsearch.
  • Starting Logstash in the background for a configuration file.
  • Sending Logstash output to Elasticsearch and the console.

Read More

To read more on Logstash Configuration,Input Plugins, Filter Plugins, Output Plugins, Logstash Customization and related issues follow Logstash Tutorial and Logstash Issues.


Reference  :

 https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html

Logstash , JDBC Input Plug-in Configuration Example with Oracle Database and Output to Elasticsearch


The Logstash JDBC input plug-in works like an adapter that sends your database details to Elasticsearch, so that they can be used for full-text search, queries and analysis, and shown as charts and dashboards in Kibana.

In the example below, I will explain how to create a Logstash configuration file using the JDBC input plug-in for an Oracle database, with output to Elasticsearch.

Logstash JDBC Input configuration for Elasticsearch Output

Pre-requisite:

Sample Data:

The sample data below is from the defect_detail table, where defect_id is a numeric value that increments continuously in ascending order.

defect_id  owned_by  severity    status            summary                      application  created_by  creation_date    modified_by  modified_date   assigned_to
530812     Ramesh    Severity 3  Cancelled         Customer call 5 time         TEST-APP     Saurabh     7/3/2017 15:44   Gaurav       8/19/2017 6:22  Development
530828     Neha      Severity 1  Cancelled         Dealer Code Buyer on behalf  TEST-APP-5   Rajan       7/3/2017 16:20   Nilam        8/17/2017 9:29  Development
540829     Ramesh    Severity 1  Retest Completed  Client Not want  Bulk call   TEST-APP-4   Rajiv       7/24/2017 11:29  Raghav       8/5/2017 20:00  IST

Configuration File :

The configuration file below is set up to read data from an Oracle database. It executes the query every 15 minutes and reads only the records whose defect_id is greater than the value from the last run. Always use ORDER BY on the column that tracks the last run value, as configured here for the numeric defect_id column.

If you are using another database such as MySQL, SQL Server or DB2, change jdbc_driver_library, jdbc_driver_class and jdbc_connection_string according to that database. Every database has its own query format, so update the query accordingly.

Copy the content below and create a file in the bin directory as /bin/logstash-jdbc-defect.conf

input {
    jdbc {
        # Path to the downloaded JDBC driver jar to add to the class path
        jdbc_driver_library => "../jar/ojdbc6.jar"
        # Oracle driver class
        jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
        # Oracle JDBC connection string: jdbc:oracle:thin:@hostname:PORT/SERVICE
        jdbc_connection_string => "jdbc:oracle:thin:@hostname:1521/service"
        # User and password to connect to the database
        jdbc_user => "username"
        jdbc_password => "password"
        # Use this instead when the password should be read from a file
        #jdbc_password_filepath => "/opt/app/password-path-location"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "50000"
        # Cron expression for how frequently to execute the query in the database
        schedule => "*/15 * * * *"
        # Use this if the query is big and you want to store it in a separate file
        #statement_filepath => "../query/remedy-tickets-details.sql"
        # Inline query; to fetch only records added after the last run, compare with :sql_last_value, which can be numeric or timestamp
        statement => "select defect_id,owned_by,severity,status,summary,application,created_by,creation_date,modified_by,modified_date,assigned_to from defect_detail where defect_id>:sql_last_value order by defect_id"
        # Configuration for using the last run value
        # Note: clean_run => true ignores the saved value when Logstash starts; set it to false to resume from the saved value after a restart
        clean_run => true
        use_column_value => true
        tracking_column => "defect_id"
        # By default Logstash treats sql_last_value as numeric; if it is a timestamp, configure it explicitly
        #tracking_column_type => "timestamp"
        record_last_run => true
        # This file keeps a record of sql_last_value so that the next query run can use the last run value
        last_run_metadata_path => "logstash_jdbc_last_run_t_data.txt"
        # Type of the data coming from the database
        type => "t-data"
        # Configure the timezone according to the database location
        #jdbc_default_timezone => "UTC"
    }
}
filter {
    # Convert creation_date to a string so that the date filter below can parse it
    mutate {
        convert => [ "creation_date", "string" ]
    }
    # The date filter maps the creation_date column to the Elasticsearch @timestamp.
    # creation_date is in the format "MM/dd/yyyy HH:mm" and in timezone America/New_York,
    # so it is adjusted accordingly when stored in Elasticsearch in UTC
    date {
        match => ["creation_date","MM/dd/yyyy HH:mm"]
        timezone => "America/New_York"
    }
}
output {
    # Output to Elasticsearch
    elasticsearch {
        index => "defect-data-%{+YYYY.MM}"
        hosts => ["elasticsearch-server:9200"]
        document_type => "t-type"
        # Use document_id as the Elasticsearch id if you want to prevent duplicate records in Elasticsearch
        document_id => "%{defect_id}"
    }
    # Output to console
    stdout { codec => rubydebug }
}
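
The document_id setting in the output section is what keeps re-read rows from piling up as duplicates. The Python sketch below illustrates the idea with a plain dictionary standing in for the Elasticsearch index. It is a simplified model, not the Elasticsearch client, and the helper name index_documents is hypothetical.

```python
def index_documents(index, events):
    """Store each event under its defect_id, overwriting any earlier copy."""
    for event in events:
        # Mirrors the id template "%{defect_id}": the id is the field value as text.
        index[str(event["defect_id"])] = event
    return index


index = {}  # stand-in for the Elasticsearch index (an assumption for this sketch)
batch = [
    {"defect_id": 530812, "status": "Cancelled"},
    {"defect_id": 530828, "status": "Cancelled"},
]

index_documents(index, batch)
index_documents(index, batch)  # the same rows arrive again on a later run

print(len(index))
```

Because the id is derived from defect_id, a row that is sent twice replaces its earlier copy instead of creating a second document.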

I have tried to give descriptive information in the comments for each property in the configuration file. If you need more in-depth information, drop a comment or send an email and we will discuss it in detail.

Date Filter : This filter maps CREATION_DATE to the @timestamp value of each document in the index. It declares that CREATION_DATE has the pattern “MM/dd/yyyy HH:mm”, so the conversion to a timestamp follows that pattern.

Execution :

 [logstash-installation-dir]/bin/logstash -f logstash-jdbc-defect.conf

To learn about validating the configuration and starting Logstash with other options, follow the link Logstash Installation, Configuration and Start.

Logstash Console Output

Notice that, because of the date filter, the @timestamp value is generated from the value of CREATION_DATE. With the index name defect-data-%{+YYYY.MM} in the Elasticsearch output configuration, an index is created for every month based on the @timestamp value, such as defect-data-2017.07 for the sample data. If the data in your database changes and the defect id increases, you will see the new defects on your console every 15 minutes, as set up in the configuration file.

Result :

select defect_id,owned_by,severity,status,summary,application,created_by,creation_date,modified_by,modified_date,assigned_to from defect_detail where defect_id>:sql_last_value order by defect_id"
#
{
               "severity" => "Severity 3",
                "summary" => "Customer call 5 time but no response",
               "owned_by" => "Ramesh",
          "creation_date" => "7/3/2017 15:44",
          "modified_date" => "8/19/2017 6:22",
                   "type" => "t-data",
             "created_by" => "Saurabh",
             "@timestamp" => 2017-07-03T19:44:00.000Z,
            "modified_by" => "Gaurav",
               "@version" => "1",
              "defect_id" => 530812,
            "application" => "TEST-APP",
                 "status" => "Cancelled",
            "assigned_to" => "Development"
}
{
               "severity" => "Severity 1",
                "summary" => "Dealer Code Buyer on behalf",
               "owned_by" => "Neha",
          "creation_date" => "7/3/2017 16:20",
          "modified_date" => "8/17/2017 9:29",
                   "type" => "t-data",
             "created_by" => "Rajan",
             "@timestamp" => 2017-07-03T20:20:00.000Z,
            "modified_by" => "Nilam",
               "@version" => "1",
              "defect_id" => 530828,
            "application" => "TEST-APP5",
                 "status" => "Cancelled",
            "assigned_to" => "Development"
}
{
               "severity" => "Severity 1",
                "summary" => "Client Not want  Bulk call",
               "owned_by" => "Ramesh",
          "creation_date" => "7/24/2017 11:29",
          "modified_date" => "8/5/2017 20:00",
                   "type" => "t-data",
             "created_by" => "Rajiv",
             "@timestamp" => 2017-07-24T15:29:00.000Z,
            "modified_by" => "Raghav",
               "@version" => "1",
              "defect_id" => 540829,
            "application" => "TEST-APP4",
                 "status" => "Retest Complete",
            "assigned_to" => "IST - Integrated System Test"
}

Summary

The above covers the following points:

  • Logstash JDBC input from an Oracle database.
  • JDBC input changes for sql_last_value as numeric or timestamp.
  • Reading the password and a multi-line query from separate files.
  • Date filter to get the index timestamp value based on a field and pattern.
  • Dynamic index name for each month by appending a date format.
  • Prevention of duplicate record inserts in Elasticsearch.
  • Starting Logstash in the background for a configuration file.
  • Sending Logstash output to Elasticsearch and the console.

Read More

To read more on Logstash Configuration,Input Plugins, Filter Plugins, Output Plugins, Logstash Customization and related issues follow Logstash Tutorial and Logstash Issues.


Reference  :

 https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html

Logstash, File Input, CSV Filter and Elasticsearch Output


In this example of the Logstash file input plugin, CSV filter and Elasticsearch output plugin, Logstash reads data from a CSV file, parses it and stores it in Elasticsearch.

Pre-Requisite

  • Logstash 5.X
  • Elasticsearch 5.X

The Logstash configuration file below is based on the data in the sample CSV file. You can modify this configuration file to match the data in your own CSV file.

Logstash File Input CSV Filter Elasticsearch Output

Sample Data 

transactions-sample-data.txt

TRANSACTION_COUNT|TRANSACTION_DATE|TRANSACTION_TYPE|SERVER
18|07/24/2017|New Customer|SVR-1
9|07/25/2017|Online Customer|SVR-2
9|07/26/2017|Agents|SVR-3
12|07/24/2017|In Store|SVR-1
13|07/25/2017|New Customer|SVR-2
18|07/26/2017|Online Customer|SVR-3
21|07/24/2017|Agents|SVR-2
13|07/25/2017|In Store|SVR-3
15|07/26/2017|New Customer|SVR-4

Logstash Configuration File

Create the Logstash configuration file [logstash-installation-dir]/bin/transaction-test.conf and paste the content below.

input {
    file {
        path => "/opt/app/facinissuesonit/transactions-sample-data.txt"
        start_position => "beginning"
    }
}
filter {
    csv {
        # Column names, mapped to the values in the same positions
        columns => ["TRANSACTION_COUNT","TRANSACTION_DATE","TRANSACTION_TYPE","SERVER"]
        separator => "|"
        remove_field => ["message"]
    }
    # The date filter converts TRANSACTION_DATE to @timestamp so that charts in Kibana are plotted by this date
    date {
        match => ["TRANSACTION_DATE", "MM/dd/yyyy"]
    }
    # Drop the header line so that it is not inserted into Elasticsearch
    if [TRANSACTION_TYPE] =~ "TRANSACTION_TYPE" {
        drop {}
    }
}
output {
    elasticsearch {
        # Create the index name based on the date
        index => "app-transactions-%{+YYYY.MM.dd}"
        hosts => ["elasticsearver:9200"]
    }
    # Console output
    stdout {
        codec => rubydebug
        # debug => true
    }
}

Information about configuration file :

File Input Plugin : Reads data from the file. Because start_position is set to “beginning”, a file that Logstash has not seen before is read from its start.

CSV Filter : Reads each line of the message, splits it on “|”, maps the values to the column names by position, and finally removes the message field because the data is now parsed.

Date Filter : Maps TRANSACTION_DATE to the @timestamp value of each document in the index. It declares that TRANSACTION_DATE has the pattern “MM/dd/yyyy”, so the conversion to a timestamp follows that pattern.

drop : Removes the header line, which is detected when the field content matches the field name.
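
The split-and-drop steps can be sketched in a few lines of Python. This only illustrates the idea and is not how the Logstash filters are implemented: the helper name parse_lines is hypothetical, and the header check uses plain equality where the configuration uses a regular-expression match.

```python
import csv
import io

COLUMNS = ["TRANSACTION_COUNT", "TRANSACTION_DATE", "TRANSACTION_TYPE", "SERVER"]

# First lines of the sample file, including the header row.
sample = """TRANSACTION_COUNT|TRANSACTION_DATE|TRANSACTION_TYPE|SERVER
18|07/24/2017|New Customer|SVR-1
9|07/25/2017|Online Customer|SVR-2
"""


def parse_lines(text):
    """Split each line on "|", name the values by position, and skip the header."""
    events = []
    for values in csv.reader(io.StringIO(text), delimiter="|"):
        event = dict(zip(COLUMNS, values))
        # The header row carries the column name as its own value, so drop it.
        if event.get("TRANSACTION_TYPE") == "TRANSACTION_TYPE":
            continue
        events.append(event)
    return events


events = parse_lines(sample)
print(events[0])
```

As in the console output, every value stays a string, including TRANSACTION_COUNT, because nothing in the pipeline converts it to a number.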

Run Logstash Configuration with below command

 [logstash-installation-dir]/bin/logstash -f transaction-test.conf

To learn about validating the configuration and starting Logstash with other options, follow the link Logstash Installation, Configuration and Start.

Logstash Console Output

Notice that, because of the date filter, the @timestamp value is generated from the value of TRANSACTION_DATE. With the index name app-transactions-%{+YYYY.MM.dd} in the Elasticsearch output configuration, 3 indexes are created for the sample data based on the @timestamp value: app-transactions-2017.07.24, app-transactions-2017.07.25 and app-transactions-2017.07.26.
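
To see why the nine sample rows end up in three indexes, the Python sketch below derives an index name from each TRANSACTION_DATE and counts the rows per index. It is an approximation: Logstash builds the name from the UTC @timestamp, and the sketch assumes that this falls on the same calendar day as TRANSACTION_DATE, as it does in the sample output. The helper name index_name is hypothetical.

```python
from collections import Counter
from datetime import datetime


def index_name(transaction_date):
    """Build the daily index name for a month/day/year date string."""
    day = datetime.strptime(transaction_date, "%m/%d/%Y")
    return day.strftime("app-transactions-%Y.%m.%d")


# TRANSACTION_DATE values of the nine sample rows, in file order.
dates = [
    "07/24/2017", "07/25/2017", "07/26/2017",
    "07/24/2017", "07/25/2017", "07/26/2017",
    "07/24/2017", "07/25/2017", "07/26/2017",
]

counts = Counter(index_name(d) for d in dates)
for name in sorted(counts):
    print(name, counts[name])
```

Each of the three daily indexes receives three of the sample rows.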

{
"path" => "/opt/app/facinissuesonit/transactions-sample-data.txt",
"TRANSACTION_DATE" => "07/24/2017",
"@timestamp" => 2017-07-24T04:00:00.000Z,
"SERVER" => "SVR-1",
"@version" => "1",
"host" => "facingissuesonit.saurabh.com",
"TRANSACTION_TYPE" => "New Customer",
"TRANSACTION_COUNT" => "18"
}
{
"path" => "/opt/app/facinissuesonit/transactions-sample-data.txt",
"TRANSACTION_DATE" => "07/25/2017",
"@timestamp" => 2017-07-25T04:00:00.000Z,
"SERVER" => "SVR-2",
"@version" => "1",
"host" => "facingissuesonit.saurabh.com",
"TRANSACTION_TYPE" => "Online Customer",
"TRANSACTION_COUNT" => "9"
}
{
"path" => "/opt/app/facinissuesonit/transactions-sample-data.txt",
"TRANSACTION_DATE" => "07/26/2017",
"@timestamp" => 2017-07-26T04:00:00.000Z,
"SERVER" => "SVR-3",
"@version" => "1",
"host" => "facingissuesonit.saurabh.com",
"TRANSACTION_TYPE" => "Agents",
"TRANSACTION_COUNT" => "9"
}
{
"path" => "/opt/app/facinissuesonit/transactions-sample-data.txt",
"TRANSACTION_DATE" => "07/24/2017",
"@timestamp" => 2017-07-24T04:00:00.000Z,
"SERVER" => "SVR-1",
"@version" => "1",
"host" => "facingissuesonit.saurabh.com",
"TRANSACTION_TYPE" => "In Store",
"TRANSACTION_COUNT" => "12"
}
{
"path" => "/opt/app/facinissuesonit/transactions-sample-data.txt",
"TRANSACTION_DATE" => "07/25/2017",
"@timestamp" => 2017-07-25T04:00:00.000Z,
"SERVER" => "SVR-2",
"@version" => "1",
"host" => "facingissuesonit.saurabh.com",
"TRANSACTION_TYPE" => "New Customer",
"TRANSACTION_COUNT" => "13"
}
{
"path" => "/opt/app/facinissuesonit/transactions-sample-data.txt",
"TRANSACTION_DATE" => "07/26/2017",
"@timestamp" => 2017-07-26T04:00:00.000Z,
"SERVER" => "SVR-3",
"@version" => "1",
"host" => "facingissuesonit.saurabh.com",
"TRANSACTION_TYPE" => "Online Customer",
"TRANSACTION_COUNT" => "18"
}
{
"path" => "/opt/app/facinissuesonit/transactions-sample-data.txt",
"TRANSACTION_DATE" => "07/24/2017",
"@timestamp" => 2017-07-24T04:00:00.000Z,
"SERVER" => "SVR-2",
"@version" => "1",
"host" => "facingissuesonit.saurabh.com",
"TRANSACTION_TYPE" => "Agents",
"TRANSACTION_COUNT" => "21"
}
{
"path" => "/opt/app/facinissuesonit/transactions-sample-data.txt",
"TRANSACTION_DATE" => "07/25/2017",
"@timestamp" => 2017-07-25T04:00:00.000Z,
"SERVER" => "SVR-3",
"@version" => "1",
"host" => "facingissuesonit.saurabh.com",
"TRANSACTION_TYPE" => "In Store",
"TRANSACTION_COUNT" => "13"
}
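
The relationship between @timestamp and the index name seen in the output above can be sketched in Python as follows. This is only an illustration: the helper name index_name is hypothetical, and the sketch assumes that the date part of the index name is taken from @timestamp in UTC, which is how Logstash evaluates %{+YYYY.MM.dd}.

```python
from datetime import datetime, timezone


def index_name(timestamp, prefix="app-transactions-"):
    """Build a daily index name from a timezone-aware timestamp, using its UTC date."""
    return prefix + timestamp.astimezone(timezone.utc).strftime("%Y.%m.%d")


if __name__ == "__main__":
    # Timestamps copied from the sample console output (04:00 UTC on three days).
    stamps = [
        datetime(2017, 7, day, 4, 0, tzinfo=timezone.utc) for day in (24, 25, 26, 24)
    ]
    for name in sorted({index_name(stamp) for stamp in stamps}):
        print(name)
```

Because several events share the same day, the set of distinct names has only three entries, one per daily index.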

Summary

The details above cover the following points:

  • Reading a file with the Logstash File input.
  • Applying the CSV filter with “|” as separator and mapping values to fields.
  • Dropping the header line if it exists in the CSV file.
  • Using the Date filter to set the index timestamp from a field and a pattern.
  • Building a dynamic index name for each day by appending a date format.
  • Starting Logstash with a configuration file.

Read More

To read more on Logstash Configuration, Input Plugins, Filter Plugins, Output Plugins, Logstash Customization and related issues, follow Logstash Tutorial and Logstash Issues.

Hope this blog was helpful for you.

Leave your feedback to improve this topic and make it more helpful for others.

Reference:

https://www.elastic.co/guide/en/logstash/current/plugins-filters-csv.html

Logstash Installation, Configuration and Start


Download the latest version of Logstash from the link below. On a Linux server, extract it with the tar command shown; on Windows, just unzip the downloaded file.

Download Link : https://www.elastic.co/downloads/logstash

tar -zxvf logstash-5.4.0.tar.gz

Extraction produces the file and directory structure below.

drwxr-xr-x 2 facingissuesonit Saurabh   4096 Apr 20 11:27 bin
-rw-r--r-- 1 facingissuesonit Saurabh 111569 Mar 22 23:49 CHANGELOG.md
drwxr-xr-x 2 facingissuesonit Saurabh   4096 Apr 20 11:27 config
-rw-r--r-- 1 facingissuesonit Saurabh   2249 Mar 22 23:49 CONTRIBUTORS
drwxr-xr-x 3 facingissuesonit Saurabh   4096 Apr 20 12:07 data
-rw-r--r-- 1 facingissuesonit Saurabh   3945 Mar 22 23:55 Gemfile
-rw-r--r-- 1 facingissuesonit Saurabh  21544 Mar 22 23:49 Gemfile.jruby-1.9.lock
drwxr-xr-x 5 facingissuesonit Saurabh   4096 Apr 20 11:27 lib
-rw-r--r-- 1 facingissuesonit Saurabh    589 Mar 22 23:49 LICENSE
drwxr-xr-x 2 facingissuesonit Saurabh   4096 May 21 00:00 logs
drwxr-xr-x 4 facingissuesonit Saurabh   4096 Apr 20 11:27 logstash-core
drwxr-xr-x 3 facingissuesonit Saurabh   4096 Apr 20 11:27 logstash-core-event-java
drwxr-xr-x 3 facingissuesonit Saurabh   4096 Apr 20 11:27 logstash-core-plugin-api
drwxr-xr-x 3 facingissuesonit Saurabh   4096 Apr 20 11:27 logstash-core-queue-jruby
-rw-r--r-- 1 facingissuesonit Saurabh  28114 Mar 22 23:56 NOTICE.TXT
drwxr-xr-x 4 facingissuesonit Saurabh   4096 Apr 20 11:27 vendor

Before starting Logstash, create a configuration file that takes input from sources such as file, csv, jdbc, json, kafka or filebeat, parses the data into the respective fields, and sends it to an output such as elasticsearch, file or kafka.

A Logstash configuration file follows the syntax below. Here the file logstash-app1.conf is created in the Logstash bin directory. Please follow Logstash Tutorial for more Input, Filter and Output plugin examples.

/bin/logstash-app1.conf

input {
    kafka {
        ....
    }
    jdbc {
        ....
    }
}
filter {
    # parse log line or data...
    grok {
        ....
    }
}
output {
    # Output result sent to elasticsearch
    elasticsearch {
        ....
    }
    # Print to console
    stdout {
        codec => rubydebug
    }
}

To test your configuration file, use the command below.

./logstash -t -f logstash-app1.conf

If the result is OK, the configuration file has no syntax or compile-time issues. Now run the command below to start reading and parsing data from the configured sources.

./logstash -f logstash-app1.conf

To keep the Logstash process running in the background after you close your console, start it as below.

screen -d -m ./logstash -f logstash-app1.conf

Summary

The details above cover the following points:

  • How to install Logstash in a Linux environment.
  • Configuration file syntax and validation.
  • Starting Logstash with a configuration file.
  • Starting Logstash in the background with a configuration file.

Integrate Filebeat, Kafka, Logstash, Elasticsearch and Kibana


The Filebeat, Kafka, Logstash, Elasticsearch and Kibana integration is used in large organizations where applications are deployed in production on hundreds or thousands of servers scattered across different locations, and the data from these servers needs to be analyzed in real time.

This integration mostly helps with log-level analysis, tracking issues and data anomalies, alerting on particular events, and establishing accountability.

Using these technologies provides a scalable architecture in which each system is decoupled from the others and can be enhanced individually.

Why these Technologies?

Filebeat :

  • Lightweight agent for shipping logs.
  • Forwards and centralizes files and logs.
  • Robust (does not miss a single beat).

Kafka:

  • Open source, distributed stream processing and message broker platform.
  • Processes stream data or transaction logs in real time.
  • Fault-tolerant, high-throughput, low-latency platform for handling real-time data feeds.

Logstash:

  • Open source, server-side data processing pipeline that accepts data from different sources simultaneously.
  • Parses, formats and transforms data and sends it to different output sources.

Elasticsearch:

  • Open source, distributed and cross-platform.
  • Built on top of Lucene, which provides full-text search and NRT (Near Real Time) search results.
  • Supports RESTful search through the Elasticsearch REST API.

Kibana:

  • Open source.
  • Provides a window to view Elasticsearch data in the form of different charts and dashboards.
  • Provides an easy way to search and operate on data with respect to time intervals.
  • Easily embedded in any web application through embedded dashboards.

How does the data flow work?

In this integration, Filebeat is installed on every server where your application is deployed. It reads the latest log changes on these servers and ships them to the Kafka topic configured for this application.

Logstash subscribes to the Kafka topic, parses the log lines, makes the relevant changes (formatting, excluding and including fields), and then sends the processed data to Elasticsearch indexes, which act as the centralized location for data from the different servers.

Kibana is linked to the Elasticsearch indexes and helps with analysis through search, charts and dashboards.

FKLEK Integration

Design Architecture

The architecture below assumes that the application is deployed on three servers and that the current log file on each server is named App1.log. Our goal is to read data from these servers in real time and analyze it.

FKLEK Arch Integration

Steps to Installation, Configuration and Start

First we will install Kafka and Elasticsearch and run them individually; the rest of the tools will be installed and run in sequence to test the data flow. Initially install everything on the same machine and test with the sample data using the steps below. The end of this post describes the changes needed for your own servers.

  • Kafka Installation, Configuration and Start
  • Elasticsearch Installation, Configuration and Start
  • Filebeat Installation, Configuration and Start
  • Logstash Installation, Configuration and Start
  • Kibana Installation, Start and Display

Pre-Requisite

The Filebeat, Logstash, Elasticsearch and Kibana versions should be compatible with each other; it is better to use the latest versions from https://www.elastic.co/downloads.

  • Java 8+
  • Linux Server
  • Filebeat 5.XX
  • Kafka 0.10.XX (Scala 2.11 build)
  • Logstash 5.XX
  • Elasticsearch 5.XX
  • Kibana 5.XX

Note : Make sure JDK 8 is installed and the JAVA_HOME environment variable points to the JDK 8 home directory on every machine where you want to install Elasticsearch, Logstash, Kibana and Kafka.

Windows : My Computer -> right click -> Properties -> Advanced System Settings -> System Variables

Java_Home
Set JAVA_HOME

Linux : Go to your home directory (or the sudo user's home directory) and add the line below.

export JAVA_HOME=/opt/app/facingissuesonit/jdk1.8.0_66

Sample Data

For testing we will use the sample log lines below, which contain debug statements as well as a stack trace; the grok pattern in this example is designed for them. For real-time testing with actual data you can point to your server log files, but you will have to modify the grok pattern in the Logstash configuration accordingly.

2013-02-28 09:57:56,662 WARN  CreateSomethingActivationKey - WhateverException for User 49-123-345678 {{rid,US8cFAp5eZgAABwUItEAAAAI_dev01_443}{realsid,60A9772A136B9912B6FF0C3627A47090.dev1-a}}
2013-02-28 09:57:56,663 INFO  LMLogger - ERR1700 - u:null failures: 0  - Technical error {{rid,US8cFAp5eZgAABwUItEAAAAI_dev01_443}{realsid,60A9772A136B9912B6FF0C3627A47090.dev1-a}}
2013-02-28 09:57:56,668 ERROR SomeCallLogger - ESS10005 Cpc portalservices: Exception caught while writing log messege to MEA Call:  {}
java.sql.SQLSyntaxErrorException: ORA-00942: table or view does not exist

	at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:445)
	at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:396)
2013-02-28 10:04:35,723 INFO  EntryFilter - Fresh on request /portalservices/foobarwhatever {{rid,US8dogp5eZgAABwXPGEAAAAL_dev01_443}{realsid,56BA2AD41D9BB28AFCEEEFF927EE61C2.dev1-a}}

Create the file App1.log on the same machine where Filebeat will be installed and copy the log lines above into it.

Kafka Installation , Configuration and Start

Download the latest version of Kafka from the link below. On a Linux server, extract it with the tar command shown; on Windows, just unzip the downloaded file.

Download Link : https://kafka.apache.org/downloads

tar -zxvf kafka_2.11-0.10.0.0.tgz

For more configuration and start options, follow Setup Kafka Cluster for Single Server/Broker.

After downloading and extracting the file, you will have the files and directory structure below.

ls -l
drwxr-xr-x  3 facingissuesonit Saurabh   4096 Apr  3 05:18 bin
drwxr-xr-x  2 facingissuesonit Saurabh   4096 May  8 11:05 config
drwxr-xr-x 74 facingissuesonit Saurabh   4096 May 27 20:00 kafka-logs
drwxr-xr-x  2 facingissuesonit Saurabh   4096 Apr  3 05:17 libs
-rw-r--r--  1 facingissuesonit Saurabh  28824 Apr  3 05:17 LICENSE
drwxr-xr-x  2 facingissuesonit Saurabh 487424 May 27 20:00 logs
-rw-r--r--  1 facingissuesonit Saurabh    336 Apr  3 05:18 NOTICE
drwxr-xr-x  2 facingissuesonit Saurabh   4096 Apr  3 05:17 site-docs

For more details about all these files, configuration options and other integration options, follow Kafka Tutorial.

Make the changes below in the files config/zookeeper.properties and config/server.properties.

config/zookeeper.properties:

clientPort=2181

config/server.properties:

broker.id=0
listeners=PLAINTEXT://:9092
log.dir=/kafka-logs
zookeeper.connect=localhost:2181

Kafka is now configured and ready to run. Use the commands below to start ZooKeeper and the Kafka server as background processes.

screen -d -m bin/zookeeper-server-start.sh config/zookeeper.properties
screen -d -m bin/kafka-server-start.sh config/server.properties

To verify that Kafka started successfully, check for the running Kafka process on Linux with “ps -ef|grep kafka”, or follow the producer and consumer steps for a topic in Setup Kafka Cluster for Single Server/Broker.

Elasticsearch Installation,Configuration and Start

Download the latest version of Elasticsearch from the link below. On a Linux server, extract it with the tar command shown; on Windows, just unzip the downloaded file.

Download Link : https://www.elastic.co/downloads/elasticsearch

tar -zxvf elasticsearch-5.4.0.tar.gz

Extraction produces the files and directory structure below for Elasticsearch.

drwxr-xr-x  2 facingissuesonit Saurabh   4096 Apr 25 19:20 bin
drwxr-xr-x  3 facingissuesonit Saurabh   4096 May 13 17:27 config
drwxr-xr-x  3 facingissuesonit Saurabh   4096 Apr 24 15:56 data
drwxr-xr-x  2 facingissuesonit Saurabh   4096 Apr 17 10:55 lib
-rw-r--r--  1 facingissuesonit Saurabh  11358 Apr 17 10:50 LICENSE.txt
drwxr-xr-x  2 facingissuesonit Saurabh   4096 May 28 05:00 logs
drwxr-xr-x 12 facingissuesonit Saurabh   4096 Apr 17 10:55 modules
-rw-r--r--  1 facingissuesonit Saurabh 194187 Apr 17 10:55 NOTICE.txt
drwxr-xr-x  2 facingissuesonit Saurabh   4096 Apr 17 10:55 plugins
-rw-r--r--  1 facingissuesonit Saurabh   9540 Apr 17 10:50 README.textile

Before starting Elasticsearch, make some basic changes to the cluster and node names in the config/elasticsearch.yml file. You can choose them based on your application or organization name.

cluster.name: FACING-ISSUE-IN-IT
node.name: TEST-NODE-1
#network.host: 0.0.0.0
http.port: 9200

Elasticsearch is now configured and it is time to start it. Use the command below, from the Elasticsearch home directory, to run Elasticsearch in the background.

screen -d -m bin/elasticsearch

To check that Elasticsearch started successfully, open the URL below in a browser to see the cluster status. You will get a result like the one below.

http://localhost:9200/_cluster/health?pretty

or, if network.host is configured:

http://elasticseverIp:9200/_cluster/health?pretty

Result :

{
  "cluster_name" : "FACING-ISSUE-IN-IT",
  "status" : "green",
  "timed_out" : false,
  "number_of_nodes" : 1,
  "number_of_data_nodes" : 1,
  "active_primary_shards" : 0,
  "active_shards" : 0,
  "relocating_shards" : 0,
  "initializing_shards" : 0,
  "unassigned_shards" : 0,
  "delayed_unassigned_shards" : 0,
  "number_of_pending_tasks" : 0,
  "number_of_in_flight_fetch" : 0,
  "task_max_waiting_in_queue_millis" : 0,
  "active_shards_percent_as_number" : 100.0
}
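
If you prefer to check the result from a script instead of a browser, a minimal Python sketch could look like the one below. The function names are hypothetical, the base URL is the localhost address used in this post, and treating yellow as usable is an assumption that suits a single-node test setup where replica shards cannot be assigned.

```python
import json
from urllib.request import urlopen


def cluster_is_usable(health):
    """Return True when a parsed _cluster/health document reports green or yellow."""
    return (not health.get("timed_out", True)) and health.get("status") in ("green", "yellow")


def fetch_health(base_url="http://localhost:9200"):
    """Fetch and decode the cluster health document (needs a running node)."""
    with urlopen(base_url + "/_cluster/health") as response:
        return json.load(response)


if __name__ == "__main__":
    print(cluster_is_usable(fetch_health()))
```

Only fetch_health needs a running Elasticsearch node; cluster_is_usable works on any dictionary shaped like the result shown above.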

Filebeat Installation, Configuration and Start

Download the latest version of Filebeat from the link below. On a Linux server, extract it with the tar command shown; on Windows, just unzip the downloaded file.

Download Link : https://www.elastic.co/downloads/beats/filebeat

tar -zxvf filebeat-<version>.tar.gz

For more configuration and start options, follow Filebeat Download, Installation and Start/Run.

After downloading and extracting the file, you will have the files and directory structure below.

ls -l
-rwxr-xr-x 1 facingissuesonit Saurabh 14908742 Jan 11 14:11 filebeat
-rw-r--r-- 1 facingissuesonit Saurabh    31964 Jan 11 14:11 filebeat.full.yml
-rw-r--r-- 1 facingissuesonit Saurabh     3040 Jan 11 14:11 filebeat.template-es2x.json
-rw-r--r-- 1 facingissuesonit Saurabh     2397 Jan 11 14:11 filebeat.template.json
-rw-r--r-- 1 facingissuesonit Saurabh     4196 Jan 11 14:11 filebeat.yml
-rw-r--r-- 1 facingissuesonit Saurabh      811 Jan 11 14:10 README.md
drwxr-xr-x 2 facingissuesonit Saurabh     4096 Jan 11 14:11 scripts

For more details about all these files, configuration options and other integration options, follow Filebeat Tutorial.

Filebeat is now installed. Make the changes below in the filebeat.full.yml file.

  • Inside the prospectors section, change paths to your log file location:
paths:
  - /opt/app/facingissuesonit/App1.log
  • Comment out the default Elasticsearch output properties:
#output.elasticsearch:
  #hosts: ["localhost:9200"]
  • Configure the multiline options as below so that stack trace lines, which do not start with a date, are treated as part of a single event:
multiline.pattern: ^\d
multiline.negate: true
multiline.match: after

To learn more about Filebeat multiline configuration, follow Filebeat Multiline Configuration Changes for Object, StackTrace and XML.

  • Inside the Kafka output section, update the hosts and topic properties. If Kafka is on the same machine, use localhost; otherwise use the IP of the Kafka machine.
output.kafka:
  hosts: ["localhost:9092"]
  topic: APP-1-TOPIC

For more on logging configuration, follow the link Filebeat, Logging Configuration.
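
The effect of the multiline settings above (pattern ^\d, negate true, match after) can be illustrated with a small Python sketch. It is not how Filebeat is implemented, only a model of the grouping rule; the function name group_multiline and the shortened sample lines are assumptions for the example.

```python
import re


def group_multiline(lines, pattern=r"^\d"):
    """Merge every line that does NOT match the pattern into the preceding event.

    This mirrors multiline.negate: true combined with multiline.match: after.
    """
    starts_event = re.compile(pattern)
    events = []
    for line in lines:
        if starts_event.search(line) or not events:
            # A line starting with a digit (the date) opens a new event.
            events.append(line)
        else:
            # Anything else, including blank lines, is appended to the last event.
            events[-1] += "\n" + line
    return events


if __name__ == "__main__":
    sample = [
        "2013-02-28 09:57:56,668 ERROR SomeCallLogger - ESS10005 ...",
        "java.sql.SQLSyntaxErrorException: ORA-00942: table or view does not exist",
        "",
        "\tat oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:445)",
        "2013-02-28 10:04:35,723 INFO  EntryFilter - Fresh on request ...",
    ]
    for event in group_multiline(sample):
        print(repr(event))
```

The stack trace lines and the blank line between them end up inside the ERROR event, joined by newline characters, while the next dated line starts a new event.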

Filebeat is now configured and ready to start with the command below. It will continuously read the App1.log file from the configured prospector and publish log line events to Kafka. It will also create the topic APP-1-TOPIC in Kafka if it does not exist, provided automatic topic creation is enabled on the broker (the Kafka default).

./filebeat -e -c filebeat.full.yml -d "publish"

The console will display output like the following for the sample lines.

2017/05/28 00:24:27.991828 client.go:184: DBG  Publish: {
  "@timestamp": "2017-05-28T00:24:22.991Z",
  "beat": {
    "hostname": "sg02870",
    "name": "sg02870",
    "version": "5.1.2"
  },
  "input_type": "log",
  "message": "2013-02-28 09:57:56,662 WARN  CreateSomethingActivationKey - WhateverException for User 49-123-345678 {{rid,US8cFAp5eZgAABwUItEAAAAI_dev01_443}{realsid,60A9772A136B9912B6FF0C3627A47090.dev1-a}}",
  "offset": 194,
  "source": "/opt/app/facingissuesonit/App1.log",
  "type": "log"
}
2017/05/28 00:24:27.991907 client.go:184: DBG  Publish: {
  "@timestamp": "2017-05-28T00:24:22.991Z",
  "beat": {
    "hostname": "sg02870",
    "name": "sg02870",
    "version": "5.1.2"
  },
  "input_type": "log",
  "message": "2013-02-28 09:57:56,663 INFO  LMLogger - ERR1700 - u:null failures: 0  - Technical error {{rid,US8cFAp5eZgAABwUItEAAAAI_dev01_443}{realsid,60A9772A136B9912B6FF0C3627A47090.dev1-a}}",
  "offset": 375,
  "source": "/opt/app/facingissuesonit/App1.log",
  "type": "log"
}
2017/05/28 00:24:27.991984 client.go:184: DBG  Publish: {
  "@timestamp": "2017-05-28T00:24:22.991Z",
  "beat": {
    "hostname": "sg02870",
    "name": "sg02870",
    "version": "5.1.2"
  },
  "input_type": "log",
  "message": "2013-02-28 09:57:56,668 ERROR SomeCallLogger - ESS10005 Cpc portalservices: Exception caught while writing log messege to MEA Call:  {}\njava.sql.SQLSyntaxErrorException: ORA-00942: table or view does not exist\n\n\tat oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:445)\n\tat oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:396)",
  "offset": 718,
  "source": "/opt/app/facingissuesonit/App1.log",
  "type": "log"
}
2017/05/28 00:24:27.991984 client.go:184: DBG  Publish: {
  "@timestamp": "2017-05-28T00:24:22.992Z",
  "beat": {
    "hostname": "sg02870",
    "name": "sg02870",
    "version": "5.1.2"
  },
  "input_type": "log",
  "message": "2013-02-28 10:04:35,723 INFO  EntryFilter - Fresh on request /portalservices/foobarwhatever {{rid,US8dogp5eZgAABwXPGEAAAAL_dev01_443}{realsid,56BA2AD41D9BB28AFCEEEFF927EE61C2.dev1-a}}",
  "offset": 902,
  "source": "/opt/app/facingissuesonit/App1.log",
  "type": "log"
}

In the Filebeat debug statements above, the third published event contains the multiline statement with the stack trace exception. Each event has fields such as:

@timestamp: Timestamp at which the data was shipped.

beat.hostname : Name of the Filebeat machine from which the data is shipped.

beat.version: Version of Filebeat installed on the server, which helps with compatibility checks on the target end.

message : Log line from the log file, or multiple log lines combined by the multiline settings.

offset: Byte offset of the log line within the source file.

source : Name of the file from which the logs were read.

Now it is time to check whether the data was published to the Kafka topic. Go to the directory below, where you will see two files, xyz.index and xyz.log, which maintain the data offsets and the messages.

{Kafka_home}/kafka-logs/APP-1-TOPIC
          00000000000000000000.log
          00000000000000000000.index

Your server log lines are now in the Kafka topic, ready for Logstash to read and parse them and send them to Elasticsearch for analysis and search.

Logstash Installation, Configuration and Start

Download the latest version of Logstash from the link below. On a Linux server, extract it with the tar command shown; on Windows, just unzip the downloaded file.

Download Link : https://www.elastic.co/downloads/logstash

tar -zxvf logstash-5.4.0.tar.gz

Extraction produces the file and directory structure below.

drwxr-xr-x 2 facingissuesonit Saurabh   4096 Apr 20 11:27 bin
-rw-r--r-- 1 facingissuesonit Saurabh 111569 Mar 22 23:49 CHANGELOG.md
drwxr-xr-x 2 facingissuesonit Saurabh   4096 Apr 20 11:27 config
-rw-r--r-- 1 facingissuesonit Saurabh   2249 Mar 22 23:49 CONTRIBUTORS
drwxr-xr-x 3 facingissuesonit Saurabh   4096 Apr 20 12:07 data
-rw-r--r-- 1 facingissuesonit Saurabh   3945 Mar 22 23:55 Gemfile
-rw-r--r-- 1 facingissuesonit Saurabh  21544 Mar 22 23:49 Gemfile.jruby-1.9.lock
drwxr-xr-x 5 facingissuesonit Saurabh   4096 Apr 20 11:27 lib
-rw-r--r-- 1 facingissuesonit Saurabh    589 Mar 22 23:49 LICENSE
drwxr-xr-x 2 facingissuesonit Saurabh   4096 May 21 00:00 logs
drwxr-xr-x 4 facingissuesonit Saurabh   4096 Apr 20 11:27 logstash-core
drwxr-xr-x 3 facingissuesonit Saurabh   4096 Apr 20 11:27 logstash-core-event-java
drwxr-xr-x 3 facingissuesonit Saurabh   4096 Apr 20 11:27 logstash-core-plugin-api
drwxr-xr-x 3 facingissuesonit Saurabh   4096 Apr 20 11:27 logstash-core-queue-jruby
-rw-r--r-- 1 facingissuesonit Saurabh  28114 Mar 22 23:56 NOTICE.TXT
drwxr-xr-x 4 facingissuesonit Saurabh   4096 Apr 20 11:27 vendor

Before starting Logstash, create a configuration file that takes input data from Kafka, parses it into the respective fields and sends it to Elasticsearch. Create the file logstash-app1.conf in the Logstash bin directory with the content below.

/bin/logstash-app1.conf

input {
     kafka {
            bootstrap_servers => 'localhost:9092'
            topics => ["APP-1-TOPIC"]
            codec => json {}
          }
}
filter
{
      # parse log line
      grok
	{
	match => {"message" => "\A%{TIMESTAMP_ISO8601:timestamp}\s+%{LOGLEVEL:loglevel}\s+(?<logger>(?:[a-zA-Z0-9-]+\.)*[A-Za-z0-9$]+)\s+(-\s+)?(?=(?<msgnr>[A-Z]+[0-9]{4,5}))*%{DATA:message}({({[^}]+},?\s*)*})?\s*$(?<stacktrace>(?m:.*))?" }
	}  

    #Remove unused fields
    #mutate { remove_field =>["beat","@version" ]}
}
output {
    # Send the result to Elasticsearch; the index name is built dynamically per day
    elasticsearch {
        index  => "app1-logs-%{+YYYY.MM.dd}"
        hosts => ["localhost:9200"]
        sniffing => false
  	}

     #Sysout logs
     stdout
       {
         codec => rubydebug
       }
}

To test your configuration file, use the command below.

./logstash -t -f logstash-app1.conf

If the result is OK, run the command below to start reading and parsing data from the Kafka topic.

./logstash -f logstash-app1.conf

To design your own grok pattern for your log line format, the links below help you build a pattern incrementally and also provide some sample log groks.

http://grokdebug.herokuapp.com and http://grokconstructor.appspot.com/
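
Before moving a pattern into grok, it can help to prototype the basic structure with an ordinary regular expression. The Python sketch below is a simplified stand-in for the grok pattern used in this post, not a translation of it: it extracts only the timestamp, log level, logger, first-line text and stack trace, and it does not extract msgnr or strip the trailing {{...}} block. The names LOG_LINE, parse_log_event and the field name text are hypothetical.

```python
import re

# Simplified layout: "<timestamp> <LEVEL> <logger> - <text>" followed by optional extra lines.
LOG_LINE = re.compile(
    r"^(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})\s+"
    r"(?P<loglevel>[A-Z]+)\s+"
    r"(?P<logger>[A-Za-z0-9$.-]+)\s+-\s+"
    r"(?P<text>[^\n]*)"
    r"(?P<stacktrace>(?:\n.*)?)$",
    re.DOTALL,
)


def parse_log_event(message):
    """Return the extracted fields as a dict, or None if the message does not match."""
    match = LOG_LINE.match(message)
    if match is None:
        return None
    fields = match.groupdict()
    # Single-line events have no stack trace, so drop the empty field.
    if not fields["stacktrace"]:
        del fields["stacktrace"]
    return fields


if __name__ == "__main__":
    event = (
        "2013-02-28 09:57:56,668 ERROR SomeCallLogger - ESS10005 Cpc portalservices: {}\n"
        "java.sql.SQLSyntaxErrorException: ORA-00942: table or view does not exist"
    )
    print(parse_log_event(event))
```

Once the fields come out as expected on a few sample lines, the same structure can be rebuilt step by step with grok patterns such as TIMESTAMP_ISO8601 and LOGLEVEL in one of the tools linked above.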

The Logstash console will show the parsed data as below. You can stop unused fields from being stored in Elasticsearch by uncommenting the mutate section in the configuration file.

{
    "@timestamp" => 2017-05-28T23:47:42.160Z,
        "offset" => 194,
      "loglevel" => "WARN",
        "logger" => "CreateSomethingActivationKey",
          "beat" => {
        "hostname" => "zlp02870",
            "name" => "zlp02870",
         "version" => "5.1.2"
    },
    "input_type" => "log",
      "@version" => "1",
        "source" => "/opt/app/facingissuesonit/App1.log",
       "message" => [
        [0] "2013-02-28 09:57:56,662 WARN  CreateSomethingActivationKey - WhateverException for User 49-123-345678 {{rid,US8cFAp5eZgAABwUItEAAAAI_dev01_443}{realsid,60A9772A136B9912B6FF0C3627A47090.dev1-a}}",
        [1] "WhateverException for User 49-123-345678 "
    ],
          "type" => "log",
     "timestamp" => "2013-02-28 09:57:56,662"
}
{
         "msgnr" => "ERR1700",
    "@timestamp" => 2017-05-28T23:47:42.160Z,
        "offset" => 375,
      "loglevel" => "INFO",
        "logger" => "LMLogger",
          "beat" => {
        "hostname" => "zlp02870",
            "name" => "zlp02870",
         "version" => "5.1.2"
    },
    "input_type" => "log",
      "@version" => "1",
        "source" => "/opt/app/facingissuesonit/App1.log",
       "message" => [
        [0] "2013-02-28 09:57:56,663 INFO  LMLogger - ERR1700 - u:null failures: 0  - Technical error {{rid,US8cFAp5eZgAABwUItEAAAAI_dev01_443}{realsid,60A9772A136B9912B6FF0C3627A47090.dev1-a}}",
        [1] "ERR1700 - u:null failures: 0  - Technical error "
    ],
          "type" => "log",
     "timestamp" => "2013-02-28 09:57:56,663"
}
{
        "offset" => 718,
        "logger" => "SomeCallLogger",
    "input_type" => "log",

       "message" => [
        [0] "2013-02-28 09:57:56,668 ERROR SomeCallLogger - ESS10005 Cpc portalservices: Exception caught while writing log messege to MEA Call:  {}\njava.sql.SQLSyntaxErrorException: ORA-00942: table or view does not exist\n\n\tat oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:445)\n\tat oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:396)",
        [1] "ESS10005 Cpc portalservices: Exception caught while writing log messege to MEA Call:  "
    ],
          "type" => "log",
         "msgnr" => "ESS10005",
    "@timestamp" => 2017-05-28T23:47:42.160Z,
    "stacktrace" => "\njava.sql.SQLSyntaxErrorException: ORA-00942: table or view does not exist\n\n\tat oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:445)\n\tat oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:396)",
      "loglevel" => "ERROR",
          "beat" => {
        "hostname" => "zlp02870",
            "name" => "zlp02870",
         "version" => "5.1.2"
    },
      "@version" => "1",
     "timestamp" => "2013-02-28 09:57:56,668"
}
{
    "@timestamp" => 2017-05-28T23:47:42.160Z,
        "offset" => 903,
      "loglevel" => "INFO",
        "logger" => "EntryFilter",
          "beat" => {
        "hostname" => "zlp02870",
            "name" => "zlp02870",
         "version" => "5.1.2"
    },
    "input_type" => "log",
      "@version" => "1",

       "message" => [
        [0] "2013-02-28 10:04:35,723 INFO  EntryFilter - Fresh on request /portalservices/foobarwhatever {{rid,US8dogp5eZgAABwXPGEAAAAL_dev01_443}{realsid,56BA2AD41D9BB28AFCEEEFF927EE61C2.dev1-a}}\n",
        [1] "Fresh on request /portalservices/foobarwhatever "
    ],
          "type" => "log",
     "timestamp" => "2013-02-28 10:04:35,723"
}

To verify on the Elasticsearch side that your data was sent successfully, open the URL
http://localhost:9200/_cat/indices in your browser; it will display the created index with the current date.

yellow open app1-logs-2017.05.28                             Qjs6XWiFQw2zsiVs9Ks6sw 5 1         4     0  47.3kb  47.3kb

Kibana Installation, Configuration and Start

Download the latest version of Kibana from the link below. On a Linux server, extract it with the tar command shown; on Windows, just unzip the downloaded file.

Download Link : https://www.elastic.co/downloads/kibana

tar -zxvf kibana-5.4.0.tar.gz

Extraction produces the files and directory structure below for Kibana.

ls -l
drwxr-xr-x   2 facingissuesonit Saurabh   4096 May 22 14:23 bin
drwxr-xr-x   2 facingissuesonit Saurabh   4096 Apr 25 18:58 config
drwxr-xr-x   2 facingissuesonit Saurabh   4096 Apr 25 11:54 data
-rw-r--r--   1 facingissuesonit Saurabh    562 Apr 17 12:04 LICENSE.txt
drwxr-xr-x   6 facingissuesonit Saurabh   4096 Apr 17 12:04 node
drwxr-xr-x 485 facingissuesonit Saurabh  20480 Apr 17 12:04 node_modules
-rw-r--r--   1 facingissuesonit Saurabh 660429 Apr 17 12:04 NOTICE.txt
drwxr-xr-x   3 facingissuesonit Saurabh   4096 Apr 17 12:04 optimize
-rw-r--r--   1 facingissuesonit Saurabh    702 Apr 17 12:04 package.json
drwxr-xr-x   2 facingissuesonit Saurabh   4096 May 22 12:29 plugins
-rw-r--r--   1 facingissuesonit Saurabh   4909 Apr 17 12:04 README.txt
drwxr-xr-x  10 facingissuesonit Saurabh   4096 Apr 17 12:04 src
drwxr-xr-x   3 facingissuesonit Saurabh   4096 Apr 17 12:04 ui_framework
drwxr-xr-x   2 facingissuesonit Saurabh   4096 Apr 17 12:04 webpackShims

Before starting Kibana, make some basic changes in the config/kibana.yml file: uncomment the properties below and set them as shown.

server.port: 5601
server.host: localhost
elasticsearch.url: "http://localhost:9200"

Kibana is now configured and it is time to start it. Use the command below, from the Kibana home directory, to run Kibana in the background.

screen -d -m bin/kibana

Kibana takes some time to start. You can test it by opening the URL below in a browser.

http://localhost:5601/

To check this data in Kibana, open the URL above in a browser, go to the Management tab in the left-side menu -> Index Patterns -> click Add New.

Enter the index name or pattern and the time field name as in the screen below, then click the Create button.

Kibana index setting
Index Pattern Settings

Now go to the Discover tab and select the index app1-log*; the data will be displayed as below.

kibana discover data

Now make the changes below according to your application's specification.

Filebeat :

  • Update the prospector path to the current log file in your log directory.
  • Move Kafka to a different machine, because Kafka will be the single location that receives shipped data from the different servers. In the Kafka output section of the filebeat.full.yml file, replace localhost in the hosts property with the IP of the Kafka server.
  • Copy the same Filebeat setup to all servers where your application is deployed and logs need to be read.
  • Start the Filebeat instance on each server.

Elasticsearch :

  • Uncomment the network.host property in the elasticsearch.yml file to allow access by IP address.

Logstash:

  • Replace localhost in the input section of the logstash-app1.conf file with the Kafka machine IP.
  • Change the grok pattern in the filter section according to your log format. The following URLs help with designing it incrementally: http://grokdebug.herokuapp.com and http://grokconstructor.appspot.com/
  • Replace localhost in the elasticsearch output section with the IP if Elasticsearch is moved to a different machine.

Kibana:

  • Replace localhost in the elasticsearch.url property of the kibana.yml file with the IP if Kibana is on a different machine.


Logstash Input and Output to/from Kafka Example



Logstash can take input from Kafka, parse the data, and send the parsed output to Kafka for streaming to other applications.

Kafka Input Configuration in Logstash

Below is a basic configuration for Logstash to consume messages from Kafka. For more information about the Logstash Kafka input configuration, refer to this elasticsearch site Link.

input {
   kafka {
       bootstrap_servers => 'KafkaServer:9092'
       topics => ["TopicName"]
       codec => json {}
        }
}

bootstrap_servers : The default value is “localhost:9092”. It takes a list of server addresses in the form host1:port1,host2:port2, which is used to establish the initial connection to the cluster. If one server is down, Logstash connects to another one from the list.

topics : The list of topics to subscribe to and consume messages from.
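As a small illustration of the host1:port1,host2:port2 format, the following Python sketch splits such a string into (host, port) pairs. The function name parse_bootstrap_servers and the host names are made up for this example; this is not how Logstash or the Kafka client parses the setting internally.

```python
def parse_bootstrap_servers(value):
    """Split 'host1:port1,host2:port2' into a list of (host, port) tuples."""
    servers = []
    for entry in value.split(","):
        # rpartition splits on the last ':' so the port is always the final part
        host, _, port = entry.strip().rpartition(":")
        servers.append((host, int(port)))
    return servers


# Hypothetical broker names, used only for illustration
print(parse_bootstrap_servers("kafka1:9092, kafka2:9093"))
```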

Kafka Output Configuration in Logstash

Below is a basic configuration for Logstash to publish messages to Kafka. For more information about the Logstash Kafka output configuration, refer to this Elasticsearch site link.

output {
    kafka {
        bootstrap_servers => "localhost:9092"
        topic_id => 'TopicName'
    }
}

bootstrap_servers : The default value is “localhost:9092”. It takes a list of server addresses in the form host1:port1,host2:port2. The producer uses this list only to fetch metadata (topics, partitions and replicas). The socket connections for sending the actual data are established based on the broker information returned in the metadata.

topic_id : The name of the topic to which messages are published.

Read More on Kafka

Integration

Integrate Filebeat, Kafka, Logstash, Elasticsearch and Kibana

Logstash Connection Refused by Elasticsearch Over Proxy or AIC


We faced a connection refused issue when Logstash tried to send output data to Elasticsearch over HTTP. This happens because of the proxy configuration, or when Elasticsearch runs in a cloud environment.

Generally we faced the exception below:

[2017-04-24T10:45:32,933][WARN ][logstash.outputs.elasticsearch] UNEXPECTED POOL ERROR {:e=>#}
[2017-04-24T10:45:32,934][ERROR][logstash.outputs.elasticsearch] Attempted to send a bulk request to elasticsearch, but no there are no living connections in the connection pool. Perhaps Elasticsearch is unreachable or down? {:error_message=>"No Available connections", :class=>"LogStash::Outputs::ElasticSearch::HttpClient::Pool::NoConnectionAvailableError", :will_retry_in_second

Logstash allows a proxy to be declared in the configuration file for the Elasticsearch output, in the proxy field as given below. If the user ID or password contains any special symbol, you have to replace that symbol with its percent-encoded (URL-encoded) ASCII value. For example, if the password is music@123, the @ is encoded as %40 and the password becomes music%40123. Refer to the link ASCII CODE for the ASCII value corresponding to each character.

proxy => "http://userid:password@proxyhost:8080"

For example, if the user ID and password are “smart” and “music@123”, the proxy configuration looks like this:

proxy => "http://smart:music%40123@proxyhost:8080"
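The encoding step can be done with a few lines of Python instead of looking up each character by hand. This is a minimal sketch using the standard library; the helper name build_proxy_url is made up for this example, and the host and port are the placeholders used above.

```python
from urllib.parse import quote


def build_proxy_url(user, password, host, port):
    """Return an http proxy URL with percent-encoded credentials."""
    # safe="" forces reserved characters such as '@', ':' and '/' to be encoded
    user_enc = quote(user, safe="")
    password_enc = quote(password, safe="")
    return f"http://{user_enc}:{password_enc}@{host}:{port}"


print(build_proxy_url("smart", "music@123", "proxyhost", 8080))
# prints http://smart:music%40123@proxyhost:8080
```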

How to set Proxy in Logstash Configuration for Elasticsearch Output?

output {
    elasticsearch {
       index => "app1-logs-%{+YYYY.MM.dd}"
       proxy => "http://smart:music%40123@proxyhost:8080"
       hosts => ["elasticServerHost:elasticServerPort"]
       }
}

Issues Solution

For solutions to more Logstash issues, follow the link Common Logstash Issues.

Logstash Custom Grok Pattern


Logstash provides predefined grok patterns for standard cases such as URL, INT, GREEDYDATA and WORD. We can also define our own custom grok patterns.

Why do we need a custom Grok Pattern?

A custom grok pattern is useful when the same pattern is needed in multiple configuration files. If the log format changes in the future, the pattern only has to be updated in one place and the change is reflected in all files.

How to define your own Grok Pattern?

  • Go to the Logstash installation directory and follow the path below to edit the grok-pattern file.
Logstash-Installation-directory/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-4.0.2/patterns
  • The grok-pattern file defines each pattern in the form below, and we can define our own grok pattern in the same way.
NAME regular expression for that name
  • Use the defined grok pattern in the grok filter of your Logstash configuration file, as given in the example below.

Example : Suppose our requirement is to parse the log line below and retrieve all the information in it: logLevel, timestamp, className, threadNumber and logContent.

Log statement :

[DEBUG|20161226 134758 956] (ElasticManagerImpl@ExecuteThread: '297' for queue: 'weblogic.kernel.Default') {Using Weblogic-specific timeout values for context request. RequestTimeout: 7200000 RMIClientTimeout: 7200000}

As per our requirement, the complete log line is divided into parts with different fields, as below.

logLevel:DEBUG

timestamp: 20161226 134758 956

className: ElasticManagerImpl

threadNumber:297

logContent: Using Weblogic-specific timeout values for context request. RequestTimeout: 7200000 RMIClientTimeout: 7200000

For the fields above, grok has predefined patterns such as LOGLEVEL for the log level, INT for the thread number, WORD for className and GREEDYDATA for logContent. There is no predefined grok pattern matching this timestamp format, so we define our own pattern in the grok-pattern file.

LOG_TIMESTAMP %{YEAR}%{MONTHNUM2}%{MONTHDAY}%{SPACE}%{HOUR}%{MINUTE}%{SECOND}%{SPACE}%{INT:milliseconds}

Grok Pattern for Logstash:

In the Logstash configuration file, define the grok filter as given below.

grok {
    match => { "message" => "(?m)^\[%{LOGLEVEL:loglevel}%{SPACE}*\|%{LOG_TIMESTAMP:timestamp}\]%{SPACE}\(%{GREEDYDATA:className}@%{GREEDYDATA}%{NUMBER:threadNumber}%{GREEDYDATA}\)%{SPACE}\{+?%{GREEDYDATA:logContent}\}" }
}
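To check the expected field split outside Logstash, the same log line can be parsed with a plain regular expression. The Python sketch below is only an approximation written for this example: the regex is hand-written, it is not what grok generates from LOG_TIMESTAMP and the other patterns, and the name parse_line is hypothetical.

```python
import re

LOG_LINE = (
    "[DEBUG|20161226 134758 956] (ElasticManagerImpl@ExecuteThread: '297' "
    "for queue: 'weblogic.kernel.Default') {Using Weblogic-specific timeout "
    "values for context request. RequestTimeout: 7200000 RMIClientTimeout: 7200000}"
)

# Hand-written approximation of the grok expression, one named group per field
LOG_PATTERN = re.compile(
    r"^\[(?P<logLevel>[A-Z]+)\s*\|"                      # [DEBUG|
    r"(?P<timestamp>\d{8}\s+\d{6}\s+\d{3})\]\s*"         # 20161226 134758 956]
    r"\((?P<className>\w+)@[^)]*?'(?P<threadNumber>\d+)'[^)]*\)\s*"
    r"\{(?P<logContent>.*)\}$",                          # {...}
    re.DOTALL,
)


def parse_line(line):
    """Return a dict of the extracted fields, or None if the line does not match."""
    match = LOG_PATTERN.match(line)
    return match.groupdict() if match else None


for name, value in parse_line(LOG_LINE).items():
    print(f"{name}: {value}")
```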

Issues Solution

For solutions to more Logstash issues, follow the link Common Logstash Issues.

Sample filebeat.yml file for Prospectors ,Logstash Output and Logging Configuration


You can copy the file below into filebeat.yml and run it after making the changes listed here for your environment's directory structure. Then follow the steps in Filebeat Download,Installation and Start/Run.

  • In the prospectors section, change the log file directory and file name.
  • Configure the multiline pattern for your log format. The pattern given here is generic and should work with most formats.
  • In the Logstash output section, change the host, port and other settings if required.
  • Change the logging directory to match the directory on your machine.

Sample filebeat.yml file

#============= Filebeat prospectors ===============

filebeat.prospectors:

# Multiple prospectors can be defined here, each with its own shipping method
# and rules as required. To read logs from several files in the same
# directory, use a wildcard pattern in the path.

# Filebeat supports only two values for input_type: log and stdin

############## input type log configuration #####################

- input_type: log

  # Paths of the files to read logs from. Use a wildcard pattern to read from
  # multiple files.
  paths:
    - /opt/app/app1/logs/app1-debug*.log*

  # Set fields_under_root to true if you want the custom fields at the root of
  # the Filebeat JSON output.
  fields_under_root: true

  ### Multiline configuration for handling stack traces, objects, XML etc.
  ### When multiline is enabled with the settings below, such entries are
  ### shipped as a single multiline event.

  # The regexp pattern that has to be matched. The example pattern matches all
  # lines starting with "[" followed by a log level such as DEBUG, ALERT, TRACE
  # or WARNING. Customize it according to your log line format.
  #multiline.pattern: '^\[([Aa]lert|ALERT|[Tt]race|TRACE|[Dd]ebug|DEBUG|[Nn]otice|NOTICE|[Ii]nfo|INFO|[Ww]arn?(?:ing)?|WARN?(?:ING)?|[Ee]rr?(?:or)?|ERR?(?:OR)?|[Cc]rit?(?:ical)?|CRIT?(?:ICAL)?|[Ff]atal|FATAL|[Ss]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?)'

  # Default is false. Defines whether the pattern match should be negated.
  #multiline.negate: true

  # Defines where the lines that do not match the pattern above are appended.
  # Possible values are "after" or "before".
  #multiline.match: after

  # Maximum number of lines combined into one event. Lines beyond this number
  # are ignored.
  #multiline.max_lines: 50
#========= Logstash Output Configuration =======================
output.logstash:
  # The enabled flag enables or disables this output. More details are in the
  # Filebeat module section.
  #enabled: true

  # List all Logstash server hosts and ports to publish events to. The default
  # port for Logstash is 5044. If the Logstash listener was started on a
  # different port, use that port here.
  hosts: ["logstashserver:5044"]

  # Number of workers that run for each configured Logstash host.
  #worker: 1

  # gzip compression level, from 1 (best speed) to 9 (best compression). A
  # higher level costs more processing time but sends less data over the
  # network. A value of 0 disables compression. The default for the Logstash
  # output is 3.
  #compression_level: 3

  # Default value is false. If set to true and multiple hosts are configured,
  # events are load balanced across all hosts. If false, Filebeat sends all
  # events to one randomly selected host and switches to another host if the
  # selected one becomes unresponsive.
  #loadbalance: true

  # Number of batches sent asynchronously to Logstash while waiting for a
  # response. The output becomes blocking only once this number of batches has
  # been written. Default value is 0, which disables pipelining.
  #pipelining: 0

  # Filebeat can reach the Logstash servers through a SOCKS5 proxy. If such a
  # proxy is configured on the server side, set the details below.

  # SOCKS5 proxy URL
  #proxy_url: socks5://userid:pwd@socks5-server:2233

  # Default value is false, which means host names are resolved on the proxy
  # server. If set to true, Logstash host names are resolved locally.
  #proxy_use_local_resolver: false

  # Enable SSL if the Logstash endpoint is configured for SSL.
  #ssl.enabled: true

  # SSL configuration is optional and off by default.
  # List of root certificates used for server verification.
  #ssl.certificate_authorities: ["/app/pki/root/ca.pem"]

  # Default value is full. The verification mode applies only if SSL is
  # configured. The value 'none' can be used for testing, but in this mode any
  # certificate is accepted.
  #ssl.verification_mode: full

  # List of supported TLS versions. By default all TLS versions from 1.0 up to
  # 1.2 are enabled. Specific versions can be listed in the array below.
  #ssl.supported_protocols: [TLSv1.0, TLSv1.1, TLSv1.2]

  # Path of the certificate for SSL
  #ssl.certificate: "/etc/pki/client/cert.pem"

  # Path of the client certificate key
  #ssl.key: "/etc/pki/client/cert.key"

  # Passphrase for decrypting the certificate key. Needed only if the key is
  # encrypted, otherwise optional.
  #ssl.key_passphrase: ''

  # Configure encryption cipher suites to be used for SSL connections
  #ssl.cipher_suites: []

  # Configure encryption curve types for ECDHE based cipher suites
  #ssl.curve_types: []
#==================== Logging ==============================

# Default log level is info. Messages at the selected level and at all more
# severe levels are recorded. Available log levels are: critical, error,
# warning, info, debug
logging.level: debug

# Possible values for selectors are "beat", "publish" and "service". To enable
# all of them use "*". Selectors can also be chosen on the command line when
# starting Filebeat.
logging.selectors: ["*"]

# The default value is false. If set to true, log output is sent to syslog.
logging.to_syslog: false

# The default is true. All non-zero metrics readings are output on shutdown.
logging.metrics.enabled: true

# Period for reporting metrics such as the counts of lines read from log
# files. A complete report is written when Filebeat shuts down.
logging.metrics.period: 30s

# Set this flag to true to enable logging to files. If not set, file logging is
# disabled.
logging.to_files: true
logging.files:
  # Directory where the log files are written. If not set, a default directory
  # under the Filebeat home directory is used.
  path: /tmp

  # Name of the file the logs are written to
  name: filebeat-app.log

  # The log file is rotated when it reaches this size and a new file is
  # created. Default value is 10MB.
  rotateeverybytes: 10485760 # = 10MB

  # Number of most recent rotated log files kept in the directory. The oldest
  # files are removed.
  keepfiles: 7

  # Enables logging for that level only. Available log levels are: critical,
  # error, warning, info, debug
  level: debug
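The multiline settings in the sample (negate: true, match: after) mean that every line that does not start with a log-level prefix is appended to the previous line that does. The Python sketch below imitates that grouping. It is an illustration only: the shortened START pattern, the function name group_multiline and the sample lines are assumptions made for this example and are not Filebeat code.

```python
import re

# Shortened stand-in for the multiline.pattern in the sample file
START = re.compile(r"^\[(DEBUG|INFO|WARN|ERROR|TRACE)")


def group_multiline(lines, pattern=START, max_lines=50):
    """Group lines as with negate: true and match: after.

    A line matching the pattern starts a new event; any other line is
    appended to the current event. Lines beyond max_lines are dropped.
    """
    events, current = [], []
    for line in lines:
        if pattern.search(line) or not current:
            if current:
                events.append("\n".join(current))
            current = [line]
        elif len(current) < max_lines:
            current.append(line)
    if current:
        events.append("\n".join(current))
    return events


sample = [
    "[ERROR|20161226 134758 956] request failed",
    "java.lang.IllegalStateException: boom",
    "    at com.example.App.run(App.java:10)",
    "[INFO|20161226 134759 001] recovered",
]
for event in group_multiline(sample):
    print(repr(event))
```

With the sample input the three lines of the stack trace become one event and the INFO line becomes a second one.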

Sample filebeat.yml File

Integration

Complete Integration Example Filebeat, Kafka, Logstash, Elasticsearch and Kibana

Read More

To read more on Filebeat topics, sample configuration files and integration with other systems with examples, follow the links Filebeat Tutorial and Filebeat Issues. To know more about YAML, follow the link YAML Tutorials.

Leave your feedback to help improve this topic and make it more helpful for others.

Filebeat, Logstash Output Configuration


To ship server log lines directly to Logstash, follow the steps below:

Pre-Requisite :

  • Create a Logstash configuration file whose input section uses the same port that is configured in Filebeat for the Logstash listener. The default port for Logstash is 5044.
  • Start Logstash with that configuration file.

Logstash Output  Required Configuration :

  • Comment out the output.elasticsearch section and uncomment the output.logstash section.
  • Set enabled to true to enable the Logstash output.
  • Set hosts to the server where Logstash is listening. The default port for Logstash is 5044; if it was changed, use the same port value here.
output.logstash:
  enabled: true
  # use localhost if Logstash is on the same machine, and the same port used by the Logstash listener
  hosts: ["logstashserver:5044"]

Other Optional Configurations:

Logstash Output Compression Configuration:

Filebeat provides a gzip compression level that ranges from 1 (best speed) to 9 (best compression). As the compression level increases, processing takes longer but less data is sent over the network. A value of 0 disables compression. For the Logstash output the default value is 3.

compression_level: 3
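The trade-off between compression levels can be seen with a quick experiment. The Python sketch below compresses a batch of repeated log lines with zlib, which uses the same deflate algorithm as gzip, and prints the resulting sizes. The batch content is invented for this example, and the numbers it prints only indicate the trend; they are not measurements of Filebeat itself.

```python
import zlib

# Invented batch of similar log lines, standing in for shipped events
line = "[DEBUG|20161226 134758 956] (ElasticManagerImpl@ExecuteThread: '297') {sample log content}\n"
batch = (line * 2000).encode("utf-8")

# Level 0 stores the data without compressing it; 1 is fastest, 9 is smallest
sizes = {level: len(zlib.compress(batch, level)) for level in (0, 1, 3, 9)}

print(f"original: {len(batch)} bytes")
for level, size in sizes.items():
    print(f"level {level}: {size} bytes")
```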

Logstash Output Performance Configuration:

worker: The number of workers for each host publishing events to Logstash, which spreads the load across connections.

loadbalance: Default value is false. If set to true and multiple hosts are configured, events are load balanced across all hosts. If false, Filebeat sends all events to one randomly selected host and switches to another host if the selected one becomes unresponsive.

pipelining: Default value is 0, which means pipelining is disabled. The configured value is the number of batches sent to Logstash asynchronously while waiting for a response. The output becomes blocking only once that number of batches has been written.

Logstash Output Proxy Configuration: Filebeat can reach the Logstash servers through a SOCKS5 proxy. If such a proxy is configured on the server side, set the details below.

proxy_url: socks5://userid:pwd@socks5-server:2233

proxy_use_local_resolver: Default value is false, which means host names are resolved on the proxy server. If set to true, Logstash host names are resolved locally.

Sample configuration file

Sample filebeat.yml file for Logstash Output

Integration

Complete Integration Example Filebeat, Kafka, Logstash, Elasticsearch and Kibana

Read More

To read more on Filebeat topics, sample configuration files and integration with other systems with examples, follow the links Filebeat Tutorial and Filebeat Issues. To know more about YAML, follow the link YAML Tutorials.

Leave your feedback to help improve this topic and make it more helpful for others.

Filebeat Download,Installation and Start/Run


Filebeat Latest Version : 6.2.4

Filebeat Download Link :  https://www.elastic.co/downloads/beats/filebeat

Download Filebeat from the link above for your operating system and copy it to the directory where you want to install it.

Installation on Linux : Go to the directory where the tar file was copied and use the command below to install it.


tar -zxvf filebeat--linux-x86.tar.gz

Installation on Windows: Go to the directory where the zip file was copied and unzip the file.


unzip file filebeat--window-xxx.zip

Before testing and running the Filebeat installation, make the configuration changes below in the filebeat.yml file for prospectors, output, logging etc. The prospector changes are required. The rest are optional and depend on your application requirements.

Required Change:

 Optional Change : Based on your Application Requirement

 Run/Start Filebeat On Linux:


./filebeat -e -c filebeat.yml -d "publish"

To run Filebeat in the background, add “screen -d -m” as given below:


screen -d -m ./filebeat -e -c filebeat.yml -d "publish"

To log Filebeat output to a log file, remove the -e option from the command as given below, and follow the link Filebeat Configuration Changes for Logging for more info.


./filebeat  -c filebeat.yml -d "publish"

screen -d -m ./filebeat  -c filebeat.yml -d "publish"

Filebeat 5 added the ability to pass command line arguments when starting Filebeat. This is really helpful because server-specific information can be passed on the command line, so the filebeat.yml configuration file needs no server-specific changes. If your servers are scaled in the future and the output port or machine IP for Elasticsearch, Kafka or Logstash changes, the configuration team only needs to update the command line arguments, with no change to the configuration file.

Run/Start Filebeat with command line Arguments in Foreground:


./filebeat -c filebeat.yml -d publish -E server= -E file=app1.log -E tz=CDT -E kafkaHost=IP:PORT

Run In Background :


screen -d -m ./filebeat -c filebeat.yml -d publish -E server= -E file=app1.log -E tz=CDT -E kafkaHost=IP:PORT

Here each -E option passes a value from the command line, which is set at the corresponding position in the filebeat.yml configuration file. Follow the link Filebeat Commandline Arguments setting in configuration file.
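The idea of filling command line values into the configuration can be pictured with a short Python sketch. It assumes that the configuration refers to the values through ${name} placeholders; the template text, the function name parse_overrides and the address 10.0.0.5:9092 are made up for this example, and the code only imitates the substitution rather than reproducing how Filebeat resolves its settings.

```python
from string import Template

# Hypothetical fragment of a configuration file with ${name} placeholders
CONFIG_TEMPLATE = Template(
    "fields:\n"
    "  file: ${file}\n"
    "  tz: ${tz}\n"
    "output.kafka:\n"
    '  hosts: ["${kafkaHost}"]\n'
)


def parse_overrides(args):
    """Turn ['-E', 'key=value', ...] into {'key': 'value', ...}."""
    overrides = {}
    it = iter(args)
    for arg in it:
        if arg == "-E":
            # split only on the first '=' so values may contain '='
            key, _, value = next(it).partition("=")
            overrides[key] = value
    return overrides


args = ["-E", "file=app1.log", "-E", "tz=CDT", "-E", "kafkaHost=10.0.0.5:9092"]
rendered = CONFIG_TEMPLATE.substitute(parse_overrides(args))
print(rendered)
```

Keeping the server-specific values out of the file this way means the same configuration file can be copied unchanged to every server.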

Integration

Complete Integration Example Filebeat, Kafka, Logstash, Elasticsearch and Kibana

Read More

To read more on Filebeat topics, sample configuration files and integration with other systems with examples, follow the links Filebeat Tutorial and Filebeat Issues. To know more about YAML/YML, follow the link YAML Tutorials.

Leave your feedback to help improve this topic and make it more helpful for others.