
Elasticsearch Ingest Node Vs Logstash Vs Filebeat

Elasticsearch Ingest Node

An ingest node is used to pre-process documents before the actual indexing happens. The ingest node intercepts bulk and index requests, applies its configured transformations, and then passes the documents back to the index or bulk APIs.
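As a minimal sketch (the pipeline name, index and fields below are illustrative, not from an actual setup), a pipeline is defined once and then referenced at index time:

PUT _ingest/pipeline/defect-pipeline
{
  "description": "Rename and clean up fields before indexing",
  "processors": [
    { "rename": { "field": "app", "target_field": "application" } },
    { "remove": { "field": "temp_field" } }
  ]
}

PUT defect-data/defect/1?pipeline=defect-pipeline
{ "app": "TEST-APP", "temp_field": "scratch value", "summary": "Customer call 5 time" }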

Logstash

Logstash is a server-side data processing pipeline that ingests data from multiple sources simultaneously, transforms it, and then sends it to different output destinations such as Elasticsearch, Kafka queues, databases, etc.
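As a rough sketch (host name, port and index pattern are placeholders), a typical Logstash pipeline that receives events from Filebeat, parses them and writes them to Elasticsearch looks like this:

input {
  beats { port => 5044 }
}
filter {
  grok { match => { "message" => "%{COMBINEDAPACHELOG}" } }
}
output {
  elasticsearch {
    hosts => ["elasticsearch-server:9200"]
    index => "weblogs-%{+YYYY.MM.dd}"
  }
}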

Filebeat

Filebeat is a lightweight log shipper that reads lines from thousands of log files and forwards them to a centralized system such as Kafka topics (for further processing in Logstash), directly to Logstash, or directly to Elasticsearch.
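A minimal filebeat.yml sketch in the 5.x syntax used later in this post (paths and host are placeholders):

filebeat.prospectors:
- input_type: log
  paths:
    - /opt/app/app1/logs/*.log
output.logstash:
  hosts: ["logstashserver:5044"]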

There is overlap in functionality between Elasticsearch Ingest Node, Logstash and Filebeat. Each has its own strengths and weaknesses depending on the architecture and area of use. You can also integrate Filebeat, Logstash and the Elasticsearch ingest node with minor configuration to optimize performance and data analysis; a small sketch of such an integration follows.
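For example, the Logstash elasticsearch output plugin can hand final enrichment to an Elasticsearch ingest pipeline through its pipeline option (the pipeline name here is illustrative):

output {
  elasticsearch {
    hosts    => ["elasticsearch-server:9200"]
    index    => "weblogs-%{+YYYY.MM.dd}"
    pipeline => "defect-pipeline"
  }
}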

Below are some key points comparing Elasticsearch Ingest Node, Logstash and Filebeat.

Elasticsearch Ingest Node Vs Logstash Vs Filebeat

Data In and Out
  • Elasticsearch Ingest Node: Because the ingest node runs as a pipeline within the indexing flow in Elasticsearch, data has to be pushed to it through bulk or index requests, and the configured pipeline processors process documents just before they are written to Elasticsearch.
  • Logstash: Supports a wide variety of input and output plugins. It can act as a middle server, accepting data pushed from clients over TCP, UDP and HTTP, as well as from Filebeat, message queues and databases. It parses and processes data for a variety of outputs, e.g. Elasticsearch, message queues like Kafka and RabbitMQ, or long-term storage such as S3 or HDFS.
  • Filebeat: Built specifically to ship log file data to Kafka, Logstash or Elasticsearch.

Queuing
  • Elasticsearch Ingest Node: Has no built-in queuing mechanism in its pipeline processing. If the data nodes are not able to accept data, the ingest node stops accepting data as well.
  • Logstash: Provides a persistent queuing feature by storing the queue on disk.
  • Filebeat: Provides a queuing mechanism without data loss.

Back-pressure
  • Elasticsearch Ingest Node: Clients pushing data to the ingest node need to handle back-pressure themselves by queuing data in case Elasticsearch is not reachable or is unable to accept data for an extended period; otherwise there will be data loss.
  • Logstash: Provides at-least-once delivery guarantees and buffers data locally through ingestion spikes.
  • Filebeat: Designed so that not a single log line is lost if output systems like Kafka, Logstash or Elasticsearch are unavailable.

Data Processing
  • Elasticsearch Ingest Node: Comes with around 20 different processors, covering the functionality of the most commonly used Logstash plugins. It has some limitations: a pipeline can only work in the context of a single event, processors are generally not able to call out to other systems or read data from disk, and it does not have filters as Beats and Logstash do.
  • Logstash: Has a larger selection of plugins to choose from, including plugins to add or transform content based on lookups in configuration files, Elasticsearch, Beats or relational databases. Logstash supports filtering out and dropping events based on configurable criteria.
  • Filebeat: Beats support filtering out and dropping events based on configurable criteria.

Configuration
  • Elasticsearch Ingest Node: Each document can only be processed by a single pipeline when passing through the ingest node.
  • Logstash: Supports defining multiple logically separate pipelines with conditional control flow to handle complex and multiple data formats. It is also easier to measure and optimize pipeline performance, with monitoring support and an excellent pipeline viewer UI that helps resolve potential issues quickly.
  • Filebeat: Needs only minor configuration for reading, shipping and filtering data, but is limited when it comes to parsing.

Specialization
  • Elasticsearch Ingest Node: The ingest pipeline processes data just before it is indexed into Elasticsearch.
  • Logstash: A middle server that parses, processes and filters data from multiple input plugins and sends the processed data to output plugins.
  • Filebeat: Specifically reads and ships logs from different servers to a centralized location in Elasticsearch or Kafka, with parsing handled by Logstash if required.

Integration
  • Elasticsearch Ingest Node: The ingest node can accept data from Filebeat, Logstash, etc.
  • Logstash: Supports sending data to an ingest pipeline.
  • Filebeat: Can send data to Logstash, Elasticsearch Ingest Node or Kafka.

Performance
  • Please follow this link for a performance comparison of each in different cases: Elasticsearch Ingest Node, Logstash and Filebeat Performance comparison.

Know More

To know more about Elasticsearch Ingest Node, Logstash or Filebeat, follow the links below:


Logstash JDBC Input configuration tutorial with sql_last_value and tracking_column as numeric or timestamp

The Logstash JDBC Input plug-in works like an adapter that sends your database records to Elasticsearch, where they can be used for full-text search, queries and analysis, and shown as charts and dashboards in Kibana.

In the example below I will explain how to create a Logstash configuration file that uses the JDBC Input plug-in for an Oracle database and outputs to Elasticsearch.

Logstash JDBC Input configuration for Elasticsearch Output

Pre-requisite:

  • Logstash 5.xx installed
  • Elasticsearch 5.xx installed
  • Java 7/8 installed

Sample Data:

The sample data below is from the defect_detail table, where defect_id is a numeric value that increases continuously in ascending order.

defect_id  owned_by  severity    status            summary                      application  created_by  creation_date    modified_by  modified_date   assigned_to
530812     Ramesh    Severity 3  Cancelled         Customer call 5 time         TEST-APP     Saurabh     7/3/2017 15:44   Gaurav       8/19/2017 6:22  Development
530828     Neha      Severity 1  Cancelled         Dealer Code Buyer on behalf  TEST-APP-5   Rajan       7/3/2017 16:20   Nilam        8/17/2017 9:29  Development
540829     Ramesh    Severity 1  Retest Completed  Client Not want Bulk call    TEST-APP-4   Rajiv       7/24/2017 11:29  Raghav       8/5/2017 20:00  IST

Configuration File :

The configuration file below is set up to read data from an Oracle database; it executes the query every 15 minutes and reads only records created after the last run value of defect_id. We should always use ORDER BY on the column used for the last run value, here the numeric column defect_id.

If you are using any other database, such as MySQL, SQL Server or DB2, change jdbc_driver_library and jdbc_connection_string according to that database. Every database has its own query format, so update the query accordingly.
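For example, for MySQL the driver-related settings might look like this (the connector jar version and database name are illustrative):

jdbc_driver_library => "../jar/mysql-connector-java-5.1.42.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://hostname:3306/defectdb"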

Copy the content below and create the file in the bin directory as /bin/logstash-jdbc-defect.conf.

input
{
jdbc {
#Path of the downloaded JDBC driver jar added to the class path
jdbc_driver_library => "../jar/ojdbc6.jar"
# ORACLE driver class
jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
# ORACLE database JDBC connection string: jdbc:oracle:thin:@hostname:PORT/SERVICE
jdbc_connection_string => "jdbc:oracle:thin:@hostname:1521/service"
#The user and password to connect to the database
jdbc_user => "username"
jdbc_password => "password"
#Use when the password needs to be read from a file
#jdbc_password_filepath => "/opt/app/password-path-location"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
#Cron-style schedule for how frequently the query is executed against the database
schedule => "*/15 * * * *"
#Use below if the query is big and you want to store it in a separate file
#statement_filepath => "../query/remedy-tickets-details.sql"
#Inline query; only records after the last run are selected by comparing against sql_last_value, which can be numeric or a timestamp
statement => "select defect_id,owned_by,severity,status,summary,application,created_by,creation_date,modified_by,modified_date,assigned_to from defect_detail where defect_id > :sql_last_value order by defect_id"
#Below is the configuration for using the last run value
clean_run => true
use_column_value => true
tracking_column => "defect_id"
#Logstash by default considers sql_last_value as numeric; if it is a timestamp, configure that explicitly
#tracking_column_type => "timestamp"
record_last_run => true
#This file keeps a record of sql_last_value so that the next query run can use the last run value
last_run_metadata_path => "logstash_jdbc_last_run_t_data.txt"
#Define the type of data read from the database
type => "t-data"
#Configure the timezone according to the database location
#jdbc_default_timezone => "UTC"

}
}
filter
{
#To map your creation_date column to the elasticsearch @timestamp, use the date filter below
mutate
{
convert => [ "creation_date", "string" ]
}
#The date pattern tells the date filter that creation_date is in the format "MM/dd/yyyy HH:mm"
#and in the America/New_York timezone, so the value stored in elasticsearch (UTC) is adjusted accordingly

date {
match => ["creation_date","MM/dd/yyyy HH:mm"]
timezone => "America/New_York"
}
}
output
{
#output to elasticsearch
elasticsearch {
index => "defect-data-%{+YYYY.MM}"
hosts => ["elasticsearch-server:9200"]
document_type => "t-type"
#Use document_id if you want to prevent duplicate records in elasticsearch
document_id => "%{defect_id}"
}
#Output to console
stdout { codec => rubydebug }
}

I have tried to give descriptive information in the comments for each property in the configuration file. If you need more depth or additional information, just drop a comment or send an email and we will discuss it in detail.

Date Filter: This filter maps CREATION_DATE to the @timestamp value of each indexed document, and tells Logstash that CREATION_DATE has the pattern "MM/dd/yyyy HH:mm" so the conversion to a timestamp follows the same format.

Execution :

 [logstash-installation-dir]/bin/logstash -f logstash-jdbc-defect.conf
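To start Logstash in the background (as mentioned in the summary below), one option is nohup; the output file name here is just an assumption:

nohup [logstash-installation-dir]/bin/logstash -f logstash-jdbc-defect.conf > logstash-jdbc.out 2>&1 &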

To learn about validating the configuration and starting Logstash with other options, follow the link Logstash Installation, Configuration and Start.

Logstash Console Output

Notice that, because of the date filter, the index @timestamp value is generated from CREATION_DATE, and the Elasticsearch output index name defect-data-%{+YYYY.MM} creates an index per month based on @timestamp (defect-data-2017.07 for the sample data). If the data in your database changes and defect_id increases, you will see the new defects appear on the console every 15 minutes, as set up in the configuration file.

Result :

select defect_id,owned_by,severity,status,summary,application,created_by,creation_date,modified_by,modified_date,assigned_to from defect_detail where defect_id > :sql_last_value order by defect_id
#
{
               "severity" => "Severity 3",
                "summary" => "Customer call 5 time but no response",
               "owned_by" => "Ramesh",
          "creation_date" => "7/3/2017 15:44",
          "modified_date" => "8/19/2017 6:22",
                   "type" => "t-data",
             "created_by" => "Saurabh",
             "@timestamp" => 2017-07-03T19:44:00.000Z,
            "modified_by" => "Gaurav",
               "@version" => "1",
              "defect_id" => 530812,
            "application" => "TEST-APP",
                 "status" => "Cancelled",
            "assigned_to" => "Development"
}
{
               "severity" => "Severity 1",
                "summary" => "Dealer Code Buyer on behalf",
               "owned_by" => "Neha",
          "creation_date" => "7/3/2017 16:20",
          "modified_date" => "8/17/2017 9:29",
                   "type" => "t-data",
             "created_by" => "Rajan",
             "@timestamp" => 2017-07-03T20:20:00.000Z,
            "modified_by" => "Nilam",
               "@version" => "1",
              "defect_id" => 530828,
            "application" => "TEST-APP5",
                 "status" => "Cancelled",
            "assigned_to" => "Development"
}
{
               "severity" => "Severity 1",
                "summary" => "Client Not want  Bulk call",
               "owned_by" => "Ramesh",
          "creation_date" => "7/24/2017 11:29",
          "modified_date" => "8/5/2017 20:00",
                   "type" => "t-data",
             "created_by" => "Rajiv",
             "@timestamp" => 2017-07-24T15:29:00.000Z,
            "modified_by" => "Raghav",
               "@version" => "1",
              "defect_id" => 540829,
            "application" => "TEST-APP4",
                 "status" => "Retest Complete",
            "assigned_to" => "IST - Integrated System Test"
}

Summary

The details above cover the following points:

  • Logstash JDBC Input from an Oracle database.
  • JDBC Input changes for sql_last_value as numeric or timestamp.
  • Reading the password and a multi-line query from separate files.
  • Date filter to derive the index @timestamp value from a field and pattern.
  • Dynamic index name for each month by appending a date format.
  • Preventing duplicate record inserts in Elasticsearch.
  • Starting Logstash in the background with a configuration file.
  • Sending Logstash output to Elasticsearch and the console.

Read More

To read more on Logstash configuration, input plugins, filter plugins, output plugins, Logstash customization and related issues, follow Logstash Tutorial and Logstash Issues.

Hope this blog was helpful for you.

Leave your feedback to enhance this topic further and make it more helpful for others.

Reference  :

 https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html

 



Filebeat “connection reset by peer”

The issue below occurred while Filebeat was trying to publish events to Logstash.

Failed to publish events caused by: write tcp 10.0.2.12:33822->10.2.3.17:5044: write: connection reset by peer
2017-05-06T12:55:33-05:00 INFO Error publishing events (retrying): write tcp 10.0.2.12:33822->10.2.3.17:5044: write: connection reset by peer

This can be caused by several reasons; a quick connectivity check is sketched after the list:

  • Logstash is not running, or the port is in use by another instance of Logstash.
  • The Filebeat and Logstash versions are not compatible.
  • If a firewall is enabled between the different VMs, the same certificate must be used to connect with Logstash.
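As a quick check before digging into version or certificate mismatches (IP and port taken from the error above), verify that the Logstash beats listener is reachable:

# On the Logstash server: confirm something is listening on the beats port (5044 by default)
netstat -an | grep 5044
# From the Filebeat host: confirm the port is reachable over the network
telnet 10.2.3.17 5044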

Integration

Complete Integration Example Filebeat, Kafka, Logstash, Elasticsearch and Kibana

Read More

To read more on Filebeat topics, sample configuration files and integration with other systems with examples, follow Filebeat Tutorial and Filebeat Issues.

Leave your feedback to enhance this topic further and make it more helpful for others.

 

Kafka Server Properties Configuration

Kafka provides server-level properties for configuring the broker, socket server, Zookeeper, buffering, retention, etc.

broker.id: The broker ID, which must be a unique integer for each broker in the Kafka cluster.

broker.id=0

Socket Server Settings :

listeners: The address the socket server listens on. The default value is PLAINTEXT://:9092; if the host name is not configured, it is taken from java.net.InetAddress.getCanonicalHostName().

Format: security_protocol://host_name:port

listeners=PLAINTEXT://:9092

advertised.listeners: The listener address the broker advertises to producers and consumers. If this is not set, the value of listeners is used.

Example: PLAINTEXT://your.host.name:9092

advertised.listeners=PLAINTEXT://:9092

num.network.threads: The number of threads handling network requests.

num.network.threads=3

num.io.threads: The number of threads handling disk I/O.

num.io.threads=8

socket.send.buffer.bytes: The send buffer size used by the socket server.

socket.send.buffer.bytes=102400

socket.receive.buffer.bytes: The receive buffer size used by the socket server.

socket.receive.buffer.bytes=102400

socket.request.max.bytes: The maximum size of a request that the socket server will accept.

socket.request.max.bytes=104857600

Log Basics

log.dirs: a comma separated list of directories under which to store log files.

log.dirs=/tmp/kafka-logs

num.partitions: The default number of log partitions per topic. More partitions allow greater parallelism for consumption, but this also results in more files across the brokers.

num.partitions=1

num.recovery.threads.per.data.dir: The number of threads per data directory used for log recovery at startup and flushing at shutdown. Increasing this value is recommended for installations with data directories located in a RAID array.

num.recovery.threads.per.data.dir=1

Log Flush Policy

log.flush.interval.messages: Messages are immediately written to the file system, but by default we only fsync() to sync the OS cache lazily. The following configurations control the flush of data to disk. There are a few important trade-offs here:

  1. Durability: Unflushed data may be lost if you are not using replication.
  2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
  3.  Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.

The settings below allow one to configure the flush policy to flush data after a period of time or every N messages (or both). This can be done globally and overridden on a per-topic basis.

The number of messages to accept before forcing a flush of data to disk.

log.flush.interval.messages=10000

The maximum amount of time a message can sit in a log before we force a flush.

log.flush.interval.ms=1000
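The per-topic override mentioned above can be applied with the kafka-configs tool; a rough sketch (topic name is illustrative and exact flags may vary by Kafka version):

bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --entity-type topics --entity-name app-logs \
  --add-config flush.messages=10000,flush.ms=1000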

Log Retention Policy

The following configurations control the disposal of log segments. The policy can be set to delete segments after a period of time, or after a given size has accumulated. A segment will be deleted whenever either of these criteria are met. Deletion always happens from the end of the log.

log.retention.hours: The minimum age of a log file to be eligible for deletion.

log.retention.hours=168

log.retention.bytes: A size-based retention policy for logs. Segments are pruned from the log as long as the remaining segments don't drop below log.retention.bytes.

log.retention.bytes=1073741824

log.segment.bytes: The maximum size of a log segment file. When this size is reached a new log segment will be created.

log.segment.bytes=1073741824

log.retention.check.interval.ms: The interval at which log segments are checked to see if they can be deleted according to the retention policies

log.retention.check.interval.ms=300000

Zookeeper

zookeeper.connect: The Zookeeper connection string is a comma-separated list of host:port pairs, each corresponding to a Zookeeper server, e.g. “127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002”. We can also append an optional chroot string to the URLs to specify the root directory for all Kafka znodes.

zookeeper.connect=localhost:2181
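For example, with an optional chroot path appended (the /kafka root is illustrative):

zookeeper.connect=127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/kafka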

zookeeper.connection.timeout.ms: Timeout in ms for connecting to zookeeper.

zookeeper.connection.timeout.ms=6000
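Putting the settings above together, a minimal server.properties sketch using the values discussed in this post looks like this:

broker.id=0
listeners=PLAINTEXT://:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000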

Read More on Kafka

Integration

Integrate Filebeat, Kafka, Logstash, Elasticsearch and Kibana

Sample filebeat.yml file for Prospectors ,Logstash Output and Logging Configuration

You can copy this file as filebeat.yml and run it after making the changes below for your environment's directory structure, following the steps in Filebeat Download, Installation and Start/Run.

  • Change the prospectors section to point to your log file directory and file names.
  • Configure the multiline pattern to match your log format; the generic pattern set here should work with most formats.
  • Change the Logstash output section for host, port and other settings if required.
  • Change the logging directory to match your machine's directory structure.

Sample filebeat.yml file

#=============Filebeat prospectors ===============

filebeat.prospectors:

# Here we can define multiple prospectors and shipping methods and rules as per
# requirement. To read logs from multiple files in the same directory pattern, a
# glob/regular pattern can also be used in paths.

# Filebeat supports only two input_type values: log and stdin

##############input type logs configuration#####################

- input_type: log

# Paths of the files to read logs from; a glob pattern can be used to read from
# multiple files
paths:
- /opt/app/app1/logs/app1-debug*.log*
# Set fields_under_root to true if you want custom fields to appear as top-level
# fields in the Filebeat JSON output instead of under a "fields" key.
fields_under_root: true

### Multiline configuration for handling stack traces, objects, XML, etc. If multiline
### is enabled with the configuration below, such lines are shipped as a single
### multiline event.

# The regexp pattern that has to be matched. The example pattern matches all lines
# starting with a [DEBUG, ALERT, TRACE, WARNING ... log level and can be customized
# according to your log line format.
#multiline.pattern: '^\[([Aa]lert|ALERT|[Tt]race|TRACE|[Dd]ebug|DEBUG|[Nn]otice|NOTICE|[Ii]nfo|INFO|[Ww]arn?(?:ing)?|WARN?(?:ING)?|[Ee]rr?(?:or)?|ERR?(?:OR)?|[Cc]rit?(?:ical)?|CRIT?(?:ICAL)?|[Ff]atal|FATAL|[Ss]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?)'

# Default is false. Defines whether the pattern match should be negated or not.
#multiline.negate: true

# multiline.match defines where lines that do not match the pattern above should be
# appended. Possible values are "after" or "before".

#multiline.match: after

# Maximum number of lines that can be combined into one multiline event; lines beyond
# this number are ignored.
#multiline.max_lines: 50
#=========Logstash Output Configuration=======================
output.logstash:
# The enabled flag turns this output module on or off; more on this in the Filebeat
# modules section.
#enabled: true

# List all your Logstash server hosts and ports used to publish events. The default
# Logstash port is 5044; if the Logstash listener starts on a different port, use the
# same port here.
#hosts: ["logstashserver:5044"]

# Number of workers to run per configured Logstash host.
#worker: 1

# Filebeat provides a gzip compression level from 1 to 9. As the compression level
# increases, processing speed decreases but less network bandwidth is used. By
# default compression is disabled (value 0).
#compression_level: 3

# Default value is false. If set to true, Filebeat checks the status of hosts and, if one
# is unresponsive, sends to another available host. If false, Filebeat selects a random
# host and sends events to it.
#loadbalance: true

# Default value is 0, which disables pipelining. The configured value decides how many
# batches are sent to Logstash asynchronously while waiting for a response; with 0 the
# output is blocking.
#pipelining: 0

# Filebeat uses the SOCKS5 protocol to communicate with Logstash servers. If a proxy
# is configured for this protocol on the server side, it can be handled with the
# settings below.

# SOCKS5 proxy URL
#proxy_url: socks5://userid:pwd@socks5-server:2233

# Default value is false, meaning Logstash host names are resolved on the proxy
# server. If set to true, host names are resolved locally when using the proxy.
#proxy_use_local_resolver: false

# Configure the SSL settings below if SSL is required for the Logstash broker.
#ssl.enabled: true

# Optional SSL configuration options. SSL is off by default.
# List of root certificates for HTTPS server verifications

# SSL configuration is optional and off by default. It is required for server
# verification with an HTTPS root certificate.
#ssl.certificate_authorities: ["/app/pki/root/ca.pem"]

# Default value is full. The SSL verification mode is required if SSL is configured.
# The value 'none' can be used for testing purposes, but in this mode any certificate
# is accepted.
#ssl.verification_mode: full

# List of supported/valid TLS versions. By default all TLS versions from 1.0 up to
# 1.2 are enabled; specific versions can also be listed in the array below.
#ssl.supported_protocols: [TLSv1.0, TLSv1.1, TLSv1.2]

# Define path for certificate for SSL
#ssl.certificate: "/etc/pki/client/cert.pem"

# Define path for Client Certificate Key
#ssl.key: "/etc/pki/client/cert.key"

# If data is shipped in encrypted form, add the passphrase needed to decrypt the
# certificate key; otherwise this is optional.
#ssl.key_passphrase: ''

# Configure encryption cipher suites to be used for SSL connections
#ssl.cipher_suites: []

# Configure encryption curve types for ECDHE based cipher suites
#ssl.curve_types: []
#====================Logging ==============================

# Default log level is info. Setting a level also records everything above it in the
# hierarchy. Available log levels are: critical, error, warning, info, debug

logging.level: debug
# Possible values for selectors are "beat", "publish" and "service"; to enable all of
# them, use "*". Selectors can also be set on the command line when starting Filebeat.
logging.selectors: ["*"]

# The default value is false. If set to true, output is also sent to syslog.
logging.to_syslog: false
# The default is true. All non-zero metrics readings are output on shutdown.
logging.metrics.enabled: true

# Period at which metrics (such as log read counts) are reported; a complete report is
# also written when Filebeat shuts down.
logging.metrics.period: 30s
# Set this flag to true to enable logging to files; if not set, file logging is disabled.
logging.to_files: true
logging.files:
# Path of the directory where log files are written; if not set, the default is the
# home directory.
path: /tmp

# Name of the file the logs are written to.
name: filebeat-app.log
# The log file rotates when it reaches the maximum size and a new file is created.
# Default value is 10MB.
rotateeverybytes: 10485760 # = 10MB

# Keep at most this many recent rotated log files in the directory; older files are
# removed.
keepfiles: 7
# Enable logging at this level and above. Available log levels are: critical, error,
# warning, info, debug
level: debug

Sample filebeat.yml File

Integration

Complete Integration Example Filebeat, Kafka, Logstash, Elasticsearch and Kibana

Read More

To read more on Filebeat topics, sample configuration files and integration with other systems with examples, follow Filebeat Tutorial and Filebeat Issues. To know more about YAML, follow YAML Tutorials.

Leave your feedback to enhance this topic further and make it more helpful for others.