
Log4j2 Java Logging Example Tutorial – XML Configuration, Severity Levels, Formatting and Appenders


Why Logging?

Logging is the recording of an application's activity. It helps in analysing the runtime behaviour of the application, especially when it encounters unexpected scenarios or errors, and in tracing the steps executed for any request. The more thoroughly an application logs, the easier it is to analyse issues and bugs in the code.

Nowadays more companies are moving to the cloud and focusing on log monitoring and log analysis. There are tools for centralized log management such as Logstash, Loggly, Graylog etc.

Follow this link to learn how to centralize logging with Logstash when logs are scattered across multiple servers.

There are many Java logging frameworks and tools such as log4j, log4j2, slf4j, tinylog, logback etc. Here we mainly focus on Apache Log4j2: its severity levels, the ways to write its configuration file, and Java logging with it.

See also: Log4j2 New Features, Compared with Log4j and Other Logging Frameworks.

How to do Logging?

Java provides a standard Logging API that works as a wrapper over different logging frameworks. Compatible frameworks can be loaded into the JVM and accessed via the API. There is also a default logging framework implementation provided by the Sun JVM which is accessed through the API. Many developers confuse this implementation with the Java Logging API.

Logging is broken into three major parts:

  • Logger: The Logger is responsible for capturing the message to be logged, along with certain metadata, and passing it to the logging framework. A message can be an object, debug text or an exception, with an optional severity level.
  • Formatter: After receiving the message, the formatter converts it into the output format.
  • Appender: The formatted message goes to an appender for disposition. Appenders include console display, a database, a log file, email etc.

Severity Level :

A logging framework always maintains the currently configured logging level for each logger. That configured severity level can be set to be more or less restrictive.

For example: each log message is logged at a certain level. Suppose the logging level is set to WARNING; then all messages of that level or higher are logged, that is WARNING, ERROR and FATAL.

Below is the list of all severity levels from top (most severe) to bottom. If a lower severity level is configured, all severity levels above it are enabled by default.

  1. FATAL: Severe errors that cause premature termination. Expect these to be immediately visible on a status console.
  2. ERROR: Other runtime errors or unexpected conditions. Expect these to be immediately visible on a status console.
  3. WARNING: Messages about conditions that can cause issues in the future.
  4. INFO: Interesting runtime events (startup/shutdown). Expect these to be immediately visible on a console, so be conservative and keep them to a minimum.
  5. DEBUG: Detailed information on the flow through the system. Expect these to be written to logs only.
  6. TRACE: Even more detailed information. Expect these to be written to logs only.

Why Severity Level ?

The correct severity level is required while logging objects, messages or errors so that issues can be tracked and debugged easily, and so that the behaviour and failure cases of the application can be analysed when logging is centralized.

Formatters or renderers

A Formatter is an object that takes a log line, object or exception from a logger and converts it into a formatted string representation. Below is the technique to define your own log format.

TTCC (Time Thread Category Component) is the message format pattern representation used by log4j2.

For example: %r [%t] %-5p %c %x - %m%n will print a log line as below:

567 [main] INFO org.apache.log4j.examples.FacingIssuesOnIT- Exiting main method.

Where

  • %r Used to output the number of milliseconds elapsed from the construction of the layout until the creation of the logging event.
  • %t Used to output the name of the thread that generated the logging event.
  • %p Used to output the priority of the logging event.
  • %c Used to output the category of the logging event.
  • %x Used to output the NDC (nested diagnostic context) associated with the thread that generated the logging event.
  • %X{key} Used to output the MDC (mapped diagnostic context) associated with the thread that generated the logging event for specified key.
  • %m Used to output the application supplied message associated with the logging event.
  • %n Used to output the platform-specific newline character or characters.

Appenders or handlers

An appender takes messages at or above a specified minimum severity level and posts them to the appropriate message disposition. Log4j2 supports the appender dispositions below.

  • ConsoleAppender
  • FileAppender
  • JDBCAppender
  • AsyncAppender
  • CassandraAppender
  • FailoverAppender
  • FlumeAppender
  • JMS Appender
  • JPAAppender
  • HttpAppender
  • KafkaAppender
  • MemoryMappedFileAppender
  • NoSQLAppender
  • OutputStreamAppender
  • RandomAccessFileAppender
  • RewriteAppender
  • RollingFileAppender
  • RollingRandomAccessFileAppender
  • RoutingAppender
  • SMTPAppender
  • ScriptAppenderSelector
  • SocketAppender
  • SyslogAppender
  • ZeroMQ/JeroMQ Appender

Log4j2 Configuration Support:

Log4j2 configuration can be accomplished in one of four ways:

  • Through a configuration file written in XML, JSON, YAML, or properties format.
  • Programmatically, by creating a ConfigurationFactory and Configuration implementation.
  • Programmatically, by calling the APIs exposed in the Configuration interface to add components to the default configuration.
  • Programmatically, by calling methods on the internal Logger class.

Log4j2 Automatic Configuration:

Log4j2 has the ability to configure itself automatically during initialization. When Log4j starts it locates all the ConfigurationFactory plugins and arranges them in weighted order from highest to lowest. As noted above, Log4j contains four ConfigurationFactory implementations: one for JSON, one for YAML, one for properties, and one for XML.

  1. Log4j will inspect the “log4j.configurationFile” system property and, if set, will attempt to load the configuration using the ConfigurationFactory that matches the file extension.
  2. If no system property is set, the properties ConfigurationFactory will look for log4j2-test.properties in the classpath.
  3. If no such file is found, the YAML ConfigurationFactory will look for log4j2-test.yaml or log4j2-test.yml in the classpath.
  4. If no such file is found, the JSON ConfigurationFactory will look for log4j2-test.json or log4j2-test.jsn in the classpath.
  5. If no such file is found, the XML ConfigurationFactory will look for log4j2-test.xml in the classpath.
  6. If a test file cannot be located, the properties ConfigurationFactory will look for log4j2.properties on the classpath.
  7. If a properties file cannot be located, the YAML ConfigurationFactory will look for log4j2.yaml or log4j2.yml on the classpath.
  8. If a YAML file cannot be located, the JSON ConfigurationFactory will look for log4j2.json or log4j2.jsn on the classpath.
  9. If a JSON file cannot be located, the XML ConfigurationFactory will try to locate log4j2.xml on the classpath.
  10. If no configuration file can be located, the DefaultConfiguration will be used. This causes logging output to go to the console.

Here we mainly focus on log4j2 XML configuration for ConsoleAppender, FileAppender and RollingFileAppender, and will see how to apply filters for loggers at the default, package and root levels in different scenarios. We will also see how the same Java program's logging behaves under different configurations.

Steps to configure log4j2 with any Java application:

  • Create any console-based Java application, Maven Java console application or Maven web application.
  • Add the dependencies/jars below to your application.
  • Add the log4j2.xml file below to your application root folder, or for Maven to the resources folder, as shown below.
  • Add the Java program below to any package of your application.

Configure as below:

[Screenshot: log4jConfiguration]

Dependencies (Maven pom.xml; 2.6.1 is the release this post was written against, so pin a current log4j-core release in a new project):

<!-- basic Log4j2 dependency -->
<dependency>
	<groupId>org.apache.logging.log4j</groupId>
	<artifactId>log4j-api</artifactId>
	<version>2.6.1</version>
</dependency>
<dependency>
	<groupId>org.apache.logging.log4j</groupId>
	<artifactId>log4j-core</artifactId>
	<version>2.6.1</version>
</dependency>

<!-- Asynchronous logging for multithreaded env -->
<dependency>
	<groupId>com.lmax</groupId>
	<artifactId>disruptor</artifactId>
	<version>3.3.4</version>
</dependency>

log4j2.xml configuration (the element tags were stripped when the post was published; the file below is reconstructed from the surviving values and comments, and the property names are assumed):

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
	<Properties>
		<!-- Log File Name and Location (property names assumed; "log-path" is the name used later in the post) -->
		<Property name="log-file">target/FacingIssueOnIT.log</Property>
		<Property name="log-path">C:/logs/</Property>
	</Properties>
	<Appenders>
		<!-- Console Logging (pattern taken from the console output shown below) -->
		<Console name="STDOUT" target="SYSTEM_OUT">
			<PatternLayout pattern="%d{yyyyMMdd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
		</Console>
		<!-- File Logging -->
		<File name="file" fileName="${log-file}">
			<PatternLayout pattern="%d %p %c{1.} [%t] %m%n"/>
		</File>
	</Appenders>
	<Loggers>
		<!-- By default, all log messages of level "trace" or higher are logged. Messages of level "error" or higher are sent to the "file" appender, while messages of level "debug" or higher are sent to the "STDOUT" appender. -->
		<Root level="trace">
			<AppenderRef ref="STDOUT" level="debug"/>
			<AppenderRef ref="file" level="error"/>
		</Root>
	</Loggers>
</Configuration>

Java program:

package com.logging;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class Log4jExample {

    private static final Logger logger = LogManager.getLogger(Log4jExample.class);

    public static void main(String[] args) {
        // One message per severity level; which ones appear depends on the appender levels in log4j2.xml
        logger.fatal("Fatal log message :FacingIssuesOnIT");
        logger.error("Error log message :FacingIssuesOnIT");
        logger.warn("Warn log message :FacingIssuesOnIT");
        logger.info("Info log message :FacingIssuesOnIT");
        logger.debug("Debug log message :FacingIssuesOnIT");
        logger.trace("Trace log message :FacingIssuesOnIT");
    }
}

As shown below, the console and file output differ because of the logging configuration for STDOUT and file. STDOUT is configured for severity level debug, which is why the console prints all log lines of debug and higher severity, everything except Trace. In the same way the file output at /target/FacingIssuesonIT.log contains only the FATAL and ERROR lines, because the file appender is configured for severity level ERROR.

Console Output :

20171220 10:19:12.640 [main] FATAL com.logging.Log4jExample - Fatal log message :FacingIssuesOnIT
20171220 10:19:12.642 [main] ERROR com.logging.Log4jExample - Error log message :FacingIssuesOnIT
20171220 10:19:12.642 [main] WARN  com.logging.Log4jExample - Warn log message :FacingIssuesOnIT
20171220 10:19:12.642 [main] INFO  com.logging.Log4jExample - Info log message :FacingIssuesOnIT
20171220 10:19:12.642 [main] DEBUG com.logging.Log4jExample - Debug log message :FacingIssuesOnIT

File Output:

2017-12-20 10:19:12,640 FATAL c.l.Log4jExample [main] Fatal log message :FacingIssuesOnIT
2017-12-20 10:19:12,642 ERROR c.l.Log4jExample [main] Error log message :FacingIssuesOnIT

RollingFileAppender Configuration

The above was a basic configuration and design for implementing log4j2 logging, kept simple so that it is easy to understand. Now we go into more detail on configuration to understand how to roll and archive logs, and maintain them easily by date and size of the log file, by configuring a RollingFileAppender. We will also see how to apply a logger filter at package level so that you can easily maintain logs for a specific module or functionality.

Now we make some changes in the configuration file as well as in the Java program to test the RollingFileAppender.

log4j2.xml configuration (reconstructed as above; the Delete age value was lost and is assumed to be one hour, as the post states below, and the IfFileName condition is added so that Delete only touches this appender's own archives):

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
	<Properties>
		<!-- Log File Name and Location -->
		<Property name="log-file">target/FacingIssueOnIT.log</Property>
		<Property name="log-path">C:/logs/</Property>
	</Properties>
	<Appenders>
		<!-- Console Logging -->
		<Console name="STDOUT" target="SYSTEM_OUT">
			<PatternLayout pattern="%d{yyyyMMdd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
		</Console>
		<!-- File Logging -->
		<File name="file" fileName="${log-file}">
			<PatternLayout pattern="%d %p %c{1.} [%t] %m%n"/>
		</File>
		<!-- Rolling File -->
		<RollingFile name="RollingFile" fileName="${log-path}/FacingIssueOnIT.log"
				filePattern="${log-path}/$${date:yyyy-MM-dd}/myexample-%d{yyyy-MM-dd}-%i.log.gz">
			<PatternLayout pattern="%d{yyyyMMdd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
			<Policies>
				<TimeBasedTriggeringPolicy interval="1"/>
				<SizeBasedTriggeringPolicy size="100MB"/>
			</Policies>
			<DefaultRolloverStrategy>
				<!-- Archive delete policy: remove this appender's archives older than one hour (age value assumed) -->
				<Delete basePath="${log-path}" maxDepth="2">
					<IfFileName glob="*/myexample-*.log.gz"/>
					<IfLastModified age="PT1H"/>
				</Delete>
			</DefaultRolloverStrategy>
		</RollingFile>
	</Appenders>
	<Loggers>
		<!-- package level logger: classes whose package name starts with com.logging log messages of level "debug" or higher -->
		<Logger name="com.logging" level="debug"/>
		<!-- By default, all log messages of level "trace" or higher are logged. Messages of level "error" or higher are sent to the "file" appender, messages of level "debug" or higher to the "STDOUT" appender, and all levels to the rolling file, as configured for root. -->
		<Root level="trace">
			<AppenderRef ref="STDOUT" level="debug"/>
			<AppenderRef ref="file" level="error"/>
			<AppenderRef ref="RollingFile"/>
		</Root>
	</Loggers>
</Configuration>

In the log4j2.xml configuration above, the additional change is the RollingFile appender. Let me explain it in more detail:

%d{yyyyMMdd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n : This pattern defines how your log lines are formatted in the log file.

fileName="${log-path}/FacingIssueOnIT.log" : Current logs are written to this file.

filePattern="${log-path}/$${date:yyyy-MM-dd}/myexample-%d{yyyy-MM-dd}-%i.log.gz" : As configured, the triggering policy checks every second (interval=1) whether the current file size has reached 100MB (size=100MB); if so, a rolled file is created in the current-date folder, as in the screen below.

Archive Delete Policy: represents how long you want to keep old logs as backup; as of now it is configured for the last one hour. Change it to days as your application needs, and change the path of the deleted archive log files to match your log directory.

Here I have added the RollingFile appender to the root logger without any specified level so that we log all log lines. If you want to filter logs and behave differently for different packages, you can use loggers with different severity levels, as I have done for package com.logging.
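The same configuration built with Log4j2's ConfigurationBuilder API, the programmatic route listed under "Log4j2 Configuration Support" above but not shown there. This is an added example, not code from the post: appender names, patterns and paths are the post's values; the IfFileName glob is an addition that limits the Delete action to this appender's own archives, and the one-hour age is written as the ISO-8601 duration the Log4j Duration parser accepts.

```java
package com.logging;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.appender.ConsoleAppender;
import org.apache.logging.log4j.core.config.Configurator;
import org.apache.logging.log4j.core.config.builder.api.AppenderComponentBuilder;
import org.apache.logging.log4j.core.config.builder.api.ComponentBuilder;
import org.apache.logging.log4j.core.config.builder.api.ConfigurationBuilder;
import org.apache.logging.log4j.core.config.builder.api.ConfigurationBuilderFactory;
import org.apache.logging.log4j.core.config.builder.api.RootLoggerComponentBuilder;
import org.apache.logging.log4j.core.config.builder.impl.BuiltConfiguration;

/** Added example: the post's STDOUT / file / RollingFile setup built in code instead of log4j2.xml. */
public final class Log4jProgrammaticConfig {

    static final String LOG_PATH = "C:/logs";   // the post's ${log-path}; change for your machine
    static final String FILE_PATTERN = "%d %p %c{1.} [%t] %m%n";
    static final String ROLLING_PATTERN = "%d{yyyyMMdd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n";

    static BuiltConfiguration build() {
        ConfigurationBuilder<BuiltConfiguration> b = ConfigurationBuilderFactory.newConfigurationBuilder();
        b.setConfigurationName("FacingIssuesOnIT");
        b.setStatusLevel(Level.WARN);   // Log4j's own internal status output; keep it quiet unless troubleshooting

        // Console appender "STDOUT": the Root logger below references it at level debug
        AppenderComponentBuilder console = b.newAppender("STDOUT", "Console")
                .addAttribute("target", ConsoleAppender.Target.SYSTEM_OUT);
        console.add(b.newLayout("PatternLayout").addAttribute("pattern", ROLLING_PATTERN));
        b.add(console);

        // File appender "file": referenced at level error, so only ERROR and FATAL land in it
        AppenderComponentBuilder file = b.newAppender("file", "File")
                .addAttribute("fileName", "target/FacingIssueOnIT.log");
        file.add(b.newLayout("PatternLayout").addAttribute("pattern", FILE_PATTERN));
        b.add(file);

        // RollingFile: roll when the date unit of filePattern changes (interval=1) or the file reaches 100MB
        ComponentBuilder<?> policies = b.newComponent("Policies")
                .addComponent(b.newComponent("TimeBasedTriggeringPolicy").addAttribute("interval", 1))
                .addComponent(b.newComponent("SizeBasedTriggeringPolicy").addAttribute("size", "100MB"));
        // Archive delete policy. IfFileName is an addition: without it, Delete would match any file
        // under basePath (maxDepth 2) that is older than the age, not just this appender's archives.
        ComponentBuilder<?> strategy = b.newComponent("DefaultRolloverStrategy")
                .addComponent(b.newComponent("Delete")
                        .addAttribute("basePath", LOG_PATH)
                        .addAttribute("maxDepth", 2)
                        .addComponent(b.newComponent("IfFileName").addAttribute("glob", "*/myexample-*.log.gz"))
                        .addComponent(b.newComponent("IfLastModified").addAttribute("age", "PT1H")));
        AppenderComponentBuilder rolling = b.newAppender("RollingFile", "RollingFile")
                .addAttribute("fileName", LOG_PATH + "/FacingIssueOnIT.log")
                .addAttribute("filePattern", LOG_PATH + "/$${date:yyyy-MM-dd}/myexample-%d{yyyy-MM-dd}-%i.log.gz");
        rolling.add(b.newLayout("PatternLayout").addAttribute("pattern", ROLLING_PATTERN));
        rolling.addComponent(policies);
        rolling.addComponent(strategy);
        b.add(rolling);

        // Package-level logger: com.logging logs debug and above and inherits Root's appenders (additivity on)
        b.add(b.newLogger("com.logging", Level.DEBUG));

        // Root: trace and above reach the RollingFile; the other two refs filter at their own level
        RootLoggerComponentBuilder root = b.newRootLogger(Level.TRACE);
        root.add(b.newAppenderRef("STDOUT").addAttribute("level", Level.DEBUG));
        root.add(b.newAppenderRef("file").addAttribute("level", Level.ERROR));
        root.add(b.newAppenderRef("RollingFile"));
        b.add(root);
        return b.build();
    }

    public static void main(String[] args) {
        Configurator.initialize(build());   // must run before the first LogManager.getLogger() call
        Logger logger = LogManager.getLogger(Log4jProgrammaticConfig.class);
        logger.fatal("Fatal log message :FacingIssuesOnIT");
        logger.error("Error log message :FacingIssuesOnIT");
        logger.warn("Warn log message :FacingIssuesOnIT");
        logger.info("Info log message :FacingIssuesOnIT");
        logger.debug("Debug log message :FacingIssuesOnIT");
        if (logger.isTraceEnabled()) {      // false here: com.logging is configured at debug
            logger.trace("Trace log message :FacingIssuesOnIT");
        }
    }
}
```

With this class on the classpath and no log4j2.xml present, the output matches the console and file samples in this post; if a log4j2.xml is also on the classpath, the automatic configuration loads first and this call replaces it.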

Java code:

Here I have added an infinite loop for testing the RollingFileAppender, so that log lines are continuously added to the log file. Additionally, with large applications in mind, I have added checks for which severity level is configured, so that when the condition is not satisfied the logger saves the processing time of logging, formatting and appending. In this way we can improve application performance for logging.

package com.logging;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class Log4jExample {
    private static final Logger logger = LogManager.getLogger(Log4jExample.class);

    public static void main(String[] args) {
        // Infinite loop so that the RollingFile appender keeps rolling; stop the program with Ctrl+C
        do {
            // The isXxxEnabled() checks skip message construction and appender work for disabled levels
            if (logger.isFatalEnabled())
                logger.fatal("Fatal log message :FacingIssuesOnIT");
            if (logger.isErrorEnabled())
                logger.error("Error log message :FacingIssuesOnIT");
            if (logger.isWarnEnabled())
                logger.warn("Warn log message :FacingIssuesOnIT");
            if (logger.isInfoEnabled())
                logger.info("Info log message :FacingIssuesOnIT");
            if (logger.isDebugEnabled())
                logger.debug("Debug log message :FacingIssuesOnIT");
            if (logger.isTraceEnabled())
                logger.trace("Trace log message :FacingIssuesOnIT");
        } while (true);
    }
}

File output: the current log file will have log lines formatted as below.

20171220 10:49:55.226 [main] FATAL com.logging.Log4jExample - Fatal log message :FacingIssuesOnIT
20171220 10:49:55.227 [main] ERROR com.logging.Log4jExample - Error log message :FacingIssuesOnIT
20171220 10:49:55.228 [main] WARN  com.logging.Log4jExample - Warn log message :FacingIssuesOnIT
20171220 10:49:55.228 [main] INFO  com.logging.Log4jExample - Info log message :FacingIssuesOnIT
20171220 10:49:55.228 [main] DEBUG com.logging.Log4jExample - Debug log message :FacingIssuesOnIT

Archive Log Files: rolled and archived files are created as below in the directory C:\logs\2017-12-20

[Screenshot: log4j RollingFile]

Summary

In this tutorial I have covered the importance of logging, ways of centralizing logging, and log4j2 configuration for console, file and rolling file appenders. I have also explained rolling and archive management of logs, and given a brief idea of how to improve your application's logging performance with a minor change.

References :

https://logging.apache.org/log4j

[Java] Factorial of a Number by Java Program


The factorial of a number n, denoted n!, is the product of all positive integers less than or equal to n.

Here n is always a non-negative integer, and the value of 0! is 1, by the convention for an empty product.

Example:

0! = 1
1! = 1
5! = 5 * 4 * 3 * 2 * 1 = 120
12! = 12 * 11 * 10 * 9 * 8 * 7 * 6 * 5 * 4 * 3 * 2 * 1 = 479001600

Below are examples of calculating the factorial of a number. I have considered all the cases: calculation of a big factorial number with a loop, and also through recursion.

Factorial of a number using a Java loop.

import java.util.Scanner;

class Factorial {
    public static void main(String args[]) {
        int n, c, fact = 1;

        System.out.println("Enter an integer to calculate its factorial");
        Scanner in = new Scanner(System.in);

        n = in.nextInt();

        if (n < 0)
            System.out.println("Number should be non-negative.");
        else {
            // int overflows beyond 12!; use the BigInteger version below for larger n
            for (c = 1; c <= n; c++)
                fact = fact * c;

            System.out.println("Factorial of " + n + " is = " + fact);
        }
    }
}

Factorial of a big integer number using a loop

If the result is too big and exceeds the Integer limit, use BigInteger instead of Integer.

import java.math.BigInteger;
import java.util.Scanner;

class Factorial {
    public static void main(String args[]) {
        int n, c;
        BigInteger inc = new BigInteger("1");
        BigInteger fact = new BigInteger("1");

        Scanner input = new Scanner(System.in);

        System.out.println("Input an integer");
        n = input.nextInt();

        // Multiply by 1, 2, ..., n; BigInteger never overflows
        for (c = 1; c <= n; c++) {
            fact = fact.multiply(inc);
            inc = inc.add(BigInteger.ONE);
        }

        System.out.println(n + "! = " + fact);
    }
}

Factorial of an integer number using Java recursion.

import java.util.Scanner;

public class FactorialByRecursion {

    public static void main(String[] args) {
        int n;
        System.out.println("Enter an integer to calculate its factorial");
        Scanner in = new Scanner(System.in);

        n = in.nextInt();

        if (n < 0)
            System.out.println("Number should be non-negative.");
        else {
            System.out.println("Factorial of " + n + " is = " + factorial(n));
        }
    }

    private static int factorial(int num) {
        // Recursion terminating condition: 0! is 1
        if (num == 0)
            return 1;
        else
            return num * factorial(num - 1);
    }

}


How to Configure Filebeat, Kafka, Logstash Input , Elasticsearch Output and Kibana Dashboard


The Filebeat, Kafka, Logstash, Elasticsearch and Kibana integration is used by large organizations where applications are deployed in production on hundreds or thousands of servers scattered across different locations, and the data from these servers needs to be analysed in real time.

This integration mainly helps with log-level analysis, tracking issues and anomalies in the data, alerting on particular events, and accountability measures.

Together these technologies provide a scalable architecture whose components are decoupled from each other and can be enhanced individually.

Why these technologies?

Filebeat:

  • Lightweight agent for shipping logs.
  • Forwards and centralizes files and logs.
  • Robust (does not miss a single beat).

Kafka:

  • Open source distributed stream-processing and message-broker platform.
  • Processes stream data or transaction logs in real time.
  • Fault-tolerant, high-throughput, low-latency platform for dealing with real-time data feeds.

Logstash:

  • Open source, server-side data processing pipeline that accepts data from different sources simultaneously.
  • Parses, formats and transforms data and sends it to different outputs.

Elasticsearch:

  • Open source, distributed and cross-platform.
  • Built on top of Lucene, which provides full-text search and NRT (near real time) search results.
  • Supports RESTful search through the Elasticsearch REST API.

Kibana:

  • Open source.
  • Provides a window to view Elasticsearch data in the form of different charts and dashboards.
  • Provides an easy way to search and operate on data with respect to time intervals.
  • Easily embedded in any web application through embedded dashboards.

How does the data flow work?

In this integration Filebeat is installed on all servers where your application is deployed; it reads the latest log changes on these servers and ships them to the Kafka topic configured for this application.

Logstash subscribes to the log lines from the Kafka topic, parses them, makes the relevant changes (formatting, excluding and including fields) and sends the processed data to Elasticsearch indexes, the central location for data from all servers.

Kibana is linked to the Elasticsearch indexes and supports analysis through searches, charts and dashboards.

[Diagram: FKLEK Integration]

Design Architecture

In the architecture below, my application is deployed on three servers and each server has a current log file named App1.log. Our goal is to read data from these servers in real time and analyse it.

[Diagram: FKLEK Arch Integration]

Steps to Installation, Configuration and Start

First we install Kafka and Elasticsearch, which run on their own; the remaining tools are installed and run in sequence to test the data flow. Initially install everything on the same machine and test with the sample data following the steps below; at the end of this post I explain what changes to make for your own servers.

  • Kafka Installation, Configuration and Start
  • Elasticsearch Installation, Configuration and Start
  • Filebeat Installation, Configuration and Start
  • Logstash Installation, Configuration and Start
  • Kibana Installation, Start and display

Pre-Requisites

These Filebeat, Logstash, Elasticsearch and Kibana versions should be compatible with each other; it is best to use the latest releases from https://www.elastic.co/downloads.

  • Java 8+
  • Linux Server
  • Filebeat 5.XX
  • Kafka 2.11.XX
  • Logstash 5.XX
  • Elasticsearch 5.XX
  • Kibana 5.XX

Note: make sure JDK 8 is installed and the JAVA_HOME environment variable points to the JDK 8 home directory on every machine where you install Elasticsearch, Logstash, Kibana or Kafka.

Windows: My Computer -> right click -> Properties -> Advanced System Settings -> System Variables

[Screenshot: Set JAVA_HOME]

Linux: add the line below to your shell profile in your home directory (or to the system-wide profile with sudo).

export JAVA_HOME=/opt/app/facingissuesonit/jdk1.8.0_66

Sample Data

For testing we will use the sample log lines below, which contain debug output as well as a stack trace; the grok parsing in this example is designed for them. For real-time testing with actual data you can point Filebeat at your own server log files, but then you have to modify the grok pattern in the Logstash configuration accordingly.

2013-02-28 09:57:56,662 WARN  CreateSomethingActivationKey - WhateverException for User 49-123-345678 {{rid,US8cFAp5eZgAABwUItEAAAAI_dev01_443}{realsid,60A9772A136B9912B6FF0C3627A47090.dev1-a}}
2013-02-28 09:57:56,663 INFO  LMLogger - ERR1700 - u:null failures: 0  - Technical error {{rid,US8cFAp5eZgAABwUItEAAAAI_dev01_443}{realsid,60A9772A136B9912B6FF0C3627A47090.dev1-a}}
2013-02-28 09:57:56,668 ERROR SomeCallLogger - ESS10005 Cpc portalservices: Exception caught while writing log messege to MEA Call:  {}
java.sql.SQLSyntaxErrorException: ORA-00942: table or view does not exist

	at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:445)
	at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:396)
2013-02-28 10:04:35,723 INFO  EntryFilter - Fresh on request /portalservices/foobarwhatever {{rid,US8dogp5eZgAABwXPGEAAAAL_dev01_443}{realsid,56BA2AD41D9BB28AFCEEEFF927EE61C2.dev1-a}}

Create an App1.log file on the same machine where Filebeat will be installed and copy the log lines above into it.

Kafka Installation, Configuration and Start

Download the latest version of Kafka from the link below and use the command to untar and install it on a Linux server; on Windows just unzip the downloaded file.

Download Link : https://kafka.apache.org/downloads

tar -zxvf kafka_2.11-0.10.0.0.tgz

For more configuration and start options follow Setup Kafka Cluster for Single Server/Broker.

After downloading and untarring/unzipping, the directory has the files and structure below.

ls -l
drwxr-xr-x  3 facingissuesonit Saurabh   4096 Apr  3 05:18 bin
drwxr-xr-x  2 facingissuesonit Saurabh   4096 May  8 11:05 config
drwxr-xr-x 74 facingissuesonit Saurabh   4096 May 27 20:00 kafka-logs
drwxr-xr-x  2 facingissuesonit Saurabh   4096 Apr  3 05:17 libs
-rw-r--r--  1 facingissuesonit Saurabh  28824 Apr  3 05:17 LICENSE
drwxr-xr-x  2 facingissuesonit Saurabh 487424 May 27 20:00 logs
-rw-r--r--  1 facingissuesonit Saurabh    336 Apr  3 05:18 NOTICE
drwxr-xr-x  2 facingissuesonit Saurabh   4096 Apr  3 05:17 site-docs

For more details about all these files, configuration options and other integration options follow the Kafka Tutorial.

Make the changes below in config/zookeeper.properties and config/server.properties.

config/zookeeper.properties:

clientPort=2181

config/server.properties:

broker.id=0
listeners=PLAINTEXT://:9092
log.dir=/kafka-logs
zookeeper.connect=localhost:2181

The PLAINTEXT listener is unauthenticated and unencrypted and binds on every interface, which is fine for this single-machine test; keep port 9092 firewalled from anything but the Filebeat and Logstash hosts when you move to real servers.

Now Kafka is configured and ready to run. Use the commands below to start ZooKeeper and the Kafka server as background processes.

screen -d -m bin/zookeeper-server-start.sh config/zookeeper.properties
screen -d -m bin/kafka-server-start.sh config/server.properties

To check that Kafka installed successfully, look for the Kafka process on Linux with “ps -ef|grep kafka”, or follow the consumer and producer steps in Setup Kafka Cluster for Single Server/Broker.

Elasticsearch Installation, Configuration and Start

Download the latest version of Elasticsearch from the link below and use the command to untar and install it on a Linux server; on Windows just unzip the downloaded file.

Download Link : https://www.elastic.co/downloads/elasticsearch

tar -zxvf elasticsearch-5.4.0.tar.gz

It will show the files and directory structure below.

drwxr-xr-x  2 facingissuesonit Saurabh   4096 Apr 25 19:20 bin
drwxr-xr-x  3 facingissuesonit Saurabh   4096 May 13 17:27 config
drwxr-xr-x  3 facingissuesonit Saurabh   4096 Apr 24 15:56 data
drwxr-xr-x  2 facingissuesonit Saurabh   4096 Apr 17 10:55 lib
-rw-r--r--  1 facingissuesonit Saurabh  11358 Apr 17 10:50 LICENSE.txt
drwxr-xr-x  2 facingissuesonit Saurabh   4096 May 28 05:00 logs
drwxr-xr-x 12 facingissuesonit Saurabh   4096 Apr 17 10:55 modules
-rw-r--r--  1 facingissuesonit Saurabh 194187 Apr 17 10:55 NOTICE.txt
drwxr-xr-x  2 facingissuesonit Saurabh   4096 Apr 17 10:55 plugins
-rw-r--r--  1 facingissuesonit Saurabh   9540 Apr 17 10:50 README.textile

Before starting Elasticsearch, make some basic changes in config/elasticsearch.yml for the cluster and node names. You can choose them based on your application or organization name. network.host stays commented out here, so Elasticsearch binds to the loopback interface only; if you set it to 0.0.0.0, the REST API on port 9200 becomes reachable from every network interface with no authentication in this 5.x setup, so put it behind a firewall or an access-controlled reverse proxy first.

cluster.name: FACING-ISSUE-IN-IT
node.name: TEST-NODE-1
#network.host: 0.0.0.0
http.port: 9200

Now the Elasticsearch configuration is ready and it is time to start Elasticsearch. Use the command below, from the Elasticsearch directory, to run it in the background.

screen -d -m bin/elasticsearch

To check that Elasticsearch started successfully, open the URL below in a browser to see the cluster status. You will get a result like the one below.

http://localhost:9200/_cluster/health?pretty

or, if network.host is configured:

http://<elasticsearchServerIp>:9200/_cluster/health?pretty

Result :

{
  "cluster_name" : "FACING-ISSUE-IN-IT",
  "status" : "green",
  "timed_out" : false,
  "number_of_nodes" : 1,
  "number_of_data_nodes" : 1,
  "active_primary_shards" : 0,
  "active_shards" : 0,
  "relocating_shards" : 0,
  "initializing_shards" : 0,
  "unassigned_shards" : 0,
  "delayed_unassigned_shards" : 0,
  "number_of_pending_tasks" : 0,
  "number_of_in_flight_fetch" : 0,
  "task_max_waiting_in_queue_millis" : 0,
  "active_shards_percent_as_number" : 100.0
}

Filebeat Installation, Configuration and Start

Download the latest version of Filebeat from the link below and use the command to untar and install it on a Linux server; on Windows just unzip the downloaded file.

Download Link : https://www.elastic.co/downloads/beats/filebeat

tar -zxvf filebeat-<version>.tar.gz

For more configuration and start options follow Filebeat Download, Installation and Start/Run.

After downloading and untarring/unzipping, the directory has the files and structure below.

ls -l
-rwxr-xr-x 1 facingissuesonit Saurabh 14908742 Jan 11 14:11 filebeat
-rw-r--r-- 1 facingissuesonit Saurabh    31964 Jan 11 14:11 filebeat.full.yml
-rw-r--r-- 1 facingissuesonit Saurabh     3040 Jan 11 14:11 filebeat.template-es2x.json
-rw-r--r-- 1 facingissuesonit Saurabh     2397 Jan 11 14:11 filebeat.template.json
-rw-r--r-- 1 facingissuesonit Saurabh     4196 Jan 11 14:11 filebeat.yml
-rw-r--r-- 1 facingissuesonit Saurabh      811 Jan 11 14:10 README.md
drwxr-xr-x 2 facingissuesonit Saurabh     4096 Jan 11 14:11 scripts

For more details about all these files, configuration options and other integration options follow the Filebeat Tutorial.

Now Filebeat is installed; make the changes below in filebeat.full.yml.

  • Inside the prospectors section, change paths to your log file location:

paths:
  - /opt/app/facingissuesonit/App1.log

  • Comment out the default Elasticsearch output properties:

#output.elasticsearch:
#  hosts: ["localhost:9200"]

  • Configure the multiline option as below, so that stack trace lines, which do not start with a date, are treated as part of the preceding log event rather than as events of their own:

multiline.pattern: ^\d
multiline.negate: true
multiline.match: after

To learn more about Filebeat multiline configuration follow Filebeat Multiline Configuration Changes for Object, StackTrace and XML.

  • Inside the Kafka output section, update the hosts and topic properties. If Kafka is on the same machine use localhost, otherwise use the IP of the Kafka machine.

output.kafka:
  hosts: ["localhost:9092"]
  topic: APP-1-TOPIC

For more on logging configuration follow the link Filebeat, Logging Configuration.

Now Filebeat is configured and ready to start with the command below. It reads the configured prospector file App1.log continuously and publishes log line events to Kafka. It also creates the topic APP-1-TOPIC in Kafka if it does not exist.

./filebeat -e -c filebeat.full.yml -d "publish"

On console it will display output as below for sample lines.

2017/05/28 00:24:27.991828 client.go:184: DBG  Publish: {
  "@timestamp": "2017-05-28T00:24:22.991Z",
  "beat": {
    "hostname": "sg02870",
    "name": "sg02870",
    "version": "5.1.2"
  },
  "input_type": "log",
  "message": "2013-02-28 09:57:56,662 WARN  CreateSomethingActivationKey - WhateverException for User 49-123-345678 {{rid,US8cFAp5eZgAABwUItEAAAAI_dev01_443}{realsid,60A9772A136B9912B6FF0C3627A47090.dev1-a}}",
  "offset": 194,
  "source": "/opt/app/facingissuesonit/App1.log",
  "type": "log"
}
2017/05/28 00:24:27.991907 client.go:184: DBG  Publish: {
  "@timestamp": "2017-05-28T00:24:22.991Z",
  "beat": {
    "hostname": "sg02870",
    "name": "sg02870",
    "version": "5.1.2"
  },
  "input_type": "log",
  "message": "2013-02-28 09:57:56,663 INFO  LMLogger - ERR1700 - u:null failures: 0  - Technical error {{rid,US8cFAp5eZgAABwUItEAAAAI_dev01_443}{realsid,60A9772A136B9912B6FF0C3627A47090.dev1-a}}",
  "offset": 375,
  "source": "/opt/app/facingissuesonit/App1.log",
  "type": "log"
}
2017/05/28 00:24:27.991984 client.go:184: DBG  Publish: {
  "@timestamp": "2017-05-28T00:24:22.991Z",
  "beat": {
    "hostname": "sg02870",
    "name": "sg02870",
    "version": "5.1.2"
  },
  "input_type": "log",
  "message": "2013-02-28 09:57:56,668 ERROR SomeCallLogger - ESS10005 Cpc portalservices: Exception caught while writing log messege to MEA Call:  {}\njava.sql.SQLSyntaxErrorException: ORA-00942: table or view does not exist\n\n\tat oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:445)\n\tat oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:396)",
  "offset": 718,
  "source": "/opt/app/facingissuesonit/App1.log",
  "type": "log"
}
2017/05/28 00:24:27.991984 client.go:184: DBG  Publish: {
  "@timestamp": "2017-05-28T00:24:22.992Z",
  "beat": {
    "hostname": "sg02870",
    "name": "sg02870",
    "version": "5.1.2"
  },
  "input_type": "log",
  "message": "2013-02-28 10:04:35,723 INFO  EntryFilter - Fresh on request /portalservices/foobarwhatever {{rid,US8dogp5eZgAABwXPGEAAAAL_dev01_443}{realsid,56BA2AD41D9BB28AFCEEEFF927EE61C2.dev1-a}}",
  "offset": 902,
  "source": "/opt/app/facingissuesonit/App1.log",
  "type": "log"
}

As you can see from the Filebeat debug statements above, published event 3 is a multiline event carrying the stack trace, and every event has these fields:

@timestamp: timestamp at which the data was shipped.

beat.hostname: name of the Filebeat machine from which the data is shipped.

beat.version: the Filebeat version installed on the server, which helps with compatibility checks on the receiving end.

message: the log line from the log file, or the combined multiline log lines.

offset: represents the inode value in the source file.

source: the file name the logs were read from.

Now check whether the data was published to the Kafka topic. Go to the directory below; you will see two files, xyz.index and xyz.log, which hold the offsets and the messages.

{Kafka_home}/kafka-logs/APP-1-TOPIC
          00000000000000000000.log
          00000000000000000000.index

Your server log lines are now in the Kafka topic, ready to be read and parsed by Logstash and sent to Elasticsearch for search and analysis.

Logstash Installation, Configuration and Start

Download the latest version of Logstash from the link below and untar it with the command below on a Linux server, or simply unzip the downloaded file on Windows.

Download Link : https://www.elastic.co/downloads/logstash

tar -zxvf logstash-5.4.0.tar.gz

It will produce the file and directory structure below.

drwxr-xr-x 2 facingissuesonit Saurabh   4096 Apr 20 11:27 bin
-rw-r--r-- 1 facingissuesonit Saurabh 111569 Mar 22 23:49 CHANGELOG.md
drwxr-xr-x 2 facingissuesonit Saurabh   4096 Apr 20 11:27 config
-rw-r--r-- 1 facingissuesonit Saurabh   2249 Mar 22 23:49 CONTRIBUTORS
drwxr-xr-x 3 facingissuesonit Saurabh   4096 Apr 20 12:07 data
-rw-r--r-- 1 facingissuesonit Saurabh   3945 Mar 22 23:55 Gemfile
-rw-r--r-- 1 facingissuesonit Saurabh  21544 Mar 22 23:49 Gemfile.jruby-1.9.lock
drwxr-xr-x 5 facingissuesonit Saurabh   4096 Apr 20 11:27 lib
-rw-r--r-- 1 facingissuesonit Saurabh    589 Mar 22 23:49 LICENSE
drwxr-xr-x 2 facingissuesonit Saurabh   4096 May 21 00:00 logs
drwxr-xr-x 4 facingissuesonit Saurabh   4096 Apr 20 11:27 logstash-core
drwxr-xr-x 3 facingissuesonit Saurabh   4096 Apr 20 11:27 logstash-core-event-java
drwxr-xr-x 3 facingissuesonit Saurabh   4096 Apr 20 11:27 logstash-core-plugin-api
drwxr-xr-x 3 facingissuesonit Saurabh   4096 Apr 20 11:27 logstash-core-queue-jruby
-rw-r--r-- 1 facingissuesonit Saurabh  28114 Mar 22 23:56 NOTICE.TXT
drwxr-xr-x 4 facingissuesonit Saurabh   4096 Apr 20 11:27 vendor

Before starting Logstash, create a configuration file that reads input from Kafka, parses the data into the respective fields and sends it to Elasticsearch. Create the file logstash-app1.conf in the Logstash bin directory with the content below.

bin/logstash-app1.conf

input {
    kafka {
        bootstrap_servers => 'localhost:9092'
        topics => ["APP-1-TOPIC"]
        codec => json {}
    }
}
filter {
    # Parse the log line into fields; the stacktrace group captures any
    # continuation lines of a multi-line event
    grok {
        match => { "message" => "\A%{TIMESTAMP_ISO8601:timestamp}\s+%{LOGLEVEL:loglevel}\s+(?<logger>(?:[a-zA-Z0-9-]+\.)*[A-Za-z0-9$]+)\s+(-\s+)?(?=(?<msgnr>[A-Z]+[0-9]{4,5}))*%{DATA:message}({({[^}]+},?\s*)*})?\s*$(?<stacktrace>(?m:.*))?" }
    }

    # Remove unused fields (uncomment to enable)
    #mutate { remove_field => ["beat", "@version"] }
}
output {
    # Send the result to Elasticsearch; the daily index is created on demand
    elasticsearch {
        index => "app1-logs-%{+YYYY.MM.dd}"
        hosts => ["localhost:9200"]
        sniffing => false
    }

    # Also print each event to the console
    stdout {
        codec => rubydebug
    }
}

To test your configuration file you can use the command below.

./logstash -t -f logstash-app1.conf

If the command above reports that the configuration is OK, run the command below to start reading and parsing data from the Kafka topic.

./logstash -f logstash-app1.conf

To design your own grok pattern for your log line format, you can use the links below; they help build a pattern incrementally and also provide sample grok patterns.

http://grokdebug.herokuapp.com and http://grokconstructor.appspot.com/

The Logstash console will show the parsed data as below; the message field appears as an array because grok adds its own message capture alongside the original field. You can drop unused fields before storing in Elasticsearch by uncommenting the mutate section of the configuration file.

{
    "@timestamp" => 2017-05-28T23:47:42.160Z,
        "offset" => 194,
      "loglevel" => "WARN",
        "logger" => "CreateSomethingActivationKey",
          "beat" => {
        "hostname" => "zlp0287k",
            "name" => "zlp0287k",
         "version" => "5.1.2"
    },
    "input_type" => "log",
      "@version" => "1",
        "source" => "/opt/app/facingissuesonit/App1.log",
       "message" => [
        [0] "2013-02-28 09:57:56,662 WARN  CreateSomethingActivationKey - WhateverException for User 49-123-345678 {{rid,US8cFAp5eZgAABwUItEAAAAI_dev01_443}{realsid,60A9772A136B9912B6FF0C3627A47090.dev1-a}}",
        [1] "WhateverException for User 49-123-345678 "
    ],
          "type" => "log",
     "timestamp" => "2013-02-28 09:57:56,662"
}
{
         "msgnr" => "ERR1700",
    "@timestamp" => 2017-05-28T23:47:42.160Z,
        "offset" => 375,
      "loglevel" => "INFO",
        "logger" => "LMLogger",
          "beat" => {
        "hostname" => "zlp0287k",
            "name" => "zlp0287k",
         "version" => "5.1.2"
    },
    "input_type" => "log",
      "@version" => "1",
        "source" => "/opt/app/facingissuesonit/App1.log",
       "message" => [
        [0] "2013-02-28 09:57:56,663 INFO  LMLogger - ERR1700 - u:null failures: 0  - Technical error {{rid,US8cFAp5eZgAABwUItEAAAAI_dev01_443}{realsid,60A9772A136B9912B6FF0C3627A47090.dev1-a}}",
        [1] "ERR1700 - u:null failures: 0  - Technical error "
    ],
          "type" => "log",
     "timestamp" => "2013-02-28 09:57:56,663"
}
{
        "offset" => 718,
        "logger" => "SomeCallLogger",
    "input_type" => "log",

       "message" => [
        [0] "2013-02-28 09:57:56,668 ERROR SomeCallLogger - ESS10005 Cpc portalservices: Exception caught while writing log messege to MEA Call:  {}\njava.sql.SQLSyntaxErrorException: ORA-00942: table or view does not exist\n\n\tat oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:445)\n\tat oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:396)",
        [1] "ESS10005 Cpc portalservices: Exception caught while writing log messege to MEA Call:  "
    ],
          "type" => "log",
         "msgnr" => "ESS10005",
    "@timestamp" => 2017-05-28T23:47:42.160Z,
    "stacktrace" => "\njava.sql.SQLSyntaxErrorException: ORA-00942: table or view does not exist\n\n\tat oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:445)\n\tat oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:396)",
      "loglevel" => "ERROR",
          "beat" => {
        "hostname" => "zlp0287k",
            "name" => "zlp0287k",
         "version" => "5.1.2"
    },
      "@version" => "1",
     "timestamp" => "2013-02-28 09:57:56,668"
}
{
    "@timestamp" => 2017-05-28T23:47:42.160Z,
        "offset" => 903,
      "loglevel" => "INFO",
        "logger" => "EntryFilter",
          "beat" => {
        "hostname" => "zlp0287k",
            "name" => "zlp0287k",
         "version" => "5.1.2"
    },
    "input_type" => "log",
      "@version" => "1",

       "message" => [
        [0] "2013-02-28 10:04:35,723 INFO  EntryFilter - Fresh on request /portalservices/foobarwhatever {{rid,US8dogp5eZgAABwXPGEAAAAL_dev01_443}{realsid,56BA2AD41D9BB28AFCEEEFF927EE61C2.dev1-a}}\n",
        [1] "Fresh on request /portalservices/foobarwhatever "
    ],
          "type" => "log",
     "timestamp" => "2013-02-28 10:04:35,723"
}
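To see exactly what the grok pattern in the filter section extracts, and how the multi-line events shipped by Filebeat are handled, here is a Python rendering of that pattern run over the four sample log lines. This is an added example, not code from the original post: the grok macros are expanded by hand (simplified forms of TIMESTAMP_ISO8601 and LOGLEVEL are assumed, enough for the sample lines), and the optional message number is captured inside the message group because Python's re module does not support grok's repeated lookahead. The sample input is constructed from the log lines shown in this post.

```python
import re

# Python rendering of the grok pattern from the Logstash filter above. The grok
# macros are expanded by hand (assumed simplified forms of TIMESTAMP_ISO8601 and
# LOGLEVEL that cover the sample lines). The optional message number is captured
# inside the message group instead of grok's repeated lookahead, which Python's
# re module does not support; the result is the same for these lines.
EVENT_RE = re.compile(
    r"\A(?P<timestamp>\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}(?:[.,]\d+)?)\s+"
    r"(?P<loglevel>TRACE|DEBUG|INFO|WARN(?:ING)?|ERROR|FATAL)\s+"
    r"(?P<logger>(?:[a-zA-Z0-9-]+\.)*[A-Za-z0-9$]+)\s+"
    r"(?:-\s+)?"
    r"(?P<message>(?:(?P<msgnr>[A-Z]+[0-9]{4,5})\b)?.*?)"
    r"(?:\{(?:\{[^}]+\},?\s*)*\})?[ \t]*$"   # optional {{rid,...}{realsid,...}} block
    r"(?P<stacktrace>(?s:.*))?",            # everything after the first line
    re.MULTILINE,
)

# A new event starts with a timestamp; every other line (exception text, blank
# line, "at ..." frame) belongs to the preceding event. This mirrors what the
# Filebeat multiline setting does before the event is published to Kafka.
EVENT_START_RE = re.compile(r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}")

# Constructed sample input: the four sample log lines from this post, as they
# would sit in App1.log (the third event spans several lines).
SAMPLE_LOG = "\n".join([
    "2013-02-28 09:57:56,662 WARN  CreateSomethingActivationKey - WhateverException for User 49-123-345678 {{rid,US8cFAp5eZgAABwUItEAAAAI_dev01_443}{realsid,60A9772A136B9912B6FF0C3627A47090.dev1-a}}",
    "2013-02-28 09:57:56,663 INFO  LMLogger - ERR1700 - u:null failures: 0  - Technical error {{rid,US8cFAp5eZgAABwUItEAAAAI_dev01_443}{realsid,60A9772A136B9912B6FF0C3627A47090.dev1-a}}",
    "2013-02-28 09:57:56,668 ERROR SomeCallLogger - ESS10005 Cpc portalservices: Exception caught while writing log messege to MEA Call:  {}",
    "java.sql.SQLSyntaxErrorException: ORA-00942: table or view does not exist",
    "",
    "\tat oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:445)",
    "\tat oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:396)",
    "2013-02-28 10:04:35,723 INFO  EntryFilter - Fresh on request /portalservices/foobarwhatever {{rid,US8dogp5eZgAABwXPGEAAAAL_dev01_443}{realsid,56BA2AD41D9BB28AFCEEEFF927EE61C2.dev1-a}}",
])


def split_events(raw_log):
    """Group raw lines into events the way Filebeat's multiline option does."""
    events = []
    for line in raw_log.splitlines():
        if EVENT_START_RE.match(line) or not events:
            events.append(line)
        else:
            events[-1] += "\n" + line
    return events


def parse_event(event):
    """Return the grok fields for one event, or None if the pattern does not
    match (Logstash would tag such an event with _grokparsefailure)."""
    m = EVENT_RE.match(event)
    if m is None:
        return None
    fields = m.groupdict()
    fields["stacktrace"] = fields["stacktrace"] or None  # "" when there is none
    return fields


def parse_log(raw_log):
    """Split a raw log into events and parse each of them."""
    return [parse_event(event) for event in split_events(raw_log)]


if __name__ == "__main__":
    for fields in parse_log(SAMPLE_LOG):
        print(fields["timestamp"], fields["loglevel"], fields["logger"],
              fields["msgnr"], repr(fields["message"]),
              "with stacktrace" if fields["stacktrace"] else "")
```

Unlike grok, which appends its message capture to the existing message field (hence the two-element array in the console output above), the Python version returns only the parsed text; the msgnr and stacktrace values come out exactly as in the console output.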

To verify on the Elasticsearch side that the data arrived, open the URL http://localhost:9200/_cat/indices in your browser; it will list the index created with the current date.

yellow open app1-logs-2017.05.28                             Qjs6XWiFQw2zsiVs9Ks6sw 5 1         4     0  47.3kb  47.3kb

Kibana Installation, Configuration and Start

Download the latest version of Kibana from the link below and untar it with the command below on a Linux server, or simply unzip the downloaded file on Windows.

Download Link : https://www.elastic.co/downloads/kibana

tar -zxvf kibana-5.4.0.tar.gz

It will produce the file and directory structure below for Kibana.

ls -l
drwxr-xr-x   2 facingissuesonit Saurabh   4096 May 22 14:23 bin
drwxr-xr-x   2 facingissuesonit Saurabh   4096 Apr 25 18:58 config
drwxr-xr-x   2 facingissuesonit Saurabh   4096 Apr 25 11:54 data
-rw-r--r--   1 facingissuesonit Saurabh    562 Apr 17 12:04 LICENSE.txt
drwxr-xr-x   6 facingissuesonit Saurabh   4096 Apr 17 12:04 node
drwxr-xr-x 485 facingissuesonit Saurabh  20480 Apr 17 12:04 node_modules
-rw-r--r--   1 facingissuesonit Saurabh 660429 Apr 17 12:04 NOTICE.txt
drwxr-xr-x   3 facingissuesonit Saurabh   4096 Apr 17 12:04 optimize
-rw-r--r--   1 facingissuesonit Saurabh    702 Apr 17 12:04 package.json
drwxr-xr-x   2 facingissuesonit Saurabh   4096 May 22 12:29 plugins
-rw-r--r--   1 facingissuesonit Saurabh   4909 Apr 17 12:04 README.txt
drwxr-xr-x  10 facingissuesonit Saurabh   4096 Apr 17 12:04 src
drwxr-xr-x   3 facingissuesonit Saurabh   4096 Apr 17 12:04 ui_framework
drwxr-xr-x   2 facingissuesonit Saurabh   4096 Apr 17 12:04 webpackShims

Before starting Kibana, make some basic changes in the config/kibana.yml file: uncomment the properties below and set them as shown.

server.port: 5601
server.host: localhost
elasticsearch.url: "http://localhost:9200"

Kibana is now configured and ready to start. Use the command below to run Kibana in the background.

screen -d -m bin/kibana

Kibana takes a little time to start; test it by opening the URL below in a browser.

http://localhost:5601/

To check this data in Kibana, open the URL above in a browser and go to the Management tab in the left-side menu -> Index Patterns -> Add New.

Enter the index name or pattern and the time field name as in the screenshot below, then click the Create button.

Index Pattern Settings

Now go to the Discover tab and select the index pattern app1-log*; the data will be displayed as below.

Kibana Discover data

Now make the changes below according to your application's setup.

Filebeat :

  • Update the prospector path to the current log file in your log directory.
  • Move Kafka to a different machine, since Kafka will be the single location that receives the shipped data from different servers. Replace localhost with the IP of the Kafka server in the hosts property of the Kafka output section of filebeat.full.yml.
  • Copy the same Filebeat setup to every server where your application is deployed and logs need to be read.
  • Start the Filebeat instance on each server.

Elasticsearch :

  • Uncomment the network.host property in elasticsearch.yml so that Elasticsearch is reachable by IP address.

Logstash:

  • Replace localhost in the input section of logstash-app1.conf with the Kafka machine's IP.
  • Change the grok pattern in the filter section according to your log format. The URLs below help design it incrementally: http://grokdebug.herokuapp.com and http://grokconstructor.appspot.com/
  • Replace localhost in the Elasticsearch output section with the IP if Elasticsearch runs on a different machine.

Kibana:

  • Replace localhost in the elasticsearch.url property of kibana.yml with the IP if Kibana runs on a different machine.

Conclusion :

This tutorial covered the following points:

  • Installation of Filebeat, Kafka, Logstash, Elasticsearch and Kibana.
  • Filebeat configured to ship logs to the Kafka message broker.
  • Logstash configured to read log lines from the Kafka topic, parse them and ship them to Elasticsearch.
  • Kibana showing the Elasticsearch data to users as charts and dashboards for analysis.

Read More

To read more on Filebeat, Kafka and Elasticsearch configuration, follow the links; for Logstash configuration, input plugins, filter plugins, output plugins, Logstash customization and related issues, follow Logstash Tutorial and Logstash Issues.

Hope this blog was helpful for you.

Leave your feedback to enhance this topic further so that it becomes more helpful for others.

Reference  :

 https://www.elastic.co/products

 

Elasticsearch REST Index Manager Auto Client for CRUD


The Elasticsearch 5 REST Java Index Manager Auto Client helps manage the index lifecycle from the client side: driven by configuration, it decides which indexes to keep open, close or delete, with no third-party tool required.

The steps below set up automatic index management, saving the effort of managing indexes by hand; the index lifecycle is driven by the configured time thresholds.

Pre-requisite

  • Java 8 or later is required.
  • Add the dependencies for the Elasticsearch REST client and JSON mapping to your pom.xml, or add the jars to your classpath.
  • The index name should have the form IndexName-yyyy.MM.dd, for example app1-logs-2017.06.08; if you use a different date format, change the code below accordingly.

We will follow these steps to create the client and run it automatically:

  • Create Java Maven Project ElasticsearchAutoIndexManager.
  • Add ElasticSearchIndexManagerClient in Project.
  • Test
  • Create auto run jar for project
  • Create script file for run auto jar
  • Create Cron Tab configuration for schedule and receive email alert.

Create Java Maven Project ElasticsearchAutoIndexManager

Create a console-based Java Maven project named ElasticsearchAutoIndexManager, as in the screenshot below. To learn more about console-based Java Maven projects, follow the link How to create Maven Java Console Project?

Elasticsearch REST Auto Client

Add the dependencies below to the pom.xml file.

<!--Elasticsearch REST jar-->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>rest</artifactId>
    <version>5.1.2</version>
</dependency>
<!--Jackson jar for mapping json to Java -->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.8.5</version>
</dependency>

Add the ElasticSearchIndexManagerClient class below to the com.facingissuesonit.es package and change the constant fields below according to your server details and requirements.

INDEX_NO_ACTION_TIME is the number of days during which no action is taken on an index. For example, a value of 2 means an index stays searchable in the system for two days.

private static final int INDEX_NO_ACTION_TIME = 2;

INDEX_CLOSE_TIME sets when indexes are closed; a closed index still exists on the Elasticsearch server but is not searchable. For example, a value of 5 means indexes older than five days are closed and kept that way until the delete time is reached.

private static final int INDEX_CLOSE_TIME = 5;

INDEX_DELETE_TIME decides when indexes are deleted. For example, a value of 15 means all indexes older than 15 days are deleted.

private static final int INDEX_DELETE_TIME = 15;

private static final String ELASTICSEARCH_SERVER = "ServerHost";

private static final int ELASTICSEARCH_SERVER_PORT = 9200;

Note: set the proxy server and login credentials if required; otherwise comment those lines out.

package com.facingissuesonit.es;

import java.io.IOException;
import java.io.InputStream;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.time.temporal.ChronoUnit;
import java.util.Collections;

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ElasticSearchIndexManagerClient {
    // Age thresholds in days, as explained above
    private static final int INDEX_NO_ACTION_TIME = 2;
    private static final int INDEX_CLOSE_TIME = 5;
    private static final int INDEX_DELETE_TIME = 15;
    private static final String ELASTICSEARCH_SERVER = "ServerHost";
    private static final int ELASTICSEARCH_SERVER_PORT = 9200;

    // One entry of the GET /_cat/indices?format=json response. The original
    // post's IndexDetail class is not shown on this page; this minimal version
    // (an assumption) maps only the two fields the manager uses and ignores the rest.
    @JsonIgnoreProperties(ignoreUnknown = true)
    public static class IndexDetail {
        @JsonProperty("index")
        private String indexName;
        @JsonProperty("status")
        private String status;

        public String getIndexName() {
            return indexName;
        }

        public String getStatus() {
            return status;
        }
    }

    public static void main(String[] args) {
        final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
        final LocalDate todayLocalDate = LocalDate.now();
        RestClient client = null;

        try {
            ElasticSearchIndexManagerClient esManager = new ElasticSearchIndexManagerClient();
            // Get a connection to Elasticsearch
            client = esManager.getElasticsearchConnectionClient();
            IndexDetail[] indexList = esManager.getIndexDetailList(client);

            if (indexList != null) {
                for (IndexDetail indexDetail : indexList) {
                    String indexName = indexDetail.getIndexName();
                    System.out.println(indexName);
                    // The date is the part after the last '-', e.g. app1-logs-2017.06.08
                    String indexDateStr = indexName.substring(indexName.lastIndexOf("-") + 1);
                    try {
                        // Number of days between index creation and today
                        LocalDate indexDate = LocalDate.parse(indexDateStr.replace('.', '-'), formatter);
                        long days = ChronoUnit.DAYS.between(indexDate, todayLocalDate);
                        esManager.performAction(indexDetail, days, client);
                    } catch (DateTimeParseException ex) {
                        // Indexes without a date suffix (for example .kibana) are left untouched
                        System.out.println("Index is not having formatted date as required : yyyy.MM.dd :" + indexName);
                    }
                }
            }
            System.out.println("Index Management successfully completed");
        } catch (Exception ex) {
            System.out.println("Exception found while index management");
            ex.printStackTrace();
            System.exit(1);
        } finally {
            closeQuietly(client);
        }
    }

    // Get an Elasticsearch connection. The credentials and proxy are placeholders;
    // in a real deployment load them from a secret store or the environment rather
    // than hardcoding them, and drop the proxy call if no proxy is needed.
    private RestClient getElasticsearchConnectionClient() {
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("userid", "password"));
        // HttpHost takes the port as an int, not a string
        final HttpHost proxy = new HttpHost("ProxyHost", 8080);

        return RestClient
                .builder(new HttpHost(ELASTICSEARCH_SERVER, ELASTICSEARCH_SERVER_PORT))
                .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                        return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
                                .setProxy(proxy);
                    }
                }).setMaxRetryTimeoutMillis(60000).build();
    }

    // Get the list of indexes on the Elasticsearch server
    public IndexDetail[] getIndexDetailList(RestClient client) {
        IndexDetail[] indexDetails = null;
        try {
            ObjectMapper jacksonObjectMapper = new ObjectMapper();
            Response response = client.performRequest("GET", "/_cat/indices",
                    Collections.singletonMap("format", "json"));
            indexDetails = jacksonObjectMapper.readValue(response.getEntity().getContent(), IndexDetail[].class);
            System.out.println("Index found :" + indexDetails.length);
        } catch (IOException ex) {
            ex.printStackTrace();
        }
        return indexDetails;
    }

    // Decide what to do with an index from its age in days and the configured
    // thresholds. Delete is checked before close so that an index past
    // INDEX_DELETE_TIME is not closed and then deleted in the same run.
    private void performAction(IndexDetail indexDetail, long days, RestClient client) {
        String indexName = indexDetail.getIndexName();
        boolean isClosed = "close".equalsIgnoreCase(indexDetail.getStatus());
        if (days < INDEX_NO_ACTION_TIME) {
            return; // still inside the no-action window
        }
        if (days >= INDEX_DELETE_TIME) {
            if (isClosed) {
                deleteCloseIndex(indexName, client);
            } else {
                deleteIndex(indexName, client);
            }
        } else if (days >= INDEX_CLOSE_TIME && !isClosed) {
            closeIndex(indexName, client);
        }
    }

    // Operations on indexes (endpoints are absolute paths on the server)
    private void closeIndex(String indexName, RestClient client) {
        flushIndex(indexName, client);
        postDocuments("/" + indexName + "/_close", client);
        System.out.println("Close Index :" + indexName);
    }

    private void deleteIndex(String indexName, RestClient client) {
        flushIndex(indexName, client);
        deleteDocument("/" + indexName, client);
        System.out.println("Delete Index :" + indexName);
    }

    // A closed index cannot be flushed, so open it before the flush and delete
    private void deleteCloseIndex(String indexName, RestClient client) {
        openIndex(indexName, client);
        flushIndex(indexName, client);
        deleteDocument("/" + indexName, client);
        System.out.println("Delete Close Index :" + indexName);
    }

    private void openIndex(String indexName, RestClient client) {
        postDocuments("/" + indexName + "/_open", client);
        System.out.println("Open Index :" + indexName);
    }

    private void flushIndex(String indexName, RestClient client) {
        postDocuments("/" + indexName + "/_flush", client);
        System.out.println("Flush Index :" + indexName);
        try {
            Thread.sleep(3000); // give the flush a moment to finish
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    // POST is used for the open, flush and close actions
    public InputStream postDocuments(String endPoint, RestClient client) {
        InputStream in = null;
        try {
            Response response = client.performRequest("POST", endPoint, Collections.<String, String>emptyMap());
            in = response.getEntity().getContent();
        } catch (IOException ex) {
            System.out.println("Exception in post Documents :");
            ex.printStackTrace();
        }
        return in;
    }

    // DELETE removes the index completely
    public InputStream deleteDocument(String endPoint, RestClient client) {
        InputStream in = null;
        try {
            Response response = client.performRequest("DELETE", endPoint, Collections.singletonMap("pretty", "true"));
            in = response.getEntity().getContent();
        } catch (IOException ex) {
            System.out.println("Exception in delete Documents :");
            ex.printStackTrace();
        }
        return in;
    }

    // Release the client's connection pool and threads
    private static void closeQuietly(RestClient client) {
        if (client != null) {
            try {
                client.close();
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }
}

The code above is straightforward and proceeds step by step. Let me explain the operations below.

Open: an index in open status is available for searching, and the operations below, such as close and delete, can be performed on it while it is open. To open an index with curl, use the command below.

curl -X POST http://localhost:9200/indexName-2017.06.10/_open

Flush: this operation is performed before the close and delete operations so that all running transactions on the index complete.

curl -X POST http://localhost:9200/indexName-2017.06.10/_flush

Close: a closed index persists on the Elasticsearch server but is not available for searching. To close an index with curl, use the command below.

curl -X POST http://localhost:9200/indexName-2017.06.10/_close

Delete: the delete operation removes the index completely from the server. It uses the HTTP DELETE method on the index itself rather than a POST to an action endpoint.

curl -X DELETE http://localhost:9200/indexName-2017.06.10

Our code is now ready to manage indexes based on the configured times; test it by running the code above.
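The decision logic of the client above, as an added Python example that is not code from the original post: it parses the date suffix from each index name, computes the age in days and returns the ordered REST calls the client issues for that index, without touching a server, which makes the thresholds easy to dry-run before scheduling the jar. The sample _cat/indices response is constructed; the thresholds and index naming follow the post. Delete is checked before close so that an index past INDEX_DELETE_TIME is not closed and then deleted in the same run, and names whose suffix is not a valid yyyy.MM.dd date (such as .kibana or a 30 February) are skipped rather than acted on.

```python
import json
import re
from datetime import date

# Same thresholds (in days) as the Java client's constants
INDEX_NO_ACTION_TIME = 2
INDEX_CLOSE_TIME = 5
INDEX_DELETE_TIME = 15

# Date suffix of a daily index such as app1-logs-2017.06.08 (text after the last '-')
INDEX_DATE_RE = re.compile(r"-(\d{4})\.(\d{2})\.(\d{2})$")

# Fixed "today" so the sample run is reproducible (constructed value)
SAMPLE_TODAY = date(2017, 6, 10)


def index_age_days(index_name, today):
    """Days between the date in the index name and today, or None when the
    name has no usable yyyy.MM.dd suffix (e.g. .kibana or an invalid date)."""
    m = INDEX_DATE_RE.search(index_name)
    if m is None:
        return None
    year, month, day = (int(part) for part in m.groups())
    try:
        return (today - date(year, month, day)).days
    except ValueError:  # e.g. 2017.02.30
        return None


def plan_index(index_name, status, today):
    """Return (action, requests): the ordered list of (HTTP method, path)
    calls the index manager would issue for this index."""
    age = index_age_days(index_name, today)
    if age is None:
        return "skip", []
    closed = status is not None and status.lower() == "close"
    base = "/" + index_name
    if age < INDEX_NO_ACTION_TIME:
        return "keep", []
    if age >= INDEX_DELETE_TIME:
        # A closed index cannot be flushed, so it is opened first
        steps = [("POST", base + "/_open")] if closed else []
        return "delete", steps + [("POST", base + "/_flush"), ("DELETE", base)]
    if age >= INDEX_CLOSE_TIME and not closed:
        return "close", [("POST", base + "/_flush"), ("POST", base + "/_close")]
    return "keep", []


def plan_all(cat_indices_json, today):
    """Plan every index in a GET /_cat/indices?format=json response body."""
    return {
        entry["index"]: plan_index(entry["index"], entry.get("status"), today)
        for entry in json.loads(cat_indices_json)
    }


# Constructed sample response; only "index" and "status" matter here
SAMPLE_CAT_INDICES = json.dumps([
    {"health": "yellow", "status": "open", "index": "app1-logs-2017.06.09"},
    {"health": "yellow", "status": "open", "index": "app1-logs-2017.06.04"},
    {"health": None, "status": "close", "index": "app1-logs-2017.06.03"},
    {"health": "yellow", "status": "open", "index": "app1-logs-2017.05.20"},
    {"health": None, "status": "close", "index": "app1-logs-2017.05.01"},
    {"health": "yellow", "status": "open", "index": "app1-logs-2017.02.30"},
    {"health": "yellow", "status": "open", "index": ".kibana"},
])

if __name__ == "__main__":
    for name, (action, requests) in plan_all(SAMPLE_CAT_INDICES, SAMPLE_TODAY).items():
        print(f"{name:24} {action:6} {requests}")
```

Running the block prints one line per index with the planned action and the exact request sequence; swapping in a live _cat/indices body gives a review of what the scheduled job would do before it runs.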

The steps below make the index manager run automatically in a Linux environment.

Create Auto Runnable Jar

Export the ElasticsearchAutoIndexManager project as an executable jar, selecting ElasticSearchIndexManagerClient as the launch class. To learn the steps for creating an executable jar, follow the link How to make and auto run /executable jar with dependencies?

Create Script File to Execute  Autorun Jar

Create a script file named IndexManager.sh with the content below and save it.

#!/bin/bash
~/JAVA/jdk1.8.0_66/bin/java -jar /opt/app/facingissuesonit/automate/IndexManagerClient.jar

Create Cron Tab configuration for schedule and receive email alert

Linux provides crontab for running scheduled jobs and scripts; we use it to run the executable jar through the script above.

  • Use the command crontab -e to create or edit the crontab entries.
  • Add a cron entry in this editor that runs the IndexManager.sh script every night at 1 AM.
  • To receive an execution alert with the console output for you and your team, add your email id to the crontab (the MAILTO variable).
  • Save the crontab by pressing ESC and then typing :wq

Below are some more examples of cron expressions.

0 * * * *          : Run every hour
* * * * *          : Run every minute
30 4 * * *         : Run at 4:30 AM every day
5 10,22 * * *      : Run twice a day, at 10:05 and 22:05
5 0 * * *          : Run at 00:05, just after midnight

Read More

To read more on Elasticsearch REST, sample clients and configuration with examples, follow the links Elasticsearch REST Tutorial and Elasticsearch Issues.

Hope this blog was helpful for you.

Leave your feedback to enhance this topic further so that it becomes more helpful for others.

Integrate Filebeat, Kafka, Logstash, Elasticsearch and Kibana


Filebeat, Kafka, Logstash, Elasticsearch and Kibana integration is used by large organizations where applications are deployed in production on hundreds or thousands of servers scattered across different locations, and the data from these servers needs to be analyzed in real time.

This integration mainly helps with log-level analysis, tracking issues and anomalies in the data, alerting on particular events, and accountability measures.

Together these technologies provide a scalable architecture in which the components are decoupled from each other and can be enhanced individually.

Why these Technology?

Filebeat :

  • Lightweight agent for shipping logs.
  • Forwards and centralizes files and logs.
  • Robust (does not miss a single beat).

Kafka:

  • Open source distributed stream processing and message broker platform.
  • Processes stream data or transaction logs in real time.
  • Fault-tolerant, high-throughput, low-latency platform for handling real-time data feeds.

Logstash:

  • Open source, server-side data processing pipeline that accepts data from different sources simultaneously.
  • Parses, formats and transforms data and sends it to different outputs.

Elasticsearch:

  • Open source, distributed and cross-platform.
  • Built on top of Lucene, which provides full-text search with NRT (near real-time) results.
  • Supports RESTful search through the Elasticsearch REST API.

Kibana:

  • Open source.
  • Provides a window onto Elasticsearch data in the form of different charts and dashboards.
  • Provides an easy way to search and operate on data with respect to time intervals.
  • Dashboards can easily be embedded in any web application.

How Data flow works ?

In this integration Filebeat is installed on every server where your application is deployed; it reads the latest changes to the logs on these servers and ships them to the Kafka topic configured for the application.

Logstash subscribes to the log lines from the Kafka topic, parses them, makes the relevant changes (formatting, excluding and including fields) and sends the processed data to Elasticsearch indexes, which act as the central location for data from the different servers.

Kibana is linked with the Elasticsearch indexes and helps with analysis through search, charts and dashboards.

FKLEK Integration

Design Architecture

In the architecture configured below, the application is deployed on three servers, each with a current log file named App1.log. The goal is to read real-time data from these servers and analyze it.

FKLEK Arch Integration

Steps to Installation, Configuration and Start

Here we first install Kafka and Elasticsearch, which run independently; the remaining tools are installed and run in sequence to test the data flow. Initially install everything on the same machine and test with the sample data using the steps below; the end of this post describes the changes needed for your own servers.

  • Kafka Installation, Configuration and Start
  • Elasticsearch Installation,Configuration and Start
  • Filebeat Installation,Configuration and Start
  • Logstash Installation,Configuration and Start
  • Kibana Installation,Start and display.

Pre-Requisite

The Filebeat, Logstash, Elasticsearch and Kibana versions should be compatible with each other; it is best to use the latest from https://www.elastic.co/downloads.

  • Java 8+
  • Linux Server
  • Filebeat 5.XX
  • Kafka 2.11.XX
  • Logstash 5.XX
  • Elasticsearch 5.XX
  • Kibana 5.XX

Note: make sure JDK 8 is installed and the JAVA_HOME environment variable points to the JDK 8 home directory on every machine where Elasticsearch, Logstash, Kibana or Kafka will be installed.

Windows: My Computer -> right click -> Properties -> Advanced System Settings -> System Variables

Set JAVA_HOME

Linux: go to your home directory (or the sudo user's directory) and add the line below to your profile.

export JAVA_HOME=/opt/app/facingissuesonit/jdk1.8.0_66

Sample Data

For testing we will use the sample log lines below, which include debug lines as well as a stack trace; the grok parsing in this example is designed for them. For real-time testing with actual data you can point to your server log files, but you then have to modify the grok pattern in the Logstash configuration accordingly.

2013-02-28 09:57:56,662 WARN  CreateSomethingActivationKey - WhateverException for User 49-123-345678 {{rid,US8cFAp5eZgAABwUItEAAAAI_dev01_443}{realsid,60A9772A136B9912B6FF0C3627A47090.dev1-a}}
2013-02-28 09:57:56,663 INFO  LMLogger - ERR1700 - u:null failures: 0  - Technical error {{rid,US8cFAp5eZgAABwUItEAAAAI_dev01_443}{realsid,60A9772A136B9912B6FF0C3627A47090.dev1-a}}
2013-02-28 09:57:56,668 ERROR SomeCallLogger - ESS10005 Cpc portalservices: Exception caught while writing log messege to MEA Call:  {}
java.sql.SQLSyntaxErrorException: ORA-00942: table or view does not exist

	at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:445)
	at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:396)
2013-02-28 10:04:35,723 INFO  EntryFilter - Fresh on request /portalservices/foobarwhatever {{rid,US8dogp5eZgAABwXPGEAAAAL_dev01_443}{realsid,56BA2AD41D9BB28AFCEEEFF927EE61C2.dev1-a}}

Create the file App1.log on the same machine where Filebeat will be installed and copy the log lines above into it.

Kafka Installation , Configuration and Start

Download the latest version of Kafka from the link below and untar it with the command below on a Linux server, or simply unzip the downloaded file on Windows.

Download Link : https://kafka.apache.org/downloads

tar -zxvf kafka_2.11-0.10.0.0.tgz

For more configuration and start options follow Setup Kafka Cluster for Single Server/Broker.

After downloading and untarring/unzipping, it will have the files and directory structure below.

ls -l
drwxr-xr-x  3 facingissuesonit Saurabh   4096 Apr  3 05:18 bin
drwxr-xr-x  2 facingissuesonit Saurabh   4096 May  8 11:05 config
drwxr-xr-x 74 facingissuesonit Saurabh   4096 May 27 20:00 kafka-logs
drwxr-xr-x  2 facingissuesonit Saurabh   4096 Apr  3 05:17 libs
-rw-r--r--  1 facingissuesonit Saurabh  28824 Apr  3 05:17 LICENSE
drwxr-xr-x  2 facingissuesonit Saurabh 487424 May 27 20:00 logs
-rw-r--r--  1 facingissuesonit Saurabh    336 Apr  3 05:18 NOTICE
drwxr-xr-x  2 facingissuesonit Saurabh   4096 Apr  3 05:17 site-docs

For more details about these files, configuration options and other integration options follow Kafka Tutorial.

Make the changes below in config/zookeeper.properties and config/server.properties.

config/zookeeper.properties:

clientPort=2181

config/server.properties:

broker.id=0
listeners=PLAINTEXT://:9092
log.dir=/kafka-logs
zookeeper.connect=localhost:2181

Kafka is now configured and ready to run. Use the commands below to start ZooKeeper and the Kafka server as background processes.

screen -d -m bin/zookeeper-server-start.sh config/zookeeper.properties
screen -d -m bin/kafka-server-start.sh config/server.properties

To verify that Kafka installed successfully, check that the Kafka process is running on Linux with "ps -ef|grep kafka", or follow the producer and consumer steps in Setup Kafka Cluster for Single Server/Broker.

Elasticsearch Installation,Configuration and Start

Download the latest version of Elasticsearch from the link below and untar it with the command below on a Linux server, or simply unzip the downloaded file on Windows.

Download Link : https://www.elastic.co/downloads/elasticsearch

tar -zxvf elasticsearch-5.4.0.tar.gz

It will produce the file and directory structure below for Elasticsearch.

drwxr-xr-x  2 facingissuesonit Saurabh   4096 Apr 25 19:20 bin
drwxr-xr-x  3 facingissuesonit Saurabh   4096 May 13 17:27 config
drwxr-xr-x  3 facingissuesonit Saurabh   4096 Apr 24 15:56 data
drwxr-xr-x  2 facingissuesonit Saurabh   4096 Apr 17 10:55 lib
-rw-r--r--  1 facingissuesonit Saurabh  11358 Apr 17 10:50 LICENSE.txt
drwxr-xr-x  2 facingissuesonit Saurabh   4096 May 28 05:00 logs
drwxr-xr-x 12 facingissuesonit Saurabh   4096 Apr 17 10:55 modules
-rw-r--r--  1 facingissuesonit Saurabh 194187 Apr 17 10:55 NOTICE.txt
drwxr-xr-x  2 facingissuesonit Saurabh   4096 Apr 17 10:55 plugins
-rw-r--r--  1 facingissuesonit Saurabh   9540 Apr 17 10:50 README.textile

Before starting Elasticsearch we need to make some basic changes in the config/elasticsearch.yml file for the cluster and node name. You can configure these based on your application or organization name.

cluster.name: FACING-ISSUE-IN-IT
node.name: TEST-NODE-1
#network.host: 0.0.0.0
http.port: 9200

Now the Elasticsearch configuration is ready and it is time to start Elasticsearch. We can use the command below to run it in the background.

screen -d -m bin/elasticsearch

To check that Elasticsearch started successfully you can open the URL below in a browser to see the cluster status. You will get a result like the one below.

http://localhost:9200/_cluster/health?pretty

or, if network.host is configured, as below:

http://elasticsearchServerIp:9200/_cluster/health?pretty

Result:

{
  "cluster_name" : "FACING-ISSUE-IN-IT",
  "status" : "green",
  "timed_out" : false,
  "number_of_nodes" : 1,
  "number_of_data_nodes" : 1,
  "active_primary_shards" : 0,
  "active_shards" : 0,
  "relocating_shards" : 0,
  "initializing_shards" : 0,
  "unassigned_shards" : 0,
  "delayed_unassigned_shards" : 0,
  "number_of_pending_tasks" : 0,
  "number_of_in_flight_fetch" : 0,
  "task_max_waiting_in_queue_millis" : 0,
  "active_shards_percent_as_number" : 100.0
}

Filebeat Installation, Configuration and Start

Download the latest version of Filebeat from the link below and use the command to untar and install it on a Linux server; on Windows, just unzip the downloaded file.

Download Link : https://www.elastic.co/downloads/beats/filebeat

tar -zxvf filebeat-<version>.tar.gz

For more configuration and start options follow Filebeat Download, Installation and Start/Run.

After downloading and untarring/unzipping the file, you will see the files and directory structure below.

ls -l
-rwxr-xr-x 1 facingissuesonit Saurabh 14908742 Jan 11 14:11 filebeat
-rw-r--r-- 1 facingissuesonit Saurabh    31964 Jan 11 14:11 filebeat.full.yml
-rw-r--r-- 1 facingissuesonit Saurabh     3040 Jan 11 14:11 filebeat.template-es2x.json
-rw-r--r-- 1 facingissuesonit Saurabh     2397 Jan 11 14:11 filebeat.template.json
-rw-r--r-- 1 facingissuesonit Saurabh     4196 Jan 11 14:11 filebeat.yml
-rw-r--r-- 1 facingissuesonit Saurabh      811 Jan 11 14:10 README.md
drwxr-xr-x 2 facingissuesonit Saurabh     4096 Jan 11 14:11 scripts

For more details about all these files, configuration options and other integration options follow Filebeat Tutorial.

Now Filebeat is installed; make the changes below in the filebeat.full.yml file.

  • Inside the prospectors section change paths to your log file location, as in
paths:
  - /opt/app/facingissuesonit/App1.log
  • Comment out the default Elasticsearch output properties as below
#output.elasticsearch:
#  hosts: ["localhost:9200"]
  • Configure the multiline options as below so that all stack-trace lines that do not start with a date are treated as part of a single event.
multiline.pattern: '^\d'
multiline.negate: true
multiline.match: after

To learn more about Filebeat multiline configuration follow Filebeat Multiline Configuration Changes for Object, StackTrace and XML.

  • Inside the Kafka output section update the hosts and topic properties. If Kafka is on the same machine use localhost, otherwise use the IP of the Kafka machine.
output.kafka:
  hosts: ["localhost:9092"]
  topic: APP-1-TOPIC

For more on logging configuration follow the link Filebeat, Logging Configuration.

Now Filebeat is configured and ready to start with the command below. It will continuously read the file App1.log from the configured prospector and publish log line events to Kafka. It will also create the topic APP-1-TOPIC in Kafka if it does not exist.

./filebeat -e -c filebeat.full.yml -d "publish"

On the console it will display output like the following for the sample lines.

2017/05/28 00:24:27.991828 client.go:184: DBG  Publish: {
  "@timestamp": "2017-05-28T00:24:22.991Z",
  "beat": {
    "hostname": "sg02870",
    "name": "sg02870",
    "version": "5.1.2"
  },
  "input_type": "log",
  "message": "2013-02-28 09:57:56,662 WARN  CreateSomethingActivationKey - WhateverException for User 49-123-345678 {{rid,US8cFAp5eZgAABwUItEAAAAI_dev01_443}{realsid,60A9772A136B9912B6FF0C3627A47090.dev1-a}}",
  "offset": 194,
  "source": "/opt/app/facingissuesonit/App1.log",
  "type": "log"
}
2017/05/28 00:24:27.991907 client.go:184: DBG  Publish: {
  "@timestamp": "2017-05-28T00:24:22.991Z",
  "beat": {
    "hostname": "sg02870",
    "name": "sg02870",
    "version": "5.1.2"
  },
  "input_type": "log",
  "message": "2013-02-28 09:57:56,663 INFO  LMLogger - ERR1700 - u:null failures: 0  - Technical error {{rid,US8cFAp5eZgAABwUItEAAAAI_dev01_443}{realsid,60A9772A136B9912B6FF0C3627A47090.dev1-a}}",
  "offset": 375,
  "source": "/opt/app/facingissuesonit/App1.log",
  "type": "log"
}
2017/05/28 00:24:27.991984 client.go:184: DBG  Publish: {
  "@timestamp": "2017-05-28T00:24:22.991Z",
  "beat": {
    "hostname": "sg02870",
    "name": "sg02870",
    "version": "5.1.2"
  },
  "input_type": "log",
  "message": "2013-02-28 09:57:56,668 ERROR SomeCallLogger - ESS10005 Cpc portalservices: Exception caught while writing log messege to MEA Call:  {}\njava.sql.SQLSyntaxErrorException: ORA-00942: table or view does not exist\n\n\tat oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:445)\n\tat oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:396)",
  "offset": 718,
  "source": "/opt/app/facingissuesonit/App1.log",
  "type": "log"
}
2017/05/28 00:24:27.991984 client.go:184: DBG  Publish: {
  "@timestamp": "2017-05-28T00:24:22.992Z",
  "beat": {
    "hostname": "sg02870",
    "name": "sg02870",
    "version": "5.1.2"
  },
  "input_type": "log",
  "message": "2013-02-28 10:04:35,723 INFO  EntryFilter - Fresh on request /portalservices/foobarwhatever {{rid,US8dogp5eZgAABwXPGEAAAAL_dev01_443}{realsid,56BA2AD41D9BB28AFCEEEFF927EE61C2.dev1-a}}",
  "offset": 902,
  "source": "/opt/app/facingissuesonit/App1.log",
  "type": "log"
}

From the Filebeat debug statements above you can see that published event 3 is a multiline event containing the stack trace of an exception, and that each event carries fields such as:

@timestamp: timestamp at which the data was shipped.

beat.hostname: name of the Filebeat machine from which the data is shipped.

beat.version: version of Filebeat installed on the server, which helps with compatibility checks on the target end.

message: log line from the log file, or the joined multiline log lines.

offset: represents the inode value in the source file.

source: name of the file from which the logs were read.

Now check whether the data has been published to the Kafka topic. Go to the directory below; you will see two files, xyz.index and xyz.log, which hold the message offsets and the messages themselves.

{Kafka_home}/kafka-logs/APP-1-TOPIC
          00000000000000000000.log
          00000000000000000000.index

Now your server log lines are in the Kafka topic, ready to be read and parsed by Logstash and sent to Elasticsearch for analysis and search.

Logstash Installation, Configuration and Start

Download the latest version of Logstash from the link below and use the command to untar and install it on a Linux server; on Windows, just unzip the downloaded file.

Download Link : https://www.elastic.co/downloads/logstash

tar -zxvf logstash-5.4.0.tar.gz

It will show the file and directory structure below.

drwxr-xr-x 2 facingissuesonit Saurabh   4096 Apr 20 11:27 bin
-rw-r--r-- 1 facingissuesonit Saurabh 111569 Mar 22 23:49 CHANGELOG.md
drwxr-xr-x 2 facingissuesonit Saurabh   4096 Apr 20 11:27 config
-rw-r--r-- 1 facingissuesonit Saurabh   2249 Mar 22 23:49 CONTRIBUTORS
drwxr-xr-x 3 facingissuesonit Saurabh   4096 Apr 20 12:07 data
-rw-r--r-- 1 facingissuesonit Saurabh   3945 Mar 22 23:55 Gemfile
-rw-r--r-- 1 facingissuesonit Saurabh  21544 Mar 22 23:49 Gemfile.jruby-1.9.lock
drwxr-xr-x 5 facingissuesonit Saurabh   4096 Apr 20 11:27 lib
-rw-r--r-- 1 facingissuesonit Saurabh    589 Mar 22 23:49 LICENSE
drwxr-xr-x 2 facingissuesonit Saurabh   4096 May 21 00:00 logs
drwxr-xr-x 4 facingissuesonit Saurabh   4096 Apr 20 11:27 logstash-core
drwxr-xr-x 3 facingissuesonit Saurabh   4096 Apr 20 11:27 logstash-core-event-java
drwxr-xr-x 3 facingissuesonit Saurabh   4096 Apr 20 11:27 logstash-core-plugin-api
drwxr-xr-x 3 facingissuesonit Saurabh   4096 Apr 20 11:27 logstash-core-queue-jruby
-rw-r--r-- 1 facingissuesonit Saurabh  28114 Mar 22 23:56 NOTICE.TXT
drwxr-xr-x 4 facingissuesonit Saurabh   4096 Apr 20 11:27 vendor

Before starting Logstash we need to create a configuration file that takes input from Kafka, parses the data into the respective fields and sends it to Elasticsearch. Create the file logstash-app1.conf in the Logstash bin directory with the content below.

bin/logstash-app1.conf

input {
    kafka {
        bootstrap_servers => 'localhost:9092'
        topics => ["APP-1-TOPIC"]
        codec => json {}
    }
}
filter {
    # Parse the log line into fields
    grok {
        match => {"message" => "\A%{TIMESTAMP_ISO8601:timestamp}\s+%{LOGLEVEL:loglevel}\s+(?<logger>(?:[a-zA-Z0-9-]+\.)*[A-Za-z0-9$]+)\s+(-\s+)?(?=(?<msgnr>[A-Z]+[0-9]{4,5}))*%{DATA:message}({({[^}]+},?\s*)*})?\s*$(?<stacktrace>(?m:.*))?" }
    }

    # Remove unused fields
    #mutate { remove_field => ["beat","@version" ]}
}
output {
    # Send the result to Elasticsearch; the index name is created dynamically per day
    elasticsearch {
        index  => "app1-logs-%{+YYYY.MM.dd}"
        hosts => ["localhost:9200"]
        sniffing => false
    }

    # Also print each event to stdout
    stdout {
        codec => rubydebug
    }
}

To test your configuration file you can use the command below.

./logstash -t -f logstash-app1.conf

If the command above reports OK, run the command below to start reading and parsing data from the Kafka topic.

./logstash -f logstash-app1.conf

To design your own grok pattern for your log line format you can follow the links below; they help you build the pattern incrementally and also provide some sample grok patterns for common logs.

http://grokdebug.herokuapp.com and http://grokconstructor.appspot.com/

The Logstash console will show the parsed data as below. You can remove unused fields before storing in Elasticsearch by uncommenting the mutate section in the configuration file.

{
    "@timestamp" => 2017-05-28T23:47:42.160Z,
        "offset" => 194,
      "loglevel" => "WARN",
        "logger" => "CreateSomethingActivationKey",
          "beat" => {
        "hostname" => "zlp02870",
            "name" => "zlp02870",
         "version" => "5.1.2"
    },
    "input_type" => "log",
      "@version" => "1",
        "source" => "/opt/app/facingissuesonit/App1.log",
       "message" => [
        [0] "2013-02-28 09:57:56,662 WARN  CreateSomethingActivationKey - WhateverException for User 49-123-345678 {{rid,US8cFAp5eZgAABwUItEAAAAI_dev01_443}{realsid,60A9772A136B9912B6FF0C3627A47090.dev1-a}}",
        [1] "WhateverException for User 49-123-345678 "
    ],
          "type" => "log",
     "timestamp" => "2013-02-28 09:57:56,662"
}
{
         "msgnr" => "ERR1700",
    "@timestamp" => 2017-05-28T23:47:42.160Z,
        "offset" => 375,
      "loglevel" => "INFO",
        "logger" => "LMLogger",
          "beat" => {
        "hostname" => "zlp02870",
            "name" => "zlp02870",
         "version" => "5.1.2"
    },
    "input_type" => "log",
      "@version" => "1",
        "source" => "/opt/app/facingissuesonit/App1.log",
       "message" => [
        [0] "2013-02-28 09:57:56,663 INFO  LMLogger - ERR1700 - u:null failures: 0  - Technical error {{rid,US8cFAp5eZgAABwUItEAAAAI_dev01_443}{realsid,60A9772A136B9912B6FF0C3627A47090.dev1-a}}",
        [1] "ERR1700 - u:null failures: 0  - Technical error "
    ],
          "type" => "log",
     "timestamp" => "2013-02-28 09:57:56,663"
}
{
        "offset" => 718,
        "logger" => "SomeCallLogger",
    "input_type" => "log",

       "message" => [
        [0] "2013-02-28 09:57:56,668 ERROR SomeCallLogger - ESS10005 Cpc portalservices: Exception caught while writing log messege to MEA Call:  {}\njava.sql.SQLSyntaxErrorException: ORA-00942: table or view does not exist\n\n\tat oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:445)\n\tat oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:396)",
        [1] "ESS10005 Cpc portalservices: Exception caught while writing log messege to MEA Call:  "
    ],
          "type" => "log",
         "msgnr" => "ESS10005",
    "@timestamp" => 2017-05-28T23:47:42.160Z,
    "stacktrace" => "\njava.sql.SQLSyntaxErrorException: ORA-00942: table or view does not exist\n\n\tat oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:445)\n\tat oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:396)",
      "loglevel" => "ERROR",
          "beat" => {
        "hostname" => "zlp02870",
            "name" => "zlp02870",
         "version" => "5.1.2"
    },
      "@version" => "1",
     "timestamp" => "2013-02-28 09:57:56,668"
}
{
    "@timestamp" => 2017-05-28T23:47:42.160Z,
        "offset" => 903,
      "loglevel" => "INFO",
        "logger" => "EntryFilter",
          "beat" => {
        "hostname" => "zlp02870",
            "name" => "zlp02870",
         "version" => "5.1.2"
    },
    "input_type" => "log",
      "@version" => "1",

       "message" => [
        [0] "2013-02-28 10:04:35,723 INFO  EntryFilter - Fresh on request /portalservices/foobarwhatever {{rid,US8dogp5eZgAABwXPGEAAAAL_dev01_443}{realsid,56BA2AD41D9BB28AFCEEEFF927EE61C2.dev1-a}}\n",
        [1] "Fresh on request /portalservices/foobarwhatever "
    ],
          "type" => "log",
     "timestamp" => "2013-02-28 10:04:35,723"
}

To test on the Elasticsearch end that your data was sent successfully, open the URL http://localhost:9200/_cat/indices in your browser; it will display the created index with the current date.

yellow open app1-logs-2017.05.28                             Qjs6XWiFQw2zsiVs9Ks6sw 5 1         4     0  47.3kb  47.3kb

Kibana Installation, Configuration and Start

Download the latest version of Kibana from the link below and use the command to untar and install it on a Linux server; on Windows, just unzip the downloaded file.

Download Link : https://www.elastic.co/downloads/kibana

tar -zxvf kibana-5.4.0.tar.gz

It will show the files and directory structure below for Kibana.

ls -l
drwxr-xr-x   2 facingissuesonit Saurabh   4096 May 22 14:23 bin
drwxr-xr-x   2 facingissuesonit Saurabh   4096 Apr 25 18:58 config
drwxr-xr-x   2 facingissuesonit Saurabh   4096 Apr 25 11:54 data
-rw-r--r--   1 facingissuesonit Saurabh    562 Apr 17 12:04 LICENSE.txt
drwxr-xr-x   6 facingissuesonit Saurabh   4096 Apr 17 12:04 node
drwxr-xr-x 485 facingissuesonit Saurabh  20480 Apr 17 12:04 node_modules
-rw-r--r--   1 facingissuesonit Saurabh 660429 Apr 17 12:04 NOTICE.txt
drwxr-xr-x   3 facingissuesonit Saurabh   4096 Apr 17 12:04 optimize
-rw-r--r--   1 facingissuesonit Saurabh    702 Apr 17 12:04 package.json
drwxr-xr-x   2 facingissuesonit Saurabh   4096 May 22 12:29 plugins
-rw-r--r--   1 facingissuesonit Saurabh   4909 Apr 17 12:04 README.txt
drwxr-xr-x  10 facingissuesonit Saurabh   4096 Apr 17 12:04 src
drwxr-xr-x   3 facingissuesonit Saurabh   4096 Apr 17 12:04 ui_framework
drwxr-xr-x   2 facingissuesonit Saurabh   4096 Apr 17 12:04 webpackShims

Before starting Kibana we need to make some basic changes in the config/kibana.yml file: uncomment the properties below and set them as shown.

server.port: 5601
server.host: localhost
elasticsearch.url: "http://localhost:9200"

Now Kibana is configured and it is time to start it. We can use the command below to run Kibana in the background.

screen -d -m bin/kibana

Kibana takes some time to start; we can test it by opening the URL below in a browser.

http://localhost:5601/

To check this data in Kibana, open the URL above in a browser, go to the Management tab in the left-side menu -> Index Patterns -> click Add New.

Enter the index name or pattern and the time field name as in the screen below and click the Create button.

(Screenshot: Kibana Index Pattern Settings)

Now go to the Discover tab and select the index pattern app1-log*; it will display the data as below.

(Screenshot: Kibana Discover tab showing the parsed data)

Now make the changes below according to your application specification.

Filebeat:

  • Update the prospector path to your log directory and current file.
  • Move Kafka to a different machine, because Kafka will be the single location that receives shipped data from different servers. Replace localhost with the IP of the Kafka server in the hosts property of the Kafka output section of filebeat.full.yml.
  • Copy the same Filebeat setup to every server where your application is deployed and logs need to be read.
  • Start all Filebeat instances on each server.

Elasticsearch:

  • Uncomment the network.host property in elasticsearch.yml so that Elasticsearch can be reached by IP address. Binding to 0.0.0.0 exposes the REST API on every interface, so restrict it to the interface Logstash and Kibana use or place it behind a firewall.

Logstash:

  • Replace localhost in the input section of logstash-app1.conf with the Kafka machine IP.
  • Change the grok pattern in the filter section according to your log format. The URLs below help you design it incrementally: http://grokdebug.herokuapp.com and http://grokconstructor.appspot.com/
  • Replace localhost in the Elasticsearch output section with the IP if Elasticsearch is on a different machine.

Kibana:

  • Replace localhost in the elasticsearch.url property of kibana.yml with the IP if Kibana is on a different machine.


Elasticsearch REST JAVA Client to get Index Details List

Below is an example that retrieves the index details into a Java array by using the Elasticsearch REST Java client. The client calls the endpoint “/_cat/indices?format=json” to retrieve the details of all indices. It is the same as the GET we would issue with curl:

GET http://elasticsearchHost:9200/_cat/indices?format=json

Pre-requisite

  • Java 7 or later is required.
  • Add the dependencies below for the Elasticsearch REST client and JSON mapping to your pom.xml, or add the jars to your classpath.

Dependency

<!-- Elasticsearch REST jar -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>rest</artifactId>
    <version>5.1.2</version>
</dependency>
<!-- Jackson jar for mapping JSON to Java -->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.8.5</version>
</dependency>

Sample Code

import java.io.IOException;
import java.util.Collections;

import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;

import com.fasterxml.jackson.databind.ObjectMapper;

public class ElasticsearchRESTIndexClient {

	public static void main(String[] args) {
		IndexInfo[] indexArr = null;
		RestClient client = null;
		try {
			client = openConnection();
			if (client != null) {
				// performRequest with GET retrieves the detail list of all indices
				// from the Elasticsearch server
				Response response = client.performRequest("GET", "/_cat/indices?format=json",
						Collections.singletonMap("pretty", "true"));
				// getEntity returns the JSON body of the response as an HttpEntity
				HttpEntity entity = response.getEntity();
				ObjectMapper jacksonObjectMapper = new ObjectMapper();
				// Map the JSON array to an IndexInfo[] (unknown fields are ignored
				// by the @JsonIgnoreProperties annotation on IndexInfo)
				indexArr = jacksonObjectMapper.readValue(entity.getContent(), IndexInfo[].class);
				for (IndexInfo indexInfo : indexArr) {
					System.out.println(indexInfo);
				}
			}
		} catch (Exception ex) {
			System.out.println("Exception found while getting index detail");
			ex.printStackTrace();
		} finally {
			closeConnnection(client);
		}
	}

	// Get Rest client connection
	private static RestClient openConnection() {
		RestClient client = null;
		try {
			// Placeholder credentials: in real code load them from a secrets store
			// or the environment rather than hard-coding them in source
			final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
			credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("userid", "password"));
			client = RestClient.builder(new HttpHost("elasticHost", 9200))
					.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
						// Customize the HTTP client as per requirement
						@Override
						public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
							return httpClientBuilder
									// Credentials for HTTP Basic authentication
									.setDefaultCredentialsProvider(credentialsProvider)
									// Proxy (remove this call if no proxy is needed)
									.setProxy(new HttpHost("proxyServer", 8080));
						}
					}).setMaxRetryTimeoutMillis(60000).build();
		} catch (Exception ex) {
			ex.printStackTrace();
		}
		return client;
	}

	// Close Open connection
	private static void closeConnnection(RestClient client) {
		if (client != null) {
			try {
				client.close();
			} catch (IOException ex) {
				ex.printStackTrace();
			}
		}
	}

}

IndexInfo object to which the JSON index details are mapped:

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

@JsonIgnoreProperties(ignoreUnknown = true)
public class IndexInfo {
@JsonProperty(value = "health")
private String health;
@JsonProperty(value = "index")
private String indexName;
@JsonProperty(value = "status")
private String status;
@JsonProperty(value = "pri")
private int shards;
@JsonProperty(value = "rep")
private int replica;
@JsonProperty(value = "pri.store.size")
private String dataSize;
@JsonProperty(value = "store.size")
private String totalDataSize;
@JsonProperty(value = "docs.count")
private String documentCount;

@Override
public String toString()
{
	// Render the object as a small JSON document, closing the brace we open
	StringBuilder str = new StringBuilder(200);
	str.append("{\n");
	str.append("    \"indexName\":\"").append(indexName).append("\",\n");
	str.append("    \"health\":\"").append(health).append("\",\n");
	str.append("    \"status\":\"").append(status).append("\",\n");
	str.append("    \"shards\":\"").append(shards).append("\",\n");
	str.append("    \"replica\":\"").append(replica).append("\",\n");
	str.append("    \"dataSize\":\"").append(dataSize).append("\",\n");
	str.append("    \"totalDataSize\":\"").append(totalDataSize).append("\",\n");
	str.append("    \"documentCount\":\"").append(documentCount).append("\"\n");
	str.append("}");
	return str.toString();
}
public String getIndexName() {
	return indexName;
}
public void setIndexName(String indexName) {
	this.indexName = indexName;
}
public int getShards() {
	return shards;
}
public void setShards(int shards) {
	this.shards = shards;
}
public int getReplica() {
	return replica;
}
public void setReplica(int replica) {
	this.replica = replica;
}
public String getDataSize() {
	return dataSize;
}
public void setDataSize(String dataSize) {
	this.dataSize = dataSize;
}
public String getTotalDataSize() {
	return totalDataSize;
}
public void setTotalDataSize(String totalDataSize) {
	this.totalDataSize = totalDataSize;
}
public String getDocumentCount() {
	return documentCount;
}
public void setDocumentCount(String documentCount) {
	this.documentCount = documentCount;
}
public String getStatus() {
	return status;
}
public void setStatus(String status) {
	this.status = status;
}
public String getHealth() {
	return health;
}
public void setHealth(String health) {
	this.health = health;
}
}

Read More on Elasticsearch REST

Integration

Integrate Filebeat, Kafka, Logstash, Elasticsearch and Kibana

Elasticsearch REST JAVA Client for Cluster Detail

Below is an example that retrieves the cluster details into a Java object by using the Elasticsearch REST Java client. The client calls the endpoint “/_cluster/health” to retrieve the cluster health details. It is the same as the GET we would issue with curl:

GET http://elasticsearchHost:9200/_cluster/health

Pre-requisite

  • Java 7 or later is required.
  • Add the dependencies below for the Elasticsearch REST client and JSON mapping to your pom.xml, or add the jars to your classpath.

Dependency

<!-- Elasticsearch REST jar -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>rest</artifactId>
    <version>5.1.2</version>
</dependency>
<!-- Jackson jar for mapping JSON to Java -->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.8.5</version>
</dependency>

Sample Code

import java.io.IOException;
import java.util.Collections;

import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;

import com.fasterxml.jackson.databind.ObjectMapper;

public class ElasticsearchRESTClusterClient {

	public static void main(String[] args) {
		ClusterInfo clusterHealth = null;
		RestClient client = null;
		try {
			client = openConnection();
			if (client != null) {
				// performRequest with GET retrieves the cluster health
				// information from the Elasticsearch server
				Response response = client.performRequest("GET", "/_cluster/health",
						Collections.singletonMap("pretty", "true"));
				// getEntity returns the JSON body of the response as an HttpEntity
				HttpEntity entity = response.getEntity();
				ObjectMapper jacksonObjectMapper = new ObjectMapper();
				// Map the JSON document to a ClusterInfo object (unknown fields are
				// ignored by the @JsonIgnoreProperties annotation on ClusterInfo)
				clusterHealth = jacksonObjectMapper.readValue(entity.getContent(), ClusterInfo.class);
				System.out.println(clusterHealth);
			}
		} catch (Exception ex) {
			System.out.println("Exception found while getting cluster detail");
			ex.printStackTrace();
		} finally {
			closeConnnection(client);
		}
	}

	// Get Rest client connection
	private static RestClient openConnection() {
		RestClient client = null;
		try {
			// Placeholder credentials: in real code load them from a secrets store
			// or the environment rather than hard-coding them in source
			final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
			credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("userId", "password"));
			client = RestClient.builder(new HttpHost("elasticHost", 9200))
					.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
						// Customize the HTTP client as per requirement
						@Override
						public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
							return httpClientBuilder
									// Credentials for HTTP Basic authentication
									.setDefaultCredentialsProvider(credentialsProvider)
									// Proxy (remove this call if no proxy is needed)
									.setProxy(new HttpHost("ProxyServer", 8080));
						}
					}).setMaxRetryTimeoutMillis(60000).build();
		} catch (Exception ex) {
			ex.printStackTrace();
		}
		return client;
	}

	// Close Open connection
	private static void closeConnnection(RestClient client) {
		if (client != null) {
			try {
				client.close();
			} catch (IOException ex) {
				ex.printStackTrace();
			}
		}
	}

}

ClusterInfo Java object to which the retrieved JSON response is mapped:

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

@JsonIgnoreProperties(ignoreUnknown = true)
public class ClusterInfo {

@JsonProperty(value = "cluster_name")
private String clusterName;
@JsonProperty(value = "status")
private String clusterStatus;
@JsonProperty(value = "active_primary_shards")
private int primaryActiveShards;
@JsonProperty(value = "active_shards")
private int activeShards;
@JsonProperty(value = "delayed_unassigned_shards")
private int delayedUnAssignedShards;
@JsonProperty(value = "unassigned_shards")
private int unAssignedShards;
@JsonProperty(value = "initializing_shards")
private int initializingShards;
@JsonProperty(value = "relocating_shards")
private int relocatingShards;
@JsonProperty(value = "number_of_nodes")
private int totalNodeCount;
@JsonProperty(value = "number_of_data_nodes")
private int dataNodeCount;

@Override
public String toString()
{
	// Render the object as a small JSON document, closing the brace we open
	StringBuilder str = new StringBuilder(300);
	str.append("{\n");
	str.append("    \"clusterName\":\"").append(clusterName).append("\",\n");
	str.append("    \"clusterStatus\":\"").append(clusterStatus).append("\",\n");
	str.append("    \"primaryActiveShards\":\"").append(primaryActiveShards).append("\",\n");
	str.append("    \"activeShards\":\"").append(activeShards).append("\",\n");
	str.append("    \"delayedUnAssignedShards\":\"").append(delayedUnAssignedShards).append("\",\n");
	str.append("    \"unAssignedShards\":\"").append(unAssignedShards).append("\",\n");
	str.append("    \"initializingShards\":\"").append(initializingShards).append("\",\n");
	str.append("    \"relocatingShards\":\"").append(relocatingShards).append("\",\n");
	str.append("    \"totalNodeCount\":\"").append(totalNodeCount).append("\",\n");
	str.append("    \"dataNodeCount\":\"").append(dataNodeCount).append("\"\n");
	str.append("}");
	return str.toString();
}

public String getClusterName() {
	return clusterName;
}
public void setClusterName(String clusterName) {
	this.clusterName = clusterName;
}
public String getClusterStatus() {
	return clusterStatus;
}
public void setClusterStatus(String clusterStatus) {
	this.clusterStatus = clusterStatus;
}
public int getPrimaryActiveShards() {
	return primaryActiveShards;
}
public void setPrimaryActiveShards(int primaryActiveShards) {
	this.primaryActiveShards = primaryActiveShards;
}
public int getActiveShards() {
	return activeShards;
}
public void setActiveShards(int activeShards) {
	this.activeShards = activeShards;
}
public int getDelayedUnAssignedShards() {
	return delayedUnAssignedShards;
}
public void setDelayedUnAssignedShards(int delayedUnAssignedShards) {
	this.delayedUnAssignedShards = delayedUnAssignedShards;
}
public int getUnAssignedShards() {
	return unAssignedShards;
}
public void setUnAssignedShards(int unAssignedShards) {
	this.unAssignedShards = unAssignedShards;
}
public int getInitializingShards() {
	return initializingShards;
}
public void setInitializingShards(int initializingShards) {
	this.initializingShards = initializingShards;
}
public int getRelocatingShards() {
	return relocatingShards;
}
public void setRelocatingShards(int relocatingShards) {
	this.relocatingShards = relocatingShards;
}
public int getDataNodeCount() {
	return dataNodeCount;
}
public void setDataNodeCount(int dataNodeCount) {
	this.dataNodeCount = dataNodeCount;
}
public int getTotalNodeCount() {
	return totalNodeCount;
}
public void setTotalNodeCount(int totalNodeCount) {
	this.totalNodeCount = totalNodeCount;
}
}
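Both REST client posts map an Elasticsearch JSON response to a typed object: /_cat/indices?format=json to IndexInfo[] and /_cluster/health to ClusterInfo. The Python script below is an added example, not code from the posts or from the Elasticsearch client library, that does the same mapping with dataclasses on constructed sample responses (the cluster health is a modified copy of the FACING-ISSUE-IN-IT document shown in the Elasticsearch section; the second index row and its uuid are made up), then derives an operational check from the two objects and shows how the Basic-auth GET the Java clients issue can be built with credentials read from environment variables instead of being hard-coded. The dataclass and function names, the ES_USER/ES_PASSWORD variable names and the cluster_problems rules are assumptions of this example.

```python
import base64
import json
import os
import urllib.request
from dataclasses import dataclass, fields
from typing import List, Optional

# Constructed sample: the _cluster/health document from the Elasticsearch
# section, changed to a yellow cluster with unassigned replica shards.
SAMPLE_CLUSTER_HEALTH = json.dumps({
    "cluster_name": "FACING-ISSUE-IN-IT", "status": "yellow", "timed_out": False,
    "number_of_nodes": 1, "number_of_data_nodes": 1,
    "active_primary_shards": 5, "active_shards": 5, "relocating_shards": 0,
    "initializing_shards": 0, "unassigned_shards": 5, "delayed_unassigned_shards": 0,
    "number_of_pending_tasks": 0, "number_of_in_flight_fetch": 0,
    "task_max_waiting_in_queue_millis": 0, "active_shards_percent_as_number": 50.0,
})

# Constructed sample of _cat/indices?format=json: every column is a string,
# and columns are null for an index whose primary shards are not allocated.
SAMPLE_CAT_INDICES = json.dumps([
    {"health": "yellow", "status": "open", "index": "app1-logs-2017.05.28",
     "uuid": "Qjs6XWiFQw2zsiVs9Ks6sw", "pri": "5", "rep": "1",
     "docs.count": "4", "docs.deleted": "0",
     "store.size": "47.3kb", "pri.store.size": "47.3kb"},
    {"health": "red", "status": "open", "index": "app1-logs-2017.05.27",
     "uuid": "made-up-uuid-for-example", "pri": "5", "rep": "1",
     "docs.count": None, "docs.deleted": None,
     "store.size": None, "pri.store.size": None},
])


@dataclass
class ClusterInfo:
    """Subset of the _cluster/health document, like the Java ClusterInfo."""
    cluster_name: str
    status: str
    number_of_nodes: int
    number_of_data_nodes: int
    active_primary_shards: int
    active_shards: int
    relocating_shards: int
    initializing_shards: int
    unassigned_shards: int
    delayed_unassigned_shards: int

    @classmethod
    def from_json(cls, text: str) -> "ClusterInfo":
        data = json.loads(text)
        # Keep only the keys we model; extra keys are ignored, as with
        # @JsonIgnoreProperties(ignoreUnknown = true) in the Java class.
        return cls(**{f.name: data[f.name] for f in fields(cls)})


@dataclass
class IndexInfo:
    """One row of _cat/indices, like the Java IndexInfo."""
    index: str
    health: str
    status: str
    primary_shards: int
    replicas: int
    docs_count: int
    store_size: str

    @classmethod
    def from_cat_entry(cls, entry: dict) -> "IndexInfo":
        # _cat emits strings (or null), so convert explicitly instead of
        # relying on implicit coercion as Jackson does for the Java class.
        return cls(
            index=entry["index"], health=entry["health"], status=entry["status"],
            primary_shards=int(entry["pri"]), replicas=int(entry["rep"]),
            docs_count=int(entry["docs.count"] or 0),
            store_size=entry["store.size"] or "0b",
        )


def parse_cat_indices(text: str) -> List[IndexInfo]:
    return [IndexInfo.from_cat_entry(e) for e in json.loads(text)]


def cluster_problems(cluster: ClusterInfo, indices: List[IndexInfo]) -> List[str]:
    """Findings an operator would alert on; an empty list means healthy."""
    problems = []
    if cluster.status != "green":
        problems.append(f"cluster status is {cluster.status}")
    if cluster.unassigned_shards:
        problems.append(f"{cluster.unassigned_shards} unassigned shards")
    problems.extend(f"index {i.index} is red" for i in indices if i.health == "red")
    return problems


def build_request(base_url: str, endpoint: str, user: Optional[str] = None,
                  password: Optional[str] = None) -> urllib.request.Request:
    """Build (without sending) the GET the Java clients perform. Credentials
    default to the ES_USER / ES_PASSWORD environment variables (assumed names)
    so that they never live in source control."""
    user = os.environ.get("ES_USER") if user is None else user
    password = os.environ.get("ES_PASSWORD", "") if password is None else password
    req = urllib.request.Request(base_url.rstrip("/") + endpoint, method="GET")
    if user:
        token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
        # Unredirected: the header is not replayed if the server redirects elsewhere
        req.add_unredirected_header("Authorization", "Basic " + token)
    return req


if __name__ == "__main__":
    cluster = ClusterInfo.from_json(SAMPLE_CLUSTER_HEALTH)
    indices = parse_cat_indices(SAMPLE_CAT_INDICES)
    print(cluster)
    print(*indices, sep="\n")
    print("problems:", cluster_problems(cluster, indices) or "none")
```

The request is built but not sent, so the script runs offline; to query a real node you would pass it to urllib.request.urlopen over an https:// base URL. The explicit conversion in from_cat_entry matters because _cat columns arrive as strings and become null for an index whose shards are unallocated, which is exactly the case an operational check needs to notice rather than crash on.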

Read More on Elasticsearch REST

Integration

Integrate Filebeat, Kafka, Logstash, Elasticsearch and Kibana