Spring Boot provides a starter for Elasticsearch that handles search and CRUD operations against an Elasticsearch cluster. Elasticsearch is a full-text search engine and also provides REST-based APIs for these operations. To use Elasticsearch in your Spring Boot application, add the Elasticsearch starter to your pom.xml.
After adding the Elasticsearch starter, it automatically downloads and adds the required dependencies to your application and initializes them with default values. You can override these values through application.properties / application.yaml.
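For reference, a minimal Maven dependency for the starter might look like the snippet below; the version is normally inherited from the Spring Boot parent, so none is shown here.

<!-- Spring Boot starter for Spring Data Elasticsearch (version managed by the Spring Boot parent) -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>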
ElasticSearch Configuration Properties
Spring Boot loads these properties into the ElasticsearchProperties class.
spring.data.elasticsearch.cluster-name (default: elasticsearch): Cluster name.
spring.data.elasticsearch.cluster-nodes (no default): Comma-separated cluster node addresses. If not specified, starts a client node.
spring.data.elasticsearch.properties.* (no default): Additional properties used to configure the client.
spring.data.elasticsearch.repositories.enabled (default: true): Enable Elasticsearch repositories.
Elastic Search Spring Boot Properties
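As a reference, overriding these defaults in application.properties could look like the following; the cluster name and node address are placeholders for your own environment.

spring.data.elasticsearch.cluster-name=my-es-cluster
spring.data.elasticsearch.cluster-nodes=localhost:9300
spring.data.elasticsearch.repositories.enabled=true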
JEST (Elasticsearch HTTP client) Configuration Properties
Spring Boot loads these properties into the JestProperties class.
This section covers the configuration of an Elasticsearch Ingest Node, the creation of a pipeline and processors for the Ingest Node, and the Filebeat configuration needed to ship logs to the Ingest Node.
Below are some sample log lines that will be shipped through Filebeat to the Elasticsearch Ingest Node.
Sample Logs
2016-06-01 05:14:51,921 INFO main [com.mchange.v2.log.MLog] - MLog clients using log4j logging.
2016-06-01 05:14:52,065 INFO main [com.mchange.v2.c3p0.C3P0Registry] - Initializing c3p0-0.9.1.2 [built 21-May-2007 15:04:56; debug? true; trace: 10]
2016-06-01 05:14:52,728 INFO net.sf.ehcache.CacheManager@204119 [net.sf.ehcache.util.UpdateChecker] - New update(s) found: 2.4.7 [http://www.terracotta.org/confluence/display/release/Release+Notes+Ehcache+Core+2.4]. Please check http://ehcache.org for the latest version.
2016-06-01 05:14:53,198 WARN main [com.ibatis.sqlmap.engine.builder.xml.SqlMapParser] - Duplicate -include 'rowSubSelectEnd' found.
2016-06-01 05:14:53,199 WARN main [com.ibatis.sqlmap.engine.builder.xml.SqlMapParser] - Duplicate -include 'activitiesSelect' found.
2016-06-01 05:14:56,172 ERROR main [com.fsvps.clientPortal.service.clientReport.ReportsServiceImpl] - Invalid/unreadable reportsBasePath dir: C:homeclientReportsBase
2016-06-01 05:15:51,784 INFO main [com.mchange.v2.c3p0.impl.AbstractPoolBackedDataSource] - Initializing c3p0 pool... com.mchange.v2.c3p0.ComboPooledDataSource [ acquireIncrement -> 1, acquireRetryAttempts -> 0, acquireRetryDelay -> 1000, autoCommitOnClose -> false, automaticTestTable -> null, breakAfterAcquireFailure -> false, checkoutTimeout -> 0, connectionCustomizerClassName -> null, connectionTesterClassName -> com.mchange.v2.c3p0.impl.DefaultConnectionTester, dataSourceName -> 2x2tdj9h36ggie1v52d93|144b457, debugUnreturnedConnectionStackTraces -> false, description -> null, driverClass -> oracle.jdbc.driver.OracleDriver, factoryClassLocation -> null, forceIgnoreUnresolvedTransactions -> false, identityToken -> 2x2tdj9h36ggie1v52d93|144b457, idleConnectionTestPeriod -> 140, initialPoolSize -> 0, jdbcUrl -> jdbc:oracle:thin:@10.226.168.6:1521:FSVD2, maxAdministrativeTaskTime -> 0, maxConnectionAge -> 0, maxIdleTime -> 150, maxIdleTimeExcessConnections -> 0, maxPoolSize -> 20, maxStatements -> 0, maxStatementsPerConnection -> 0, minPoolSize -> 1, numHelperThreads -> 3, numThreadsAwaitingCheckoutDefaultUser -> 0, preferredTestQuery -> select 1, properties -> {user=******, password=******}, propertyCycle -> 0, testConnectionOnCheckin -> false, testConnectionOnCheckout -> false, unreturnedConnectionTimeout -> 0, usesTraditionalReflectiveProxies -> false ]
2016-06-01 05:16:13,317 INFO com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#0 [com.mchange.v2.resourcepool.BasicResourcePool] - An exception occurred while acquiring a poolable resource. Will retry.
java.sql.SQLException: The Network Adapter could not establish the connection
at oracle.jdbc.driver.T4CConnection.logon(T4CConnection.java:412)
at oracle.jdbc.driver.PhysicalConnection.(PhysicalConnection.java:531)
at oracle.jdbc.driver.T4CConnection.(T4CConnection.java:221)
at oracle.jdbc.driver.T4CDriverExtension.getConnection(T4CDriverExtension.java:32)
at oracle.jdbc.driver.OracleDriver.connect(OracleDriver.java:503)
at com.mchange.v2.c3p0.DriverManagerDataSource.getConnection(DriverManagerDataSource.java:134)
at com.mchange.v2.c3p0.WrapperConnectionPoolDataSource.getPooledConnection(WrapperConnectionPoolDataSource.java:182)
at com.mchange.v2.c3p0.WrapperConnectionPoolDataSource.getPooledConnection(WrapperConnectionPoolDataSource.java:171)
at com.mchange.v2.c3p0.impl.C3P0PooledConnectionPool$1PooledConnectionResourcePoolManager.acquireResource(C3P0PooledConnectionPool.java:137)
at com.mchange.v2.resourcepool.BasicResourcePool.doAcquire(BasicResourcePool.java:1014)
at com.mchange.v2.resourcepool.BasicResourcePool.access$800(BasicResourcePool.java:32)
at com.mchange.v2.resourcepool.BasicResourcePool$AcquireTask.run(BasicResourcePool.java:1810)
at com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread.run(ThreadPoolAsynchronousRunner.java:547)
Caused by: oracle.net.ns.NetException: The Network Adapter could not establish the connection
at oracle.net.nt.ConnStrategy.execute(ConnStrategy.java:359)
at oracle.net.resolver.AddrResolution.resolveAndExecute(AddrResolution.java:422)
at oracle.net.ns.NSProtocol.establishConnection(NSProtocol.java:672)
at oracle.net.ns.NSProtocol.connect(NSProtocol.java:237)
at oracle.jdbc.driver.T4CConnection.connect(T4CConnection.java:1042)
at oracle.jdbc.driver.T4CConnection.logon(T4CConnection.java:301)
... 12 more
Caused by: java.net.ConnectException: Connection timed out: connect
at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
at java.net.DualStackPlainSocketImpl.socketConnect(Unknown Source)
at java.net.AbstractPlainSocketImpl.doConnect(Unknown Source)
at java.net.AbstractPlainSocketImpl.connectToAddress(Unknown Source)
at java.net.AbstractPlainSocketImpl.connect(Unknown Source)
at java.net.PlainSocketImpl.connect(Unknown Source)
at java.net.SocksSocketImpl.connect(Unknown Source)
at java.net.Socket.connect(Unknown Source)
at oracle.net.nt.TcpNTAdapter.connect(TcpNTAdapter.java:141)
at oracle.net.nt.ConnOption.connect(ConnOption.java:123)
at oracle.net.nt.ConnStrategy.execute(ConnStrategy.java:337)
... 17 more
2016-06-01 05:16:13,317 INFO com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#2 [com.mchange.v2.resourcepool.BasicResourcePool] - An exception occurred while acquiring a poolable resource. Will retry.
java.sql.SQLException: The Network Adapter could not establish the connection
at oracle.jdbc.driver.T4CConnection.logon(T4CConnection.java:412)
at oracle.jdbc.driver.PhysicalConnection.(PhysicalConnection.java:531)
at oracle.jdbc.driver.T4CConnection.(T4CConnection.java:221)
at oracle.jdbc.driver.T4CDriverExtension.getConnection(T4CDriverExtension.java:32)
at oracle.jdbc.driver.OracleDriver.connect(OracleDriver.java:503)
at com.mchange.v2.c3p0.DriverManagerDataSource.getConnection(DriverManagerDataSource.java:134)
at com.mchange.v2.c3p0.WrapperConnectionPoolDataSource.getPooledConnection(WrapperConnectionPoolDataSource.java:182)
at com.mchange.v2.c3p0.WrapperConnectionPoolDataSource.getPooledConnection(WrapperConnectionPoolDataSource.java:171)
at com.mchange.v2.c3p0.impl.C3P0PooledConnectionPool$1PooledConnectionResourcePoolManager.acquireResource(C3P0PooledConnectionPool.java:137)
at com.mchange.v2.resourcepool.BasicResourcePool.doAcquire(BasicResourcePool.java:1014)
at com.mchange.v2.resourcepool.BasicResourcePool.access$800(BasicResourcePool.java:32)
at com.mchange.v2.resourcepool.BasicResourcePool$AcquireTask.run(BasicResourcePool.java:1810)
at com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread.run(ThreadPoolAsynchronousRunner.java:547)
Caused by: oracle.net.ns.NetException: The Network Adapter could not establish the connection
at oracle.net.nt.ConnStrategy.execute(ConnStrategy.java:359)
at oracle.net.resolver.AddrResolution.resolveAndExecute(AddrResolution.java:422)
at oracle.net.ns.NSProtocol.establishConnection(NSProtocol.java:672)
at oracle.net.ns.NSProtocol.connect(NSProtocol.java:237)
at oracle.jdbc.driver.T4CConnection.connect(T4CConnection.java:1042)
at oracle.jdbc.driver.T4CConnection.logon(T4CConnection.java:301)
... 12 more
Filebeat Configuration
To try this example, put the above sample logs in a file named app.log and configure the filebeat.yml file as below:
Prospectors Configuration
filebeat.prospectors:
- type: log
  paths:
    - D:\your-directory-path\*.log*
  # Multiline configuration (matching the above logs) to attach stack-trace lines to the log line before them
  multiline.pattern: '^([0-9]{4})-([0-9]{2})-([0-9]{2})'
  multiline.negate: true
  multiline.match: after

# Elasticsearch Ingest Node pipeline configuration
output.elasticsearch:
  # Array of hosts to connect to.
  hosts: ["localhost:9200"]
  pipeline: grok-pipeline
Below is a complete sample filebeat.yml file:
###################### Filebeat Configuration Example #########################

#=========================== Filebeat prospectors =============================

filebeat.prospectors:

# Below are the prospector specific configurations.

- type: log

  # Change to true to enable this prospector configuration.
  enabled: true

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - D:\fsv_logs\*.log*
    #- c:\programdata\elasticsearch\logs\*

  # Exclude lines. A list of regular expressions to match. It drops the lines that are
  # matching any regular expression from the list.
  #exclude_lines: ['^DBG']

  # Include lines. A list of regular expressions to match. It exports the lines that are
  # matching any regular expression from the list.
  #include_lines: ['^ERR', '^WARN']

  # Exclude files. A list of regular expressions to match. Filebeat drops the files that
  # are matching any regular expression from the list. By default, no files are dropped.
  #exclude_files: ['.gz$']

  # Optional additional fields. These fields can be freely picked
  # to add additional information to the crawled log files for filtering
  #fields:
  #  level: debug
  #  review: 1

  ### Multiline options

  # Multiline can be used for log messages spanning multiple lines. This is common
  # for Java Stack Traces or C-Line Continuation

  # The regexp Pattern that has to be matched. The pattern below matches all lines starting with a date
  multiline.pattern: '^([0-9]{4})-([0-9]{2})-([0-9]{2})'

  # Defines if the pattern set under pattern should be negated or not. Default is false.
  multiline.negate: true

  # Match can be set to "after" or "before". It is used to define if lines should be append to a pattern
  # that was (not) matched before or after or as long as a pattern is not matched based on negate.
  # Note: After is the equivalent to previous and before is the equivalent to next in Logstash
  multiline.match: after

#============================= Filebeat modules ===============================

filebeat.config.modules:
  # Glob pattern for configuration loading
  path: ${path.config}/modules.d/*.yml

  # Set to true to enable config reloading
  reload.enabled: false

  # Period on which files under path should be checked for changes
  #reload.period: 10s

#==================== Elasticsearch template setting ==========================

setup.template.settings:
  index.number_of_shards: 3
  #index.codec: best_compression
  #_source.enabled: false

#================================ Outputs =====================================

# Configure what output to use when sending the data collected by the beat.

#-------------------------- Elasticsearch output ------------------------------
output.elasticsearch:
  # Array of hosts to connect to.
  hosts: ["localhost:9200"]
  pipeline: grok-pipeline

  # Optional protocol and basic auth credentials.
  #protocol: "https"
  #username: "elastic"
  #password: "changeme"
In the above filebeat.yml you can see the configuration for Filebeat, multiline processing of logs, and the Ingest Node pipeline.
Ingest Node Pipeline Creation and Processor Configuration
You can execute the command shown after the list below, using curl or the Kibana Dev Tools console, to create a pipeline that runs the processors.
The following processors are configured within the pipeline:
grok processor: parses the log message into field values, which helps with analysis.
date processor: sets @timestamp to the timestamp of each log line, which helps when indexing and sorting logs by time.
date_index_name processor: writes documents into Elasticsearch indexes suffixed with the date, so that indexes are easy to maintain per retention period.
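A pipeline definition along these lines could be created with curl or from the Kibana Dev Tools console; this is only a sketch, the grok pattern is written against the sample log format above, and the app-logs- index prefix is an assumption.

PUT _ingest/pipeline/grok-pipeline
{
  "description": "Parse application logs, set @timestamp and a dated index name",
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": ["%{TIMESTAMP_ISO8601:logtime}\\s+%{LOGLEVEL:loglevel}\\s+%{DATA:thread} \\[%{DATA:logger}\\] - %{GREEDYDATA:logmessage}"]
      }
    },
    {
      "date": {
        "field": "logtime",
        "formats": ["yyyy-MM-dd HH:mm:ss,SSS"]
      }
    },
    {
      "date_index_name": {
        "field": "@timestamp",
        "index_name_prefix": "app-logs-",
        "date_rounding": "d"
      }
    }
  ]
}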
An ingest node is used to pre-process documents before the actual document indexing
happens. The ingest node intercepts bulk and index requests, applies the transformations, and then passes the documents back to the index or bulk APIs.
Logstash
Logstash is a server-side data processing pipeline that ingests data from multiple sources simultaneously, transforms it, and then sends it to different outputs such as Elasticsearch, Kafka queues, databases, etc.
Filebeat
Filebeat is a lightweight log shipper that reads logs from thousands of log files and forwards those log lines to a centralized system such as Kafka topics (for further processing in Logstash), directly to Logstash, or to Elasticsearch.
There is overlap in functionality between Elasticsearch Ingest Node, Logstash and Filebeat. Each has its own weaknesses and strengths based on its architecture and area of use. You can also integrate Filebeat, Logstash and Elasticsearch Ingest Node with minor configuration to optimize performance and data analysis.
Below are some key points comparing Elasticsearch Ingest Node, Logstash and Filebeat.
Data In and Out
Elasticsearch Ingest Node: Because the ingest node runs as a pipeline within the indexing flow in Elasticsearch, data has to be pushed to it through bulk or index requests, and the configured pipeline processors process documents just before they are written to Elasticsearch.
Logstash: Logstash supports a wide variety of input and output plugins. It can act as a middle server that accepts pushed data from clients over TCP, UDP and HTTP, as well as from Filebeat, message queues and databases. It parses and processes data for a variety of outputs, e.g. Elasticsearch, message queues like Kafka and RabbitMQ, or long-term data analysis on S3 or HDFS.
Filebeat: Filebeat is built specifically to ship log file data to Kafka, Logstash or Elasticsearch.

Queuing
Elasticsearch Ingest Node: The ingest node has no built-in queuing mechanism in its pipeline processing. If the data nodes are not able to accept data, the ingest node stops accepting data as well.
Logstash: Logstash provides a persistent queuing feature by storing events on disk.
Filebeat: Filebeat provides a queuing mechanism without data loss.

Back-pressure
Elasticsearch Ingest Node: Clients pushing data to the ingest node need to be able to handle back-pressure by queuing data in case Elasticsearch is not reachable or cannot accept data for an extended period; otherwise there would be data loss.
Logstash: Logstash provides at-least-once delivery guarantees and buffers data locally through ingestion spikes.
Filebeat: Filebeat is designed not to lose a single log line if output systems like Kafka, Logstash or Elasticsearch are not available.

Data Processing
Elasticsearch Ingest Node: The ingest node comes with around 20 different processors, covering the functionality of the most commonly used Logstash plugins. It has some limitations: a pipeline can only work in the context of a single event, processors are generally not able to call out to other systems or read data from disk, and it has no filters as in Beats and Logstash.
Logstash: Logstash has a larger selection of plugins to choose from, including plugins to add or transform content based on lookups in configuration files, Elasticsearch, Beats or relational databases. Logstash supports filtering out and dropping events based on configurable criteria.
Filebeat: Beats support filtering out and dropping events based on configurable criteria.

Configuration
Elasticsearch Ingest Node: Each document can only be processed by a single pipeline when passing through the ingest node.
Logstash: Logstash supports defining multiple logically separate pipelines with conditional control flow to handle complex and multiple data formats. It is also easier to measure and optimize pipeline performance in Logstash, since its monitoring support and excellent pipeline viewer UI help resolve potential issues quickly.
Filebeat: Minor configuration is enough for reading, shipping and filtering data, but it is limited when it comes to parsing.

Specialization
Elasticsearch Ingest Node: The ingest node pipeline processes data before it is indexed in Elasticsearch.
Logstash: A middle server that parses, processes and filters data from multiple input plugins and sends the processed data to output plugins.
Filebeat: Specific to reading and shipping logs from different servers to a centralized location such as Elasticsearch or Kafka, with parsing handled through Logstash if required.

Integration
Logstash supports sending data to an ingest pipeline. The ingest node can accept data from Filebeat, Logstash etc., and Filebeat can send data to Logstash, Elasticsearch Ingest Node or Kafka.
java.lang.NoClassDefFoundError is a subclass of LinkageError. This error is thrown if the Java Virtual Machine or a ClassLoader instance tries to load the definition of a class (as part of a normal method call or as part of creating a new instance using the new expression) and no definition of the class can be found.
The searched-for class definition existed when the currently executing class was compiled, but the definition can no longer be found at runtime.
Constructor
NoClassDefFoundError(): Constructs a NoClassDefFoundError with no detail message.
NoClassDefFoundError(String s): Constructs a NoClassDefFoundError with the specified detail message.
Exception Stack Trace
Exception in thread "elasticsearch[2befByj][clusterService#updateTask][T#1]" java.lang.NoClassDefFoundError: org/apache/logging/log4j/core/config/Configurator
at org.elasticsearch.common.logging.Loggers.setLevel(Loggers.java:149)
at org.elasticsearch.common.logging.Loggers.setLevel(Loggers.java:144)
at org.elasticsearch.index.SearchSlowLog.setLevel(SearchSlowLog.java:111)
at org.elasticsearch.index.SearchSlowLog.(SearchSlowLog.java:106)
at org.elasticsearch.index.IndexModule.(IndexModule.java:127)
at org.elasticsearch.indices.IndicesService.createIndexService(IndicesService.java:440)
at org.elasticsearch.indices.IndicesService.createIndex(IndicesService.java:413)
at org.elasticsearch.cluster.metadata.MetaDataCreateIndexService$1.execute(MetaDataCreateIndexService.java:378)
at org.elasticsearch.cluster.ClusterStateUpdateTask.execute(ClusterStateUpdateTask.java:45)
at org.elasticsearch.cluster.service.ClusterService.executeTasks(ClusterService.java:634)
at org.elasticsearch.cluster.service.ClusterService.calculateTaskOutputs(ClusterService.java:612)
at org.elasticsearch.cluster.service.ClusterService.runTasks(ClusterService.java:571)
at org.elasticsearch.cluster.service.ClusterService$ClusterServiceTaskBatcher.run(ClusterService.java:263)
at org.elasticsearch.cluster.service.TaskBatcher.runIfNotProcessed(TaskBatcher.java:150)
at org.elasticsearch.cluster.service.TaskBatcher$BatchedTask.run(TaskBatcher.java:188)
at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:575)
at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:247)
at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:210)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ClassNotFoundException: org.apache.logging.log4j.core.config.Configurator
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
... 21 more
Issue
Let's focus on this particular exception. It occurred while using Spring Boot with the Elasticsearch framework: ElasticsearchTemplate internally uses a Log4j 2 dependency that was not included in pom.xml.
Solutions
To resolve this issue, include the dependency below in your pom.xml to add the Log4j 2 jar to your application.
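Something like the following brings in the missing org.apache.logging.log4j.core.config.Configurator class; the version shown is only an example, pick one compatible with your Elasticsearch version.

<!-- Log4j 2 core, required by Elasticsearch logging (Loggers/Configurator) -->
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.9.1</version>
</dependency>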
These Top 50 Elasticsearch frequently asked interview questions are collected based on my interview experience with ELK (Elasticsearch, Logstash and Kibana) at different organizations. I have divided these questions into three categories, as below.
Elasticsearch Overview Questions and Answers.
Basic Concepts and Terminology Questions and Answers.
Advance and Practical Questions and Answers.
Elasticsearch Overview Questions and Answers
1.What is Elasticsearch?
“Elasticsearch is an open source, cross-platform, scalable, full-text search and analytics engine based on Apache Lucene. It helps with NRT (Near Real Time) analysis and full-text search on large volumes of data in a distributed, clustered environment.”
Elasticsearch is developed in Java by Elastic and is built on the Apache Lucene library.
Elasticsearch stores records as JSON documents with keys and values.
It is schema-free by default; if required, a schema can be added through mappings from the client application.
It can be accessed over HTTP from the browser, or by applications through the Elasticsearch REST client APIs or the Elasticsearch Transport Client.
Elastic also provides applications and plug-ins that make Elasticsearch more useful, such as Kibana for search and analysis with different charts and dashboards.
2. What are the advantages of Elasticsearch?
Elasticsearch is implemented on Java, which makes it compatible on almost every platform.
Elasticsearch is Near Real Time (NRT); in other words, an added document becomes searchable in the engine after about one second.
Elasticsearch cluster is distributed, which makes it easy to scale and integrate in any big organizations.
Creating full backups of data is easy by using the concept of a gateway, which is present in Elasticsearch.
Elasticsearch REST uses JSON objects as responses, which makes it possible to invoke the Elasticsearch server with a large number of different programming languages.
Elasticsearch supports almost every document type except those that do not support text rendering.
Handling multi-tenancy is very easy in Elasticsearch when compared to Apache Solr.
3. What are the Disadvantages of Elasticsearch?
Elasticsearch handles request and response data only in JSON, while Apache Solr supports CSV, XML and JSON formats.
Elasticsearch can run into split-brain situations, although only in rare cases.
4. What is difference and Similarities between NoSQL MongoDB and Elasticsearch?
Elasticsearch is an Apache Lucene based RESTful NRT (Near Real Time) search and analytics engine, while MongoDB is an open source document-oriented database management system.
Similarities
Certain features are common between both products like Document-oriented Store, Schema free, Distributed Data Storage, High-Availability, Sharding, Replication etc.
Difference
There are many differences between the two products, as below:
Indexing
Elasticsearch: Uses Apache Lucene for indexing. Real-time indexing and searching power comes from Lucene, which allows an index to be created on every field of a document by default.
MongoDB: Based on a traditional B+ tree. You define the indexes, which improves query performance but affects write operations.

Language
Elasticsearch: Implemented in Java.
MongoDB: Implemented in C++.

Documents
Elasticsearch: Stores JSON documents.
MongoDB: Stores documents in BSON (Binary JSON) format (though it looks the same as a JSON document to the end user).

REST Interface
Elasticsearch: RESTful.
MongoDB: Not RESTful.

Map Reduce
Elasticsearch: Does not support MapReduce.
MongoDB: Allows MapReduce operations.

Huge Data
Elasticsearch: Stores and retrieves huge data.
MongoDB: Stores and searches huge data.
5. What are common area of use Elasticsearch?
It is useful in applications that need to do analysis and statistics, and to find anomalies in data based on patterns.
It is useful where alerts need to be sent when a particular condition is matched, e.g. in stock markets or for exceptions in logs.
It is useful in applications that provide log analysis and issue resolution, because of full-text search across billions of records in milliseconds.
It works with applications like Filebeat, Logstash and Kibana to store high volumes of data for analysis and to visualize it in the form of charts and dashboards.
6. What are operations can be performed on Elasticsearch Documents?
Elasticsearch can perform some basic operations on documents, such as:
Indexing
Searching
Fetching
Updating
Deleting documents.
Basic Concepts and Terminology Questions and Answers
7. What is Elasticsearch Cluster ?
A cluster is a collection of one or more nodes that together provide the capability to search text across data scattered over those nodes. It is identified by a unique name within the network, so that all associated nodes join together under that cluster name.
Operation persistence: the cluster also keeps records of all transaction-level changes (such as schema changes for an index) and tracks the availability of nodes in the cluster, so that data remains easily available if any node fails over.
Elasticsearch Cluster
In the above screen, the Elasticsearch cluster “FACING_ISSUE_IN_IT” has three master nodes and four data nodes.
8.What is Elasticsearch Node?
A node is an Elasticsearch server that participates in a cluster. It stores data and helps the cluster with indexing data and serving search queries. It is identified by a unique name in the cluster; if a name is not provided, Elasticsearch generates a random Universally Unique Identifier (UUID) at server start time.
A cluster can have one or more nodes. The first node that starts forms a cluster with a single node, and as other nodes start they join that cluster.
Data Node Documents Storage
The above screen represents the data of two indexes, I1 and I2. Index I1 has two document types, T1 and T2, while index I2 has only type T2, and the shards are distributed over all nodes in the cluster. This data node holds documents of shard S1 for index I1 and shard S3 for index I2. It also keeps replicas of documents of shards S2 of indexes I1 and I2, which are stored on other nodes in the cluster.
9. What are types of Node in Elasticsearch?
Within an Elasticsearch cluster each node knows the other nodes, and configuration decides the role/responsibility of each individual node. Below are the Elasticsearch node types.
Master-Eligible Node.
Data Node.
Ingest Node.
Tribe Node/Coordinating Node.
10. What is Master Node and Master Eligible Node in Elasticsearch?
The master node controls cluster-wide operations such as creating or deleting an index, tracking which nodes are part of the cluster, and deciding which shards to allocate to which nodes. It is important for cluster health to have a stable master node. A node is master-eligible based on the configuration property node.master: true (the default).
The number of master-eligible nodes required to elect a master is decided by the configuration below:
discovery.zen.minimum_master_nodes: number (default 1)
This number should be set to (master_eligible_nodes / 2) + 1; for example, with three master-eligible nodes it should be (3 / 2) + 1 = 2.
11. What is Data Node in Elasticsearch?
Data nodes hold the shards/replicas that contain the indexed documents. Data nodes perform data-related operations such as CRUD, search, aggregation etc. Set node.data: true (the default) to make a node a data node.
Data node operations are I/O-, memory-, and CPU-intensive. It is important to monitor these resources and to add more data nodes if they are overloaded. The main benefit of having dedicated data nodes is the separation of the master and data roles.
12. What is Ingest Node in Elasticsearch?
Ingest nodes can apply an ingest pipeline to a document in order to transform and enrich it before indexing. With a heavy ingest load, it makes sense to use dedicated ingest nodes, marking them with node.master: false and node.data: false while keeping node.ingest: true.
13. What is Tribe Node and Coordinating Node in Elasticsearch?
A tribe node is a special type of node that connects to multiple clusters and performs search and other operations across all connected clusters. Tribe nodes are configured with the tribe.* settings.
A coordinating node behaves like a smart load balancer. If you take away the ability to handle master duties, to hold data, and to pre-process documents, you are left with a coordinating node that can only route requests, handle the search reduce phase, and distribute bulk indexing.
Every node is implicitly a coordinating node. This means that a node with node.master, node.data and node.ingest all set to false will act only as a coordinating node, and this cannot be disabled. As a result, such a node needs to have enough memory and CPU to deal with the gather phase.
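As a rough sketch (not an exhaustive configuration), the node roles above map to elasticsearch.yml settings like these:

# Dedicated master-eligible node
node.master: true
node.data: false
node.ingest: false

# Dedicated data node
node.master: false
node.data: true
node.ingest: false

# Dedicated ingest node
node.master: false
node.data: false
node.ingest: true

# Coordinating-only node
node.master: false
node.data: false
node.ingest: false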
14. What is Index in Elasticsearch?
An index is a collection of documents with similar characteristics, stored on nodes in a distributed fashion and identified by a unique name, against which different operations such as inserts, search queries, updates and deletes are performed on documents. A cluster can have as many indexes as needed, each with a unique name.
A document is stored in an index and assigned a type, and an index can have multiple types of documents.
15. What is Shards in Elasticsearch?
Shards are partitions of an index scattered across nodes in order to make it scalable. They provide the capability to store a large number (billions) of documents for the same index in the cluster, even when a single node's disk is not capable of storing it all. Shards also maintain an inverted index of document tokens to make full-text search fast.
16. What is Replica in Elasticsearch?
A replica is a copy of a shard that is stored on a different node. A shard can have zero or more replicas. If a shard is on one node, its replica is stored on another node.
17. What are Benefits of Shards and Replica in Elasticsearch?
Shards split an index into horizontal partitions to handle high volumes of data.
Operations can be performed in parallel on shards and replicas across multiple nodes for an index, which increases system performance and throughput.
Recovery is easy in case of node fail-over, because a replica of the data exists on another node (a replica is always stored on a node different from the one holding its shard).
Some important points:
When we create an index, by default Elasticsearch configures it with 5 shards and 1 replica, but we can change this in the config/elasticsearch.yml file or by passing shard and replica values in the settings when the index is created (see the example after this list).
Once an index is created we cannot change the number of shards, only the number of replicas. If the shard count needs to change, the only option is re-indexing.
Each shard is itself a Lucene index and can hold at most 2,147,483,519 (= Integer.MAX_VALUE - 128) documents. Merging of search results and failover are taken care of by the Elasticsearch cluster.
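For example, the shard and replica counts can be set when the index is created; the index name and values below are only placeholders.

PUT /app-index
{
  "settings": {
    "index": {
      "number_of_shards": 5,
      "number_of_replicas": 1
    }
  }
}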
18. What is Document in Elasticsearch?
Each record stored in an index is called a document and is stored as a JSON object. A document is similar to a row in RDBMS terms; the difference is that each document can have a different number of fields and a different structure, but common fields should have the same data type.
19. What is a Type in Elasticsearch ?
A type is a logical category/grouping/partition of an index whose semantics are completely up to the user; documents of the same type will always have the same set of fields.
ElasticSearch => Indices => Types => Documents with Fields/Properties
20. What is a Document Type in Elaticsearch?
A document type can be seen as the document schema / mapping definition, which has the mapping of all the fields in the document along with its data types.
21. What is indexing in ElasticSearch ?
The process of storing data in an index is called indexing in Elasticsearch. Data in Elasticsearch is divided into write-once, read-many segments. Whenever an update/modification is attempted, a new version of the document is written to the index.
22. What is inverted index in Elasticsearch ?
The inverted index is the backbone of Elasticsearch and is what makes full-text search fast. An inverted index consists of a list of all unique words that occur in the documents; for each word it maintains the list of document numbers and positions in which it appears.
For example, take two documents with the following content:
1: FacingIssuesOnIT is for ELK.
2: If ELK check FacingIssuesOnIT.
To build the inverted index, each document is split into words (also called terms or tokens), producing the sorted index below.
Term Doc_1 Doc_2
-------------------------
FacingIssuesOnIT | X | X
is | X |
for | X |
ELK | X | X
If | | X
check | | X
Now, when we do a full-text search for a string, documents are ranked based on the existence and number of matching terms.
Books usually have an inverted index on their last pages: based on a word, we can find the pages on which the word appears.
23. What is an Analyzer in ElasticSearch ?
While indexing data in Elasticsearch, the data is transformed internally by the analyzer defined for the index, and then indexed. An analyzer is built from character filters, a tokenizer and token filters. The following built-in analyzers are available in Elasticsearch 5.6.
Standard Analyzer: Divides text into terms on word boundaries, as defined by the Unicode Text Segmentation algorithm. It removes most punctuation, lower-cases terms, and supports removing stop words.
Simple Analyzer: Divides text into terms whenever it encounters a character which is not a letter. It lower-cases all terms.
Whitespace Analyzer: Divides text into terms whenever it encounters any whitespace character. It does not lowercase terms.
Stop Analyzer: Like the simple analyzer, but also supports removal of stop words.
Keyword Analyzer: A “noop” analyzer that accepts whatever text it is given and outputs the exact same text as a single term.
Pattern Analyzer: Uses a regular expression to split the text into terms. It supports lower-casing and stop words.
Language Analyzers: Elasticsearch provides many language-specific analyzers, like English or French.
Fingerprint Analyzer: A specialist analyzer which creates a fingerprint that can be used for duplicate detection.
24. What is a Tokenizer in ElasticSearch ?
A tokenizer receives a stream of characters, breaks it up into individual tokens (usually individual words), and outputs a stream of tokens. Inverted indexes are created and updated using these token values, recording the order or position of each term and the start and end character offsets of the original word which the term represents.
An analyzer must have exactly one Tokenizer.
25. What is Character Filter in Elasticsearch Analyzer?
A character filter receives the original text as a stream of characters and can transform the stream by adding, removing, or changing characters. For instance, a character filter could be used to convert Hindu-Arabic numerals (٠١٢٣٤٥٦٧٨٩) into their Arabic-Latin equivalents (0123456789), or to strip HTML elements such as <b> tags from the stream.
An analyzer may have zero or more character filters, which are applied in order.
26.What is Token filters in Elasticsearch Analyzer?
A token filter receives the token stream and may add, remove, or change tokens. For example, a lowercase token filter converts all tokens to lowercase, a stop token filter removes common words (stop words) like the from the token stream, and a synonym token filter introduces synonyms into the token stream.
Token filters are not allowed to change the position or character offsets of each token.
An analyzer may have zero or more token filters, which are applied in order.
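As an illustration, these building blocks can be combined and tried out with the _analyze API; the text and the chosen tokenizer/filters below are arbitrary examples.

POST _analyze
{
  "char_filter": ["html_strip"],
  "tokenizer": "standard",
  "filter": ["lowercase", "stop"],
  "text": "<b>FacingIssuesOnIT</b> is for the ELK stack"
}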
27. What are Type of Token Filters in Elasticsearch Analyzer?
Elasticsearch has a number of built-in token filters which can be used in custom analyzers.
28. What is the is use of attributes- enabled, index and store ?
The enabled attribute applies to various ElasticSearch specific/created fields such as _index and _size. User-supplied fields do not have an “enabled” attribute.
Store means the data is stored by Lucene, and Lucene will return this data if asked. Stored fields are not necessarily searchable. By default, fields are not stored, but the full source is. Since you usually want the defaults (which make sense), simply do not set the store attribute.
The index attribute is used for searching. Only indexed fields can be searched. The reason for the differentiation is that indexed fields are transformed during analysis, so you cannot retrieve the original data from them if it is required.
29.What is the query language of ElasticSearch ?
Elasticsearch uses the Apache Lucene query language, which is exposed as the Query DSL.
30. Does Elasticsearch have a schema ?
Yes, Elasticsearch can have mappings, which can be used to enforce a schema on documents. We define the Elasticsearch index schema by defining mappings.
Advance and Practical Interview Questions and Answers
31.What are Scripting Languages Support by Elasticsearch?
Elasticsearch supports custom scripting in Lucene Expressions, Groovy, Python, JavaScript and Painless.
32. What is Painless and their benefits in Elasticsearch?
Painless is a simple, secure scripting language designed specifically for use with Elasticsearch 5.x. It is the default scripting language for Elasticsearch and can safely be used for inline and stored scripts. Painless can be used anywhere scripts can be used in Elasticsearch.
Benefits of Painless :
Fast performance: Painless scripts run several times faster than the alternatives.
Safety: Fine-grained whitelist with method call/field granularity.
Optional typing: Variables and parameters can use explicit types or the dynamic def type.
Syntax: Extends Java’s syntax to provide Groovy-style scripting language features that make scripts easier to write.
Optimizations: Designed specifically for Elasticsearch scripting.
33. How to store Elasticsearch Node Data to external Directory?
By default, the Elasticsearch data path location is $ES_HOME/data. Keeping data in a path outside the Elasticsearch directory is beneficial while upgrading or modifying Elasticsearch, so that no data is lost.
There are two ways to point to an external path:
First: set a static path in the elasticsearch.yml file, as below.
path.data: /opt/app/FacingIssuesOnIT/data
Second: pass the path as an argument on the command line when starting Elasticsearch, as below.
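A sketch of the second approach (Elasticsearch 5.x style -E settings), reusing the same example directory:

./bin/elasticsearch -Epath.data=/opt/app/FacingIssuesOnIT/data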
33. What is Restore and Snapshot in Elasticsearch?
Snapshot: a snapshot is a copy or backup of individual indices or an entire cluster, stored in a remote repository like a shared file system, S3, or HDFS. Snapshots are not archival, because they can only be restored by versions of Elasticsearch that can read the index.
Steps to create Snapshot:
Setup Backup directory
PUT /_snapshot/facingIssueOnIT_bkp
{
  "type": "fs",
  "settings": {
    "compress": true,
    "location": "/mount/backups/facingIssueOnIT_bkp"
  }
}
Check status
GET /_snapshot/facingIssueOnIT_bkp
or
GET /_snapshot/_all
After registering the repository, create a snapshot of the cluster or of specific indexes as below.
For Cluster
PUT /_snapshot/facingIssueOnIT_bkp/snapshot_1?wait_for_completion=true
For indexes
PUT /_snapshot/facingIssueOnIT_bkp/snapshot_1
{"indices":"index_1,index_2","ignore_unavailable":true,"include_global_state":false}
wait_for_completion=true makes the request block until the snapshot completes; to run it in the background so you can execute other actions, set it to false.
Restore: restore is used to bring backed-up/snapshot indexes back into the cluster. A restore can be done at the cluster level or at the index level.
Cluster Level
POST /_snapshot/facingIssueOnIT_bkp/snapshot_1/_restore
Index Level
POST /_snapshot/facingIssueOnIT_bkp/snapshot_1/_restore
{"indices":"index_1,index_2","ignore_unavailable":true,"include_global_state":true,"rename_pattern":"index_(.+)","rename_replacement":"restored_index_$1"}
34. What is Elasticsearch REST API and use of it?
Elasticsearch provides a very comprehensive and powerful REST API that you can use to interact with your cluster. Among the few things that can be done with the API are as follows:
Check your cluster, node, and index health, status, and statistics
Administer your cluster, node, and index data and metadata
Perform CRUD (Create, Read, Update, and Delete) and search operations against your indexes
Execute advanced search operations such as paging, sorting, filtering, scripting, aggregations, and many others
To know about cluster health follow below URL over curl or on your browser.
GET /_cat/health?v
36. What are the types of Cluster Health Status?
Green means everything is good (the cluster is fully functional).
Yellow means all data is available but some replicas are not yet allocated (the cluster is still fully functional).
Red means some data is not available for whatever reason.
Note: even if a cluster is red, it is still partially functional (i.e. it will continue to serve search requests from the available shards), but you will likely need to fix it ASAP since you have missing data.
37. How to know Number of Nodes?
GET /_cat/nodes?v
Response:
ip        heap.percent ram.percent cpu load_1m load_5m load_15m node.role master name
127.0.0.1           10           5   5    4.46                        mdi     *      PB2SGZY
Here, we can see our one node named “PB2SGZY”, which is the single node that is currently in our cluster.
38. How to get list of available Indices in Elasticsearch Cluster?
GET /_cat/indices?v
39. How to create Indexes?
PUT /customer?pretty
GET /_cat/indices?v
39. How to delete Index and records?
DELETE /customer?pretty
GET /_cat/indices?v
and
PUT /customer
PUT /customer/external/1
{"name":"John Doe"}
GET /customer/external/1
DELETE /customer
If we study the above commands carefully, we can actually see a pattern of how we access data in Elasticsearch. That pattern can be summarized as follows:
<REST Verb>/<Index>/<Type>/<ID>
This REST access pattern is so pervasive throughout all the API commands that if you can simply remember it, you will have a good head start at mastering Elasticsearch.
40. How to update record and document fields value in Index?
We’ve previously seen how we can index a single document. Let’s recall that command again:
PUT /customer/external/1?pretty
{"name":"John Doe"}
Again, the above will index the specified document into the customer index, external type, with the ID of 1. If we then executed the above command again with a different (or same) document, Elasticsearch will replace (i.e. reindex) a new document on top of the existing one with the ID of 1:
PUT /customer/external/1?pretty
{"name":"Jane Doe"}
The above changes the name of the document with the ID of 1 from “John Doe” to “Jane Doe”. If, on the other hand, we use a different ID, a new document will be indexed and the existing document(s) already in the index remains untouched.
PUT /customer/external/2?pretty
{"name":"Jane Doe"}
The above indexes a new document with an ID of 2.
When indexing, the ID part is optional. If not specified, Elasticsearch will generate a random ID and then use it to index the document. The actual ID Elasticsearch generates (or whatever we specified explicitly in the previous examples) is returned as part of the index API call.
This example shows how to index a document without an explicit ID:
POST /customer/external?pretty
{"name":"Jane Doe"}
Note that in the above case, we are using the POST verb instead of PUT since we didn’t specify an ID.
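In addition to re-indexing with PUT as shown above, individual fields can be updated in place with the _update API; a minimal sketch follows, where the age field is added purely for illustration.

POST /customer/external/1/_update?pretty
{
  "doc": { "name": "Jane Doe", "age": 20 }
}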
Read More
To read more on Elasticsearch configuration, sample Elasticsearch REST clients, and search query types with examples, follow the links Elasticsearch Tutorial and Elasticsearch Issues.
Hope this blog was helpful for you.
Leave your feedback to enhance this topic further and make it more helpful for others.
Filebeat, Kafka, Logstash, Elasticsearch and Kibana integration is used by big organizations where applications are deployed in production on hundreds or thousands of servers scattered across different locations, and analysis needs to be done on data from these servers in real time.
This integration helps mostly with log-level analysis, tracking issues, finding anomalies in data, alerting on events of a particular occurrence, and accountability measures.
Using these technologies provides a scalable architecture that enhances systems while keeping them decoupled from each other.
It provides a window to view Elasticsearch data in the form of different charts and dashboards.
It provides an easy way to search and operate on data with respect to time intervals.
Dashboards can easily be embedded into any web application.
How Data flow works ?
In this integration, Filebeat is installed on all servers where your application is deployed; Filebeat reads the latest log changes from these servers and ships them to the Kafka topic configured for this application.
Logstash subscribes to the log lines from the Kafka topic, parses these lines, makes the relevant changes, formats them, excludes and includes fields, and then sends this processed data to Elasticsearch indexes as a centralized location for data from the different servers.
Kibana is linked to the Elasticsearch indexes, which helps with analysis through search, charts and dashboards.
Design Architecture
The architecture configured below assumes my application is deployed on three servers, each with a current log file named App1.log. Our goal is to read real-time data from these servers and analyze it.
Steps to Installation, Configuration and Start
Here we will first install Kafka and Elasticsearch, which run individually; the rest of the tools are installed and run in sequence to test the data flow. Initially, install everything on the same machine and test with the sample data using the steps below; at the end of this post I describe the changes needed for your own servers.
Kafka Installation, Configuration and Start
Elasticsearch Installation,Configuration and Start
Filebeat Installation,Configuration and Start
Logstash Installation,Configuration and Start
Kibana Installation,Start and display.
Pre-Requisite
The Filebeat, Logstash, Elasticsearch and Kibana versions should be compatible; preferably use the latest from https://www.elastic.co/downloads.
Java 8+
Linux Server
Filebeat 5.XX
Kafka 2.11.XX
Logstash 5.XX
Elasticsearch 5.XX
Kibana 5.XX
Note: Make sure JDK 8 is installed and the JAVA_HOME environment variable points to the JDK 8 home directory on every machine where you want to install Elasticsearch, Logstash, Kibana and Kafka.
Windows: My Computer -> right click -> Properties -> Advanced System Settings -> System Variables
Set JAVA_HOME
Linux: go to your home directory (or the appropriate profile file) and add the JAVA_HOME line shown below.
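For example, a line like the following can be added to ~/.bashrc; the JDK path is an assumption, so adjust it to your install location.

export JAVA_HOME=/opt/jdk1.8.0_66
export PATH=$JAVA_HOME/bin:$PATH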
For testing we will use the sample log lines below, which contain debug lines as well as a stack trace; the grok parsing in this example is designed for them. For real-time testing with actual data you can point to your server log files, but you will have to modify the grok pattern in the Logstash configuration accordingly.
2013-02-28 09:57:56,662 WARN CreateSomethingActivationKey - WhateverException for User 49-123-345678 {{rid,US8cFAp5eZgAABwUItEAAAAI_dev01_443}{realsid,60A9772A136B9912B6FF0C3627A47090.dev1-a}}
2013-02-28 09:57:56,663 INFO LMLogger - ERR1700 - u:null failures: 0 - Technical error {{rid,US8cFAp5eZgAABwUItEAAAAI_dev01_443}{realsid,60A9772A136B9912B6FF0C3627A47090.dev1-a}}
2013-02-28 09:57:56,668 ERROR SomeCallLogger - ESS10005 Cpc portalservices: Exception caught while writing log messege to MEA Call: {}
java.sql.SQLSyntaxErrorException: ORA-00942: table or view does not exist
at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:445)
at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:396)
2013-02-28 10:04:35,723 INFO EntryFilter - Fresh on request /portalservices/foobarwhatever {{rid,US8dogp5eZgAABwXPGEAAAAL_dev01_443}{realsid,56BA2AD41D9BB28AFCEEEFF927EE61C2.dev1-a}}
Create an App1.log file on the same machine where Filebeat will be installed and copy the above log lines into it.
Kafka Installation , Configuration and Start
Download the latest version of Kafka from the link below; untar it to install on a Linux server, or just unzip the downloaded file on Windows.
To verify that Kafka is installed successfully, check for the Kafka process on Linux with “ps -ef | grep kafka”, or run the producer/consumer steps below against the topic (see also Setup Kafka Cluster for Single Server/Broker).
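For reference, creating the topic and doing a quick producer/consumer check from the Kafka installation directory could look like the commands below; the ZooKeeper and broker addresses are assumptions for a single-broker local setup.

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic APP-1-TOPIC
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic APP-1-TOPIC
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic APP-1-TOPIC --from-beginning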
Elasticsearch Installation,Configuration and Start
Download the latest version of Elasticsearch from the link below; untar it to install on a Linux server, or just unzip the downloaded file on Windows.
Before starting Elasticsearch, make some basic changes in the config/elasticsearch.yml file for the cluster and node name. You can configure these based on your application or organization name, for example as in the snippet below.
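A minimal example of the relevant elasticsearch.yml entries; the names are just placeholders.

cluster.name: FACING_ISSUE_IN_IT
node.name: TEST-NODE-1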
Download the latest version of Filebeat from the link below; untar it to install on a Linux server, or just unzip the downloaded file on Windows. Then configure it as sketched below.
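A minimal sketch of the relevant filebeat.full.yml sections for this example, assuming Filebeat 5.x (which uses input_type rather than type), a local App1.log path and a local Kafka broker:

filebeat.prospectors:
- input_type: log
  paths:
    - /opt/app/facingissuesonit/App1.log
  # Attach stack-trace lines to the log line before them
  multiline.pattern: '^([0-9]{4})-([0-9]{2})-([0-9]{2})'
  multiline.negate: true
  multiline.match: after

output.kafka:
  # Kafka broker(s) and the topic to publish to
  hosts: ["localhost:9092"]
  topic: 'APP-1-TOPIC'
  required_acks: 1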
Now Filebeat is configured and ready to start with the command below; it will continuously read the configured prospector for the file App1.log and publish log line events to Kafka. It will also create the topic APP-1-TOPIC in Kafka if it does not exist.
./filebeat -e -c filebeat.full.yml -d "publish"
On the console it will display output like the following for the sample lines.
From the Filebeat debug statements above you can see that published event 3 contains the multiline statement with the stack-trace exception, and that each event has fields like these:
@timestamp: the timestamp at which the data was shipped.
beat.hostname: the name of the Filebeat machine from which the data is shipped.
beat.version: the Filebeat version installed on the server, which helps with compatibility checks on the target end.
message: the log line from the log file, or multiple log lines for a multiline event.
offset: the byte offset of the line within the source file.
source: the file name (path) from which the log line was read.
Now it is time to check whether data is published to the Kafka topic. Go to the Kafka log directory for the topic, where you will see two files such as xyz.index and xyz.log that maintain the data offsets and messages.
Before starting Logstash, we need to create a configuration file that takes input data from Kafka, parses it into the respective fields, and sends it to Elasticsearch. Create the file logstash-app1.conf in the Logstash bin directory, along the lines of the sketch below.
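A sketch of what logstash-app1.conf could contain for this example: a Kafka input for APP-1-TOPIC, a grok filter written against the sample log lines above, and an Elasticsearch output producing the app1-logs-* indexes. The broker and Elasticsearch addresses are assumptions for a single-machine test.

input {
  kafka {
    # Kafka broker and the topic Filebeat publishes to
    bootstrap_servers => "localhost:9092"
    topics => ["APP-1-TOPIC"]
    # Filebeat sends events as JSON
    codec => json
  }
}
filter {
  # Parse the sample log format: "2013-02-28 09:57:56,662 WARN  Logger - message"
  grok {
    match => { "message" => "%{TIMESTAMP_ISO8601:logtime}\s+%{LOGLEVEL:loglevel}\s+%{DATA:logger} - %{GREEDYDATA:logmessage}" }
  }
  # Use the log line's own timestamp as @timestamp
  date {
    match => ["logtime", "yyyy-MM-dd HH:mm:ss,SSS"]
  }
  # Optionally drop fields not needed in Elasticsearch
  #mutate { remove_field => ["beat", "input_type", "offset"] }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "app1-logs-%{+YYYY.MM.dd}"
  }
  # Also print parsed events to the console
  stdout { codec => rubydebug }
}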
To test your configuration file, you can use the command below.
./logstash -t -f logstash-app1.conf
If the above command returns OK, run the command below to start reading and parsing data from the Kafka topic.
./logstash -f logstash-app1.conf
To design your own grok pattern for your log line format, you can follow the link below, which helps you build patterns incrementally and also provides some sample grok patterns for logs.
The Logstash console will show the parsed data as below, and you can remove unused fields before storing in Elasticsearch by uncommenting the mutate section of the configuration file.
To verify on the Elasticsearch end that your data was sent successfully, open the URL http://localhost:9200/_cat/indices in your browser; it will display the created index with the current date.
yellow open app1-logs-2017.05.28 Qjs6XWiFQw2zsiVs9Ks6sw 5 1 4 0 47.3kb 47.3kb
Kibana Installation, Configuration and Start
Download the latest version of Kibana from the link below; untar it to install on a Linux server, or just unzip the downloaded file on Windows.
Now we are ready with the Kibana configuration, and it is time to start Kibana. We can use the command below to run Kibana in the background.
screen -d -m ./bin/kibana
Kibana takes some time to start; we can test it by opening the URL below in a browser.
http://localhost:5601/
To check this data in Kibana, open the above URL in a browser, go to the Management tab in the left-hand menu -> Index Patterns -> click Add New.
Enter the index name or pattern and the time field name as in the screen below, and click the Create button.
Index Pattern Settings
Now go to the Discover tab and select the index pattern app1-log*; it will display the data as below.
Now make the changes below according to your application and environment.
Filebeat:
Update the prospector path to your log directory's current file.
Move Kafka to a different machine, because Kafka will be the single location that receives shipped data from the different servers. Replace localhost with the IP of the Kafka server in the hosts property of the Kafka output section of the filebeat.full.yml file.
Copy the same Filebeat setup to all servers where your application is deployed and logs need to be read.
Start all Filebeat instances on each server.
Elasticsearch :
Uncomment the network.host property in the elasticsearch.yml file so the node is accessible by IP address.
Logstash:
Update localhost in the input section of the logstash-app1.conf file with the Kafka machine's IP.
Update localhost in the output section for Elasticsearch with its IP if it is moved to a different machine.
Kibana:
Update localhost in the kibana.yml file for the elasticsearch.url property with the Elasticsearch IP if Kibana is on a different machine, as in the snippet below.
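For example, in config/kibana.yml (the host IP is a placeholder):

# Allow access from other machines and point Kibana to the Elasticsearch host
server.host: "0.0.0.0"
elasticsearch.url: "http://ELASTICSEARCH-HOST-IP:9200"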
Conclusion:
This tutorial covered the points below:
Installation of Filebeat, Kafka, Logstash, Elasticsearch and Kibana.
Filebeat is configured to ship logs to the Kafka message broker.
Logstash is configured to read log lines from the Kafka topic, parse them and ship them to Elasticsearch.
Kibana presents this Elasticsearch information in the form of charts and dashboards to users for analysis.
Read More
To read more on Filebeat, Kafka and Elasticsearch configuration follow the respective links; for Logstash configuration, input plugins, filter plugins, output plugins, Logstash customization and related issues follow Logstash Tutorial and Logstash Issues.
Hope this blog was helpful for you.
Leave your feedback to enhance this topic further and make it more helpful for others.
The Logstash JDBC input plug-in works like an adapter that sends your database records to Elasticsearch, so that they can be used for full-text search, queries and analysis, and shown in the form of charts and dashboards in Kibana.
In the example below I explain how to create a Logstash configuration file using the JDBC input plug-in for an Oracle database, with output to Elasticsearch.
The sample data below is from the defect_detail table, where the defect id is a numeric value that increments continuously in ascending order.
defect_id owned_by severity status summary application created_by creation_date modified_by modified_date assigned_to
530812 Ramesh Severity 3 Cacelled Customer call 5 time TEST-APP Saurabh 7/3/2017 15:44 Gaurav 8/19/2017 6:22 Development
530828 Neha Severity 1 Cancelled Dealer Code Buyer on behalf TEST-APP-5 Rajan 7/3/2017 16:20 Nilam 8/17/2017 9:29 Development
540829 Ramesh Severity 1 Retest Completed Client Not want Bulk call TEST-APP-4 Rajiv 7/24/2017 11:29 Raghav 8/5/2017 20:00 IST
Configuration File:
The configuration file below is set up to read data from an Oracle database; it executes the query every 15 minutes and reads records after the last-run value of the defect id. We should always use ORDER BY on the column whose last-run value is used; here that is the numeric column defect_id.
If you are using any other database, such as MySQL, SQL Server, DB2 etc., change jdbc_driver_library and jdbc_connection_string accordingly. Every database has its own query format, so update the query accordingly as well.
Copy the content below and create the file in the bin directory as bin/logstash-jdbc-defect.conf.
input
{
  jdbc {
    # Path to the downloaded JDBC driver jar, added to the classpath
    jdbc_driver_library => "../jar/ojdbc6.jar"
    # Oracle driver class
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    # Oracle JDBC connection string: jdbc:oracle:thin:@hostname:PORT/SERVICE
    jdbc_connection_string => "jdbc:oracle:thin:@hostname:1521/service"
    # The user and password to connect to the database
    jdbc_user => "username"
    jdbc_password => "password"
    # Use when the password needs to be read from a file
    #jdbc_password_filepath => "/opt/app/password-path-location"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
    # Cron schedule defining how frequently the query is executed against the database
    schedule => "*/15 * * * *"
    # Use below if the query is big and you want to store it in a separate file
    #statement_filepath => "../query/remedy-tickets-details.sql"
    # Inline query; to fetch only records after the last run, compare with :sql_last_value (numeric or timestamp)
    statement => "select defect_id,owned_by,severity,status,summary,application,created_by,creation_date,modified_by,modified_date,assigned_to from defect_detail where defect_id>:sql_last_value order by defect_id"
    # Below is the configuration for using the last run value
    # (clean_run is kept false so the previously recorded sql_last_value is reused on the next run)
    clean_run => false
    use_column_value => true
    tracking_column => "defect_id"
    # Logstash by default considers sql_last_value numeric; if it is a timestamp, configure it explicitly
    #tracking_column_type => "timestamp"
    record_last_run => true
    # This file keeps a record of sql_last_value so that the next run can use the last run value
    last_run_metadata_path => "logstash_jdbc_last_run_t_data.txt"
    # Define the type of data from the database
    type => "t-data"
    # Configure the timezone according to the database location
    #jdbc_default_timezone => "UTC"
  }
}
filter
{
  # To map your creation_date column to the Elasticsearch @timestamp, use the date filter below
  mutate
  {
    convert => [ "creation_date", "string" ]
  }
  # The date pattern tells the date filter that creation_date is in the format "MM/dd/yyyy HH:mm"
  # and in the America/New_York timezone, so it is adjusted accordingly when stored in UTC in Elasticsearch
  date {
    match => ["creation_date", "MM/dd/yyyy HH:mm"]
    timezone => "America/New_York"
  }
}
output
{
  # Output to Elasticsearch
  elasticsearch {
    index => "defect-data-%{+YYYY.MM}"
    hosts => ["elasticsearch-server:9200"]
    document_type => "t-type"
    # Use document_id if you want to prevent duplicate records in Elasticsearch
    document_id => "%{defect_id}"
  }
  # Output to console
  stdout { codec => rubydebug }
}
I try to give descriptive information in comment corresponding to each properties in configuration file. if need to go in depth and more information just drop comments and send email will discuss in detail.
Date Filter : This filter will map CREATION_DATE to @timestamp value for Index for each document and it says to CREATION_DATE is having pattern as “MM/dd/yyyy HH:mm” so that while converting to timestamp will follow same.
If you noticed by using Date filter index @timestamp value is generating based on value of CREATION_DATE and for elasticsearch output configuration for index name defect-data-%{+YYYY.MM} will create indexes for every month based on @timestamp value as defect-data-2017.07 for sample data and if data changing in your database and defect id increase you will see changes on your console for new defects in every 15 minute as setup in configuration file.
JDBC Input changes for sql_last_value for numeric and timestamp
Read password and multi-line query from separate file.
Date Filter to get Index Timestamp value based on fields and pattern.
Dynamic Index Name for each day by appending date format.
Duplicate insert record prevention on Elasticsearch.
Start Logstash on background for configuration file.
Send Logstash output to Elasticsearch and Console.
Read More
To read more on Logstash Configuration,Input Plugins, Filter Plugins, Output Plugins, Logstash Customization and related issues follow Logstash Tutorial and Logstash Issues.
Hope this blog was helpful for you.
Leave you feedback to enhance more on this topic so that make it more helpful for others.
Logstash's JDBC input plugin works like an adapter that sends your database records to Elasticsearch, where they can be used for full-text search, queries and analysis, and shown as charts and dashboards in Kibana.
In the example below I will explain how to create a Logstash configuration file using the JDBC input plugin for an Oracle database, with output to Elasticsearch.
Pre-requisite:
Logstash 5.xx installed
Elasticsearch 5.xx installed
Java 7/8 Installed
Sample Data:
The sample data below is from the defect_detail table, where defect_id is a numeric value that increments continuously in ascending order.
defect_id owned_by severity status summary application created_by creation_date modified_by modified_date assigned_to
530812 Ramesh Severity 3 Cacelled Customer call 5 time TEST-APP Saurabh 7/3/2017 15:44 Gaurav 8/19/2017 6:22 Development
530828 Neha Severity 1 Cancelled Dealer Code Buyer on behalf TEST-APP-5 Rajan 7/3/2017 16:20 Nilam 8/17/2017 9:29 Development
540829 Ramesh Severity 1 Retest Completed Client Not want Bulk call TEST-APP-4 Rajiv 7/24/2017 11:29 Raghav 8/5/2017 20:00 IST
Configuration File :
The configuration file below is set up to read data from an Oracle database; it executes the query every 15 minutes and reads only records after the last run value of defect_id. Always use ORDER BY on the column used for the last run value, as configured here for the numeric column defect_id.
If you are using any other database such as MySQL, SQL Server or DB2, change jdbc_driver_library and jdbc_connection_string according to that database (see the MySQL sketch below). Every database has its own query format, so update the query accordingly.
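For instance, a minimal sketch of the driver and connection settings for MySQL might look like the following (the driver jar path, hostname and schema name here are hypothetical placeholders):
#MySQL connector jar downloaded separately and added to the class path
jdbc_driver_library => "../jar/mysql-connector-java-5.1.36.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://hostname:3306/schema_name"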
Copy the content below and create the file in the Logstash bin directory as /bin/logstash-jdbc-defect.conf
input
{
  jdbc {
    #Path of the downloaded JDBC driver added to the class path
    jdbc_driver_library => "../jar/ojdbc6.jar"
    #Oracle driver class
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    #Oracle JDBC connection string: jdbc:oracle:thin:@hostname:PORT/SERVICE
    jdbc_connection_string => "jdbc:oracle:thin:@hostname:1521/service"
    #The user and password used to connect to the database
    jdbc_user => "username"
    jdbc_password => "password"
    #Use this when the password needs to be read from a file
    #jdbc_password_filepath => "/opt/app/password-path-location"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
    #Cron expression controlling how frequently the query is executed against the database
    schedule => "*/15 * * * *"
    #Use this if the query is big and you want to keep it in a separate file
    #statement_filepath => "../query/remedy-tickets-details.sql"
    #Inline query; to process only records added after the last run, compare against :sql_last_value (numeric or timestamp)
    statement => "select defect_id,owned_by,severity,status,summary,application,created_by,creation_date,modified_by,modified_date,assigned_to from defect_detail where defect_id>:sql_last_value order by defect_id"
    #The configuration below is used when tracking the last run value
    clean_run => true
    use_column_value => true
    tracking_column => "defect_id"
    #Logstash treats sql_last_value as numeric by default; if it is a timestamp, configure the tracking column type explicitly
    #tracking_column_type => "timestamp"
    record_last_run => true
    #This file keeps a record of sql_last_value so that the next run can reuse the last run value
    last_run_metadata_path => "logstash_jdbc_last_run_t_data.txt"
    #Type tag for data coming from the database
    type => "t-data"
    #Configure the timezone according to the database location
    #jdbc_default_timezone => "UTC"
  }
}
filter
{
  #To map your creation_date column to the Elasticsearch @timestamp, use the date filter below
  mutate
  {
    convert => [ "creation_date", "string" ]
  }
  #The date pattern tells the date filter that creation_date is in the format "MM/dd/yyyy HH:mm"
  #and in the America/New_York timezone, so it is adjusted correctly when stored in Elasticsearch as UTC
  date {
    match => ["creation_date","MM/dd/yyyy HH:mm"]
    timezone => "America/New_York"
  }
}
output
{
  #Output to Elasticsearch
  elasticsearch {
    index => "defect-data-%{+YYYY.MM}"
    hosts => ["elasticsearch-server:9200"]
    document_type => "t-type"
    #Use document_id if you want to prevent duplicate records in Elasticsearch
    document_id => "%{defect_id}"
  }
  #Output to console
  stdout { codec => rubydebug }
}
I have tried to give descriptive information in the comments for each property in the configuration file. If you need to go into more depth, just drop a comment or send an email and we will discuss it in detail.
Date filter: this filter maps CREATION_DATE to the @timestamp value of each indexed document, and tells Logstash that CREATION_DATE follows the pattern "MM/dd/yyyy HH:mm" so that the conversion to a timestamp uses the same format.
To learn about validation and the other options for starting Logstash, follow the link Logstash Installation, Configuration and Start.
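For example, assuming the configuration file was created in the Logstash bin directory as above, it can be validated and then started roughly as follows:
./logstash -t -f logstash-jdbc-defect.conf
./logstash -f logstash-jdbc-defect.conf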
Logstash Console Output
As you can see, with the date filter the index @timestamp value is generated from the value of CREATION_DATE, and the Elasticsearch output index name defect-data-%{+YYYY.MM} creates one index per month based on @timestamp (defect-data-2017.07 for the sample data). As the data in your database changes and defect_id increases, you will see new defects on your console every 15 minutes, as scheduled in the configuration file.
JDBC input changes for sql_last_value for numeric and timestamp columns.
Reading the password and a multi-line query from separate files.
Date filter to derive the index timestamp value from fields and a pattern.
Dynamic index name for each month by appending a date format.
Prevention of duplicate record inserts in Elasticsearch.
Starting Logstash in the background for a configuration file.
Sending Logstash output to Elasticsearch and the console.
Read More
To read more on Logstash Configuration, Input Plugins, Filter Plugins, Output Plugins, Logstash customization and related issues, follow Logstash Tutorial and Logstash Issues.
Hope this blog was helpful for you.
Leave your feedback to enhance this topic further and make it more helpful for others.
This example with the Logstash File input plugin, CSV filter and Elasticsearch output plugin reads data from a CSV file; Logstash parses this data and stores it in Elasticsearch.
Pre-Requisite
Logstash 5.X
Elasticsearch 5.X
The Logstash configuration file below is written for the data in my CSV file; you can modify it according to the data in your own CSV file. For illustration, a hypothetical sample of the pipe-delimited input is shown below.
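As a rough sketch (these rows are made up for illustration; only the column layout, the "|" separator and the date format matter), /opt/app/facinissuesonit/transactions-sample-data.txt might look like this:
TRANSACTION_COUNT|TRANSACTION_DATE|TRANSACTION_TYPE|SERVER
279|07/24/2017|DEPOSIT|SERVER-1
310|07/25/2017|WITHDRAWAL|SERVER-2
145|07/26/2017|DEPOSIT|SERVER-1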
Create the Logstash configuration file logstash-installation-dir/bin/transaction-test.conf and paste in the content below.
input {
  file {
    path => "/opt/app/facinissuesonit/transactions-sample-data.txt"
    start_position => "beginning"
  }
}
filter {
  csv {
    #Map the column names to the corresponding values in each line
    columns => ["TRANSACTION_COUNT","TRANSACTION_DATE","TRANSACTION_TYPE","SERVER"]
    separator => "|"
    remove_field => ["message"]
  }
  #The date filter converts TRANSACTION_DATE to @timestamp so that charts in Kibana display per date
  date {
    match => ["TRANSACTION_DATE", "MM/dd/yyyy"]
  }
  #Drop the header line so it is not inserted into Elasticsearch
  if [TRANSACTION_TYPE] =~ "TRANSACTION_TYPE"
  {
    drop {}
  }
}
output {
  elasticsearch {
    #Create an index per day
    index => "app-transactions-%{+YYYY.MM.dd}"
    hosts => ["elasticsearch-server:9200"]
  }
  #Console output
  stdout
  {
    codec => rubydebug
    # debug => true
  }
}
Information about the configuration file:
File input plugin: reads data from the file, and because start_position is set to "beginning" it always reads the file from the start.
CSV filter: reads each line of the message, splits it on "|", maps the values to the corresponding columns by position, and finally removes the message field because the data is now parsed.
Date filter: maps TRANSACTION_DATE to the @timestamp value of each indexed document, and tells Logstash that TRANSACTION_DATE follows the pattern "MM/dd/yyyy" so that the conversion to a timestamp uses the same format.
drop: removes the header line when the field value matches the column name.
As you can see, with the date filter the index @timestamp value is generated from the value of TRANSACTION_DATE, and the Elasticsearch output index name app-transactions-%{+YYYY.MM.dd} creates three indexes based on @timestamp (app-transactions-2017.07.24, app-transactions-2017.07.25 and app-transactions-2017.07.26) for the sample data.
How to apply the CSV filter for "|" separated data and map values to fields.
How to drop the header line if it exists in the CSV file.
Date filter to derive the index timestamp value from fields and a pattern.
Dynamic index name for each day by appending a date format.
Starting Logstash in the background for a configuration file.
Read More
To read more on Logstash Configuration, Input Plugins, Filter Plugins, Output Plugins, Logstash customization and related issues, follow Logstash Tutorial and Logstash Issues.
Hope this blog was helpful for you.
Leave your feedback to enhance this topic further and make it more helpful for others.
cluster.name: the Elasticsearch cluster name, a unique name within the network that links all the nodes together.
node.name: each node in the cluster has a unique name by which the cluster identifies it.
http.port: the default HTTP port for Elasticsearch is 9200; you can update it.
network.host: set this property when Elasticsearch needs to be accessed from another machine or by IP. For more on network.host follow the link Why network.host? A minimal sample of these settings is shown below.
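For example, a minimal sketch of these four properties in config/elasticsearch.yml (the cluster and node names here are just placeholders):
cluster.name: FACING_ISSUE_IN_IT
node.name: NODE-1
http.port: 9200
network.host: 0.0.0.0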
Now Elasticsearch is ready to start.
How to start Elasticsearch?
There are multiple ways to start Elasticsearch, as below.
Run in foreground
Run in background
Run in background with command-line arguments
Run in Foreground
To run Elasticsearch on a Linux server as a foreground process and see the sysout in the console, use the command below from the Elasticsearch home directory. When you see "started" as in the screen below, it means Elasticsearch started successfully.
/bin/elasticsearch
In the above screen Elasticsearch is started on port 9200.
Note: to stop Elasticsearch use CTRL+C.
Run in Background
Use the "screen -d -m" option to run Elasticsearch in the background, or start it with the -d flag:
screen -d -m /bin/elasticsearch
or
/bin/elasticsearch -d
Elasticsearch in Background with Command-line Configuration
Instead of hard-coding values in the elasticsearch.yml file, we can also pass all of the configurable fields above from the command line, as given below. For more detail follow the link Elasticsearch Cluster with multi node on same machine.
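As a sketch, Elasticsearch 5.x accepts settings on the command line with the -E flag, so a command of roughly this shape overrides the same properties at start time (the names are placeholders):
./bin/elasticsearch -d -Ecluster.name=FACING_ISSUE_IN_IT -Enode.name=NODE-1 -Ehttp.port=9200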
To learn more about clusters follow the link Cluster Configuration and Health.
In the same way you can get details about your configured nodes from the URLs below.
http://localhost:9200/_nodes?pretty
or, if network.host is configured, as below
http://elasticsearverip:9200/_nodes?pretty
To learn more about nodes follow the link Node Type, Configuration and Status. Follow the link for Elasticsearch Tutorials.
Read More
To read more on Elasticsearch configuration, sample Elasticsearch REST clients and query type configurations with examples, follow Elasticsearch Tutorial and Elasticsearch Issues.
Hope this blog was helpful for you.
Leave your feedback to enhance this topic further and make it more helpful for others.
“Elasticsearch is an open-source, cross-platform search engine developed completely in Java. It is built on top of Lucene, which provides full-text search on high volumes of data quickly, and it easily performs analysis based on indexing. It is schema free and provides NRT (Near Real Time) search results.”
Advantage of Elasticsearch
Full Text Search
Elasticsearch is built on top of Lucene, which provides a full-featured open-source library for full-text search.
Schema Free
Elasticsearch stores documents in JSON format and, based on that, detects words and types to make them searchable.
Restful API
Elasticsearch is easily accessible over a browser by using a URL and also supports a RESTful API to perform operations. To read more on Elasticsearch REST, follow the link Elasticsearch REST JAVA API Overview.
Operation Persistence
An Elasticsearch cluster keeps a record of all transaction-level changes to the schema whenever index data changes, and tracks the availability of nodes in the cluster, so that data remains easily available in case of fail-over of any node.
Where is Elasticsearch used?
It is useful in applications that need to do analysis and statistics and to find anomalies in data based on patterns.
It is useful where alerts need to be sent when a particular condition is matched, such as in stock markets or for exceptions from logs.
It is useful in applications that provide log analysis and issue resolution, thanks to full-text search across billions of records in milliseconds.
It is compatible with applications like Filebeat, Logstash and Kibana for storing high volumes of data for analysis and visualizing it in the form of charts and dashboards.
Basic Concepts and Terminology
Cluster
A cluster is a collection of one or more nodes which together provide the ability to search text across data scattered over those nodes. It is identified by a unique name within the network so that all associated nodes join together under that cluster name.
For more info on cluster configuration and queries follow the link Elasticsearch Cluster.
Elasticsearch Cluster
In the above screen the Elasticsearch cluster "FACING_ISSUE_IN_IT" has three master and four data nodes.
Node
A node is an Elasticsearch server which is associated with a cluster. It stores data and helps the cluster with indexing data and serving search queries. It is identified by a unique name in the cluster; if a name is not provided, Elasticsearch generates a random Universally Unique Identifier (UUID) at server start time.
A cluster can have one or more nodes. The first node that starts forms a cluster with a single node, and as other nodes start they join that cluster.
For more info on node configuration, master nodes, data nodes and ingest nodes follow the link Elasticsearch Node.
Data Node Documents Storage
The above screen tries to represent the data of two indexes, I1 and I2, where index I1 has two types of documents, T1 and T2, while index I2 has only type T2, and these shards are distributed over all nodes in the cluster. This data node holds documents of shard S1 for index I1 and shard S3 for index I2. It also keeps replicas of documents of shards S2 of indexes I2 and I1, which are stored on other nodes in the cluster.
Index
An index is a collection of documents with the same characteristics, stored on nodes in a distributed fashion and identified by a unique name, against which different operations such as search queries, updates and deletes are performed. A cluster can have as many indexes as needed, each with a unique name.
A document is stored in an index and assigned a type, and an index can have multiple types of documents.
For more info on index creation, mapping templates and CRUD follow the link Elasticsearch Index.
Shards
Shards are partitions of an index scattered across nodes. They make it possible to store a large number (billions) of documents for the same index in the cluster, even when the disk of a single node is not able to hold them.
Replica
A replica is a copy of a shard stored on a different node. A shard can have zero or more replicas. If a shard is on one node, its replica is stored on another node.
Benefits of Shards and Replica
Shards split an index into horizontal partitions to handle high volumes of data.
Operations are performed in parallel on the shards or replicas of an index across multiple nodes, which increases system performance and throughput.
Easy recovery in case of node fail-over, because a replica of the data exists on another node; a replica is always stored on a different node than its shard.
Note
When we create an index, by default Elasticsearch configures it with 5 shards and 1 replica, but we can configure this from the config/elasticsearch.yml file or by passing shard and replica values in the settings when the index is created, as in the sketch below.
Once an index is created we cannot change its shard configuration, only the replica count. If the number of shards needs to change, the only option is re-indexing.
Each shard is itself a Lucene index and can keep at most 2,147,483,519 (= Integer.MAX_VALUE - 128) documents. Merging of search results and failover are taken care of by the Elasticsearch cluster.
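For illustration, a hedged sketch of creating an index with explicit shard and replica settings over the REST API (the index name and counts are placeholders):
curl -XPUT "http://localhost:9200/app1-logs-2017.06.10" -H 'Content-Type: application/json' -d '{"settings":{"number_of_shards":3,"number_of_replicas":2}}'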
For more info on elasticsearch Shards and Replica follow Elasticsearch Shards and Replica configuration.
Document
Each record stored in an index is called a document and is stored as a JSON object. JSON data exchange is fast over the internet and easy to handle for display on the browser side.
Read More
To read more on Elasticsearch configuration, sample Elasticsearch REST clients and search query types with examples, follow Elasticsearch Tutorial and Elasticsearch Issues.
Hope this blog was helpful for you.
Leave your feedback to enhance this topic further and make it more helpful for others.
The Elasticsearch 5 REST Java Index Manager Auto Client helps manage the index lifecycle from the client end by configuring how long to keep indexes open, when to close them and when to delete them; no third-party tool is required for this.
The steps below for automatic index management will save you the time of managing indexes manually and will take care of the index lifecycle based on the configured times.
Pre-requisite
Java 8 or later is required.
Add the dependencies for the Elasticsearch REST client and JSON mapping to your pom.xml or add them to your class path.
The index name format should be like IndexName-2017.06.10, for example app1-logs-2017.06.08; if you have a different date format, change the code below accordingly.
We will follow the steps below to create this client and make it run automatically:
Create a console-based Java Maven project as in the screen shot below, with the name ElasticsearchAutoIndexManager. To know more about console-based Java Maven projects follow the link How to create Maven Java Console Project?
Add the dependencies below to the pom.xml file
<!--Elasticsearch REST jar-->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>rest</artifactId>
<version>5.1.2</version>
</dependency>
<!--Jackson jar for mapping json to Java -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.8.5</version>
</dependency>
Add the ElasticSearchIndexManagerClient class below in the com.facingissuesonit.es package and change the constant fields below as per your server info and requirements.
Set INDEX_NO_ACTION_TIME so that no action is taken until this many days have passed. For example, a value of 2 means an index stays searchable in the system for two days.
private static final int INDEX_NO_ACTION_TIME = 2;
Set INDEX_CLOSE_TIME so that older indexes are put in closed status, meaning they exist on the Elasticsearch server but are not searchable. For example, a value of 5 means indexes older than five days are closed and kept until the index delete time is reached.
private static final int INDEX_CLOSE_TIME = 5;
Set INDEX_DELETE_TIME to decide when to delete indexes matching this criterion. For example, a value of 15 means all indexes older than 15 days are deleted.
private static final int INDEX_DELETE_TIME = 15;
private static final String ELASTICSEARCH_SERVER = "ServerHost";
private static final int ELASTICSEARCH_SERVER_PORT = 9200;
Note: set the proxy server and login credential information if required; otherwise comment those lines out.
package com.facingissuesonit.es;
import java.io.IOException;
import java.io.InputStream;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import com.fasterxml.jackson.databind.ObjectMapper;
public class ElasticSearchIndexManagerClient {
private static final int INDEX_NO_ACTION_TIME = 2;
private static final int INDEX_CLOSE_TIME = 5;
private static final int INDEX_DELETE_TIME = 15;
private static final String ELASTICSEARCH_SERVER = "ServerHost";
private static final int ELASTICSEARCH_SERVER_PORT = 9200;
public static void main(String[] args) {
RestClient client;
String indexName = "", indexDateStr = "";
LocalDate indexDate = null;
long days = 0;
final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
final LocalDate todayLocalDate = LocalDate.now();
try {
ElasticSearchIndexManagerClient esManager=new ElasticSearchIndexManagerClient();
//Get Connection from Elasticsearch
client=esManager.getElasticsearchConnectionClient();
if(client!=null)
{
IndexDetail[] indexList = esManager.getIndexDetailList(client);
if (indexList != null && indexList.length > 0) {
for (IndexDetail indexDetail : indexList) {
indexName = indexDetail.getIndexName();
System.out.println(indexName);
indexDateStr = indexName.substring(indexName.lastIndexOf("-") + 1);
//Below code computes the number of days between the index creation date and the current date
try {
indexDate = LocalDate.parse(indexDateStr.replace('.', '-'), formatter);
days = ChronoUnit.DAYS.between(indexDate, todayLocalDate);
esManager.performAction(indexDetail, days,client);
} catch (Exception ex) {
System.out.println("Index is not having formatted date as required : yyyy.MM.dd :"+indexName);
}
}
}
}
} catch (Exception ex) {
System.out.println("Exception found while index management");
ex.printStackTrace();
System.exit(1);
} finally {
System.out.println("Index Management successfully completed");
System.exit(0);
}
}
//Get Elasticsearch Connection
private RestClient getElasticsearchConnectionClient() {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("userid", "password"));
RestClient client = RestClient
.builder(new HttpHost(ELASTICSEARCH_SERVER,ELASTICSEARCH_SERVER_PORT))
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
.setProxy(new HttpHost("ProxyHost", 8080)); //proxy port must be an int; replace host and port with your proxy details
}
}).setMaxRetryTimeoutMillis(60000).build();
return client;
}
//Get the list of indexes from the Elasticsearch server
public IndexDetail[] getIndexDetailList(RestClient client)
{
IndexDetail[] indexDetails=null;
HttpEntity in=null;
try
{
ObjectMapper jacksonObjectMapper = new ObjectMapper();
Response response = client.performRequest("GET", "/_cat/indices?format=json&pretty", Collections.singletonMap("pretty", "true"));
in =response.getEntity();
indexDetails=jacksonObjectMapper.readValue(in.getContent(), IndexDetail[].class);
System.out.println("Index found :"+indexDetails.length);
}
catch(IOException ex)
{
ex.printStackTrace();
}
return indexDetails;
}
//This method decides what action to take based on the index creation date and the configured times for no action, close and delete
private void performAction(IndexDetail indexDetail, long days,RestClient client) {
String indexName = indexDetail.getIndexName();
if (days >= INDEX_NO_ACTION_TIME) {
if (!(indexDetail.getStatus() != null && indexDetail.getStatus().equalsIgnoreCase("close"))) {
// Close index condition
if (days >= INDEX_CLOSE_TIME) {
System.out.println("Close Index :" + indexName);
closeIndex(indexName,client);
}
}
// Delete index condition
if (days >= INDEX_DELETE_TIME) {
if (!(indexDetail.getStatus() != null && indexDetail.getStatus().equalsIgnoreCase("close"))) {
System.out.println("Delete Index :" + indexName);
deleteIndex(indexName,client);
} else {
System.out.println("Delete Close Index :" + indexName);
deleteCloseIndex(indexName,client);
}
}
}
}
// Operation on Indexes
private void closeIndex(String indexName,RestClient client) {
flushIndex(indexName,client);
postDocuments(indexName + "/_close", client);
System.out.println("Close Index :" + indexName);
}
private void deleteIndex(String indexName,RestClient client) {
flushIndex(indexName,client);
deleteDocument(indexName,client);
System.out.println("Delete Index :" + indexName);
}
private void deleteCloseIndex(String indexName,RestClient client) {
openIndex(indexName,client);
flushIndex(indexName,client);
deleteDocument(indexName, client);
System.out.println("Delete Close Index :" + indexName);
}
private void openIndex(String indexName,RestClient client) {
postDocuments(indexName + "/_open", client);
System.out.println("Open Index :" + indexName);
}
private void flushIndex(String indexName,RestClient client) {
postDocuments(indexName + "/_flush", client);
System.out.println("Flush Index :" + indexName);
try {
Thread.sleep(3000);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
//POST action used to perform index operations such as open, flush and close
public InputStream postDocuments(String endPoint,RestClient client)
{
InputStream in=null;
Response response=null;
try
{
response = client.performRequest("POST", endPoint, Collections.<String, String>emptyMap());
in =response.getEntity().getContent();
}
catch(IOException ex)
{
System.out.println("Exception in post Documents :");
ex.printStackTrace();
}
return in;
}
//DELETE action used for deletion of an index
public InputStream deleteDocument(String endPoint,RestClient client)
{
InputStream in=null;
try
{
Response response = client.performRequest("DELETE", endPoint, Collections.singletonMap("pretty", "true"));
in =response.getEntity().getContent();
}
catch(IOException ex)
{
System.out.println("Exception in delete Documents :");
ex.printStackTrace();
}
return in;
}
}
The above code is pretty straightforward and proceeds step by step. Let me explain the operations below.
Open: an index with open status is available for searching, and we can perform the close and delete operations below on an index while it is open. To open an index we can use the curl command below.
curl -XPOST http://elasticsearchHost:9200/indexName-2017.06.10/_open
Flush: this operation is required before executing the close and delete operations on indexes so that all running transactions on those indexes are completed.
curl -XPOST http://elasticsearchHost:9200/indexName-2017.06.10/_flush
Close: closed indexes persist on the Elasticsearch server but are not available for searching. To close an index we can use the curl command below.
curl -XPOST http://elasticsearchHost:9200/indexName-2017.06.10/_close
Delete: the delete operation removes the index completely from the server. Note that it uses the HTTP DELETE method on the index itself rather than a POST endpoint.
curl -XDELETE http://elasticsearchHost:9200/indexName-2017.06.10
Now our code is ready to take care of indexes based on the configured times, and we can test it by running the code above.
The steps below make your index manager code run automatically in a Linux environment.
Create Auto Runnable Jar
Export the ElasticsearchAutoIndexManager project as an auto-runnable jar, selecting ElasticSearchIndexManagerClient as the launch class. To learn about the auto-runnable jar creation steps follow the link How to make and auto run /executable jar with dependencies?
Create Script File to Execute Autorun Jar
Create a script file named IndexManager.sh, roughly as sketched below, and save it.
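A minimal sketch of such a script, assuming a hypothetical jar location (adjust the path to wherever you exported the jar):
#!/bin/bash
# Hypothetical path to the exported runnable jar
java -jar /opt/app/facingissuesonit/ElasticsearchAutoIndexManager.jar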
Create Cron Tab Configuration to Schedule the Job and Receive Email Alerts
Linux provides cron tab for executing scheduled jobs/scripts. Using cron tab we will execute the runnable jar via the script file above.
Use the command crontab -e to create and edit entries in the cron tab.
Make a cron entry in this editor, as sketched below, for executing the IndexManager.sh script every night at 1 AM.
If you want execution alerts with the console logs sent to you and your team, also add your email id as shown.
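For example, a hedged crontab sketch (the MAILTO address and script path are placeholders):
MAILTO=yourteam@example.com
0 1 * * * /opt/app/facingissuesonit/IndexManager.sh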
Save the cron tab with ESC and then :wq
Below are some more examples of cron tab expressions.
0 * * * * : run every hour of the day
* * * * * : run every minute of the day
30 4 * * * : run at 4:30 AM every day
5 10,22 * * * : run twice a day, at 10:05 and 22:05
5 0 * * * : run at 00:05, shortly after midnight
Filebeat, Kafka, Logstash, Elasticsearch and Kibana integration is used by big organizations where applications are deployed in production on hundreds or thousands of servers scattered across different locations, and analysis needs to be done on data from these servers in real time.
This integration helps mostly with log-level analysis, tracking issues, finding anomalies in data, alerting on events of particular occurrence, and wherever accountability measures are needed.
Using these technologies provides a scalable architecture that enhances systems and keeps them decoupled from each other.
Kibana provides a window to view Elasticsearch data in the form of different charts and dashboards.
It provides ways to search and operate on data easily with respect to time intervals.
Its dashboards are easily embedded into any web application.
How does the data flow work?
In this integration Filebeat is installed on all servers where your application is deployed, and Filebeat reads and ships the latest log changes from these servers to the Kafka topic configured for the application.
Logstash subscribes to the log lines from the Kafka topic, parses these lines, makes the relevant changes and formatting, excludes and includes fields, and then sends this processed data to Elasticsearch indexes as the centralized location for data from the different servers.
Kibana is linked to the Elasticsearch indexes, which helps to do analysis through search, charts and dashboards.
Design Architecture
In the architecture configured below, my application is deployed on three servers and each server writes to a current log file named App1.log. Our goal is to read real-time data from these servers and do analysis on it.
Steps for Installation, Configuration and Start
First we will install Kafka and Elasticsearch, which run individually; the rest of the tools will be installed and run in sequence to test the data flow. Initially install everything on the same machine and test with sample data using the steps below; at the end of this post I will describe what changes are needed for your own servers.
Kafka Installation, Configuration and Start
Elasticsearch Installation,Configuration and Start
Filebeat Installation,Configuration and Start
Logstash Installation,Configuration and Start
Kibana Installation,Start and display.
Pre-Requisite
The Filebeat, Logstash, Elasticsearch and Kibana versions should be compatible; it is better to use the latest versions from https://www.elastic.co/downloads.
Java 8+
Linux Server
Filebeat 5.XX
Kafka 2.11.XX
Logstash 5.XX
Elasticsearch 5.XX
Kibana 5.XX
Note: make sure JDK 8 is installed and the JAVA_HOME environment variable points to the JDK 8 home directory on every machine where you want to install Elasticsearch, Logstash, Kibana and Kafka.
Windows: My Computer -> right click -> Properties -> Advanced System Settings -> System Variables
Set JAVA_HOME
Linux: go to your home directory (or use sudo) and add lines like the ones below.
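A minimal sketch (the JDK path here is a placeholder; point it at your actual JDK 8 installation):
export JAVA_HOME=/opt/app/jdk1.8.0_66
export PATH=$JAVA_HOME/bin:$PATH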
For testing we will use the sample log lines below, which include debug lines as well as a stacktrace, and the grok parsing in this example is designed accordingly. For real-time testing with actual data you can point to your own server log files, but then you have to modify the grok pattern in the Logstash configuration accordingly.
2013-02-28 09:57:56,662 WARN CreateSomethingActivationKey - WhateverException for User 49-123-345678 {{rid,US8cFAp5eZgAABwUItEAAAAI_dev01_443}{realsid,60A9772A136B9912B6FF0C3627A47090.dev1-a}}
2013-02-28 09:57:56,663 INFO LMLogger - ERR1700 - u:null failures: 0 - Technical error {{rid,US8cFAp5eZgAABwUItEAAAAI_dev01_443}{realsid,60A9772A136B9912B6FF0C3627A47090.dev1-a}}
2013-02-28 09:57:56,668 ERROR SomeCallLogger - ESS10005 Cpc portalservices: Exception caught while writing log messege to MEA Call: {}
java.sql.SQLSyntaxErrorException: ORA-00942: table or view does not exist
at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:445)
at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:396)
2013-02-28 10:04:35,723 INFO EntryFilter - Fresh on request /portalservices/foobarwhatever {{rid,US8dogp5eZgAABwXPGEAAAAL_dev01_443}{realsid,56BA2AD41D9BB28AFCEEEFF927EE61C2.dev1-a}}
Create an App1.log file on the same machine where Filebeat needs to be installed and copy the above log lines into it.
Kafka Installation, Configuration and Start
Download the latest version of Kafka from the link below and use the command to untar and install it on a Linux server, or on Windows just unzip the downloaded file.
To test that Kafka installed successfully, you can check for the Kafka process on Linux with "ps -ef|grep kafka", or follow the consumer and producer steps to/from a topic in Setup Kafka Cluster for Single Server/Broker.
Elasticsearch Installation, Configuration and Start
Download the latest version of Elasticsearch from the link below and use the command to untar and install it on a Linux server, or on Windows just unzip the downloaded file.
Before starting Elasticsearch you need to make some basic changes in the config/elasticsearch.yml file for the cluster and node name. You can configure them based on your application or organization name.
Download the latest version of Filebeat from the link below and use the command to untar and install it on a Linux server, or on Windows just unzip the downloaded file.
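The prospector and Kafka output sections of filebeat.full.yml might be configured roughly as follows; this is a hedged sketch where the log path and Kafka host are placeholders, the multiline settings are one reasonable way to group the stacktrace lines in the sample, and the topic matches the one used below:
filebeat.prospectors:
- input_type: log
  paths:
    - /opt/app/app1/logs/App1.log
  # Join lines that do not start with a date (e.g. stacktraces) to the previous event
  multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'
  multiline.negate: true
  multiline.match: after
output.kafka:
  hosts: ["localhost:9092"]
  topic: "APP-1-TOPIC"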
Now Filebeat is configured and ready to start with the command below; it will continuously read the configured prospector file App1.log and publish log line events to Kafka. It will also create the topic APP-1-TOPIC in Kafka if it does not exist.
./filebeat -e -c filebeat.full.yml -d "publish"
On the console it will display output as below for the sample lines.
In the above Filebeat debug statements you can see that published event 3 is a multiline statement with a stacktrace exception, and each published event has fields like the following.
@timestamp: the timestamp at which the data was shipped.
beat.hostname: the Filebeat machine name from which the data is shipped.
beat.version: the version of Filebeat installed on the server, which helps with compatibility checks on the target end.
message: the log line from the log file, or multiple log lines for a multiline event.
offset: the byte offset within the source file up to which the line was read.
source: the file name from which the logs were read.
Now it's time to check whether the data was published to the Kafka topic. For this, go to the Kafka logs directory, where you will see two files like xyz.index and xyz.log that maintain the data offsets and messages.
Before starting Logstash we need to create a configuration file that takes input data from Kafka, parses it into the respective fields, and sends it to Elasticsearch. Create the file logstash-app1.conf in the Logstash bin directory; a rough sketch of its shape is given below.
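The exact configuration is not reproduced here, but a hedged sketch, assuming Kafka on localhost and the topic and index names used in this post (the grok pattern is a rough guess for the sample log format and will likely need adjustment for your logs), could look like this:
input {
  kafka {
    #Kafka broker address; replace localhost with the Kafka server IP when running on separate machines
    bootstrap_servers => "localhost:9092"
    topics => ["APP-1-TOPIC"]
  }
}
filter {
  #Rough grok for lines like "2013-02-28 09:57:56,662 WARN CreateSomethingActivationKey - message ..."
  grok {
    match => { "message" => "%{TIMESTAMP_ISO8601:log_timestamp} %{LOGLEVEL:loglevel} %{NOTSPACE:logger} - %{GREEDYDATA:log_message}" }
  }
  #Uncomment to remove fields that are not needed in Elasticsearch
  #mutate { remove_field => ["log_timestamp"] }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "app1-logs-%{+YYYY.MM.dd}"
  }
  stdout { codec => rubydebug }
}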
To test your configuration file you can use the command below.
./logstash -t -f logstash-app1.conf
If the above command returns OK, run the command below to start reading and parsing data from the Kafka topic.
./logstash -f logstash-app1.conf
To design your own grok pattern for your log line format, you can follow the link below; it helps you build the pattern incrementally and also provides some sample log groks.
The Logstash console will show the parsed data as below, and you can remove unused fields before storing in Elasticsearch by uncommenting the mutate section of the configuration file.
To verify on the Elasticsearch end that your data was sent successfully, open the URL http://localhost:9200/_cat/indices in your browser; it will display the created index with the current date.
yellow open app1-logs-2017.05.28 Qjs6XWiFQw2zsiVs9Ks6sw 5 1 4 0 47.3kb 47.3kb
Kibana Installation, Configuration and Start
Download the latest version of Kibana from the link below and use the command to untar and install it on a Linux server, or on Windows just unzip the downloaded file.
Now we are ready with the Kibana configuration and it is time to start Kibana. We can use the command below to run Kibana in the background.
screen -d -m /bin/kibana
Kibana takes some time to start, and we can test it by using the URL below in a browser.
http://localhost:5601/
To check this data in Kibana, open the above URL in your browser, go to the Management tab in the left-hand menu -> Index Patterns -> click on Add New.
Enter the index name or pattern and the time field name as in the screen below and click on the Create button.
Index Pattern Settings
Now go to the Discover tab and select the index pattern app1-log*; it will display the data as below.
Now make the changes below according to your application specification.
Filebeat:
Update the prospector path to the current file in your log directory.
Move Kafka to a different machine, because Kafka will be the single location that receives the shipped data from the different servers. Replace localhost with the IP of the Kafka server in the hosts property of the Kafka output section of the filebeat.full.yml file.
Copy the same Filebeat setup onto all servers where your application is deployed and logs need to be read.
Start all Filebeat instances on each server.
Elasticsearch:
Uncomment the network.host property in the elasticsearch.yml file so Elasticsearch is accessible by IP address.
Logstash:
Update localhost in the input section of the logstash-app1.conf file with the Kafka machine IP.
Generally we get this issue when trying to start Elasticsearch again on the same port while one instance of Elasticsearch is already running. To resolve the issue follow the steps below:
Exception :
Caused by: java.net.BindException: Address already in use
at sun.nio.ch.Net.bind0(Native Method) ~[?:?]
at sun.nio.ch.Net.bind(Net.java:433) ~[?:?]
at sun.nio.ch.Net.bind(Net.java:425) ~[?:?]
at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223) ~[?:?]
at io.netty.channel.socket.nio.NioServerSocketChannel.doBind(NioServerSocketChannel.java:128) ~[?:?]
at io.netty.channel.AbstractChannel$AbstractUnsafe.bind(AbstractChannel.java:554) ~[?:?]
at io.netty.channel.DefaultChannelPipeline$HeadContext.bind(DefaultChannelPipeline.java:1258) ~[?:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeBind(AbstractChannelHandlerContext.java:501) ~[?:?]
at io.netty.channel.AbstractChannelHandlerContext.bind(AbstractChannelHandlerContext.java:486) ~[?:?]
at io.netty.channel.DefaultChannelPipeline.bind(DefaultChannelPipeline.java:980) ~[?:?]
at io.netty.channel.AbstractChannel.bind(AbstractChannel.java:250) ~[?:?]
at io.netty.bootstrap.AbstractBootstrap$2.run(AbstractBootstrap.java:365) ~[?:?]
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) ~[?:?]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) ~[?:?]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442) ~[?:?]
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) ~[?:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_66]
Follow these steps:
To check for a running Elasticsearch instance and get its process id, run the command below
ps -ef|grep elasticsearch
Kill this process if you want to start Elasticsearch again.
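For example (the process id is whatever the previous command shows):
kill -9 <process-id>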
Elasticsearch does not start, due to this issue, because the system's virtual memory and file descriptor limits do not meet what Elasticsearch requires.
ERROR: bootstrap checks failed
max file descriptors [4096] for elasticsearch process is too low, increase to at least [65536]
max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]
Solution:
To overcome this issue, raise the file descriptor and virtual memory limits at the operating system level as sketched below, and also adjust config/jvm.options for your heap space as per your machine configuration (by default it is set to -Xms2g).
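A hedged sketch of the usual OS-level fixes for these two bootstrap checks (run as root, and adjust the user name in limits.conf to the account that runs Elasticsearch):
# Raise the mmap count checked by "max virtual memory areas vm.max_map_count"
sysctl -w vm.max_map_count=262144
# Raise open file descriptors for the Elasticsearch user, e.g. add to /etc/security/limits.conf:
# elasticsearch  -  nofile  65536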
Generally this issue starts happening at boot time after installing Elasticsearch 5, because it has additional features that check the Elasticsearch system requirements before running.
[2017-05-13T16:55:00,294][WARN ][o.e.b.JNANatives ] unable to install syscall filter:
java.lang.UnsupportedOperationException: seccomp unavailable: CONFIG_SECCOMP not compiled into kernel, CONFIG_SECCOMP and CONFIG_SECCOMP_FILTER are needed
at org.elasticsearch.bootstrap.SystemCallFilter.linuxImpl(SystemCallFilter.java:363) ~[elasticsearch-5.4.0.jar:5.4.0]
at org.elasticsearch.bootstrap.SystemCallFilter.init(SystemCallFilter.java:638) ~[elasticsearch-5.4.0.jar:5.4.0]
at org.elasticsearch.bootstrap.JNANatives.tryInstallSystemCallFilter(JNANatives.java:215) [elasticsearch-5.4.0.jar:5.4.0]
at org.elasticsearch.bootstrap.Natives.tryInstallSystemCallFilter(Natives.java:99) [elasticsearch-5.4.0.jar:5.4.0]
at org.elasticsearch.bootstrap.Bootstrap.initializeNatives(Bootstrap.java:111) [elasticsearch-5.4.0.jar:5.4.0]
at org.elasticsearch.bootstrap.Bootstrap.setup(Bootstrap.java:204) [elasticsearch-5.4.0.jar:5.4.0]
at org.elasticsearch.bootstrap.Bootstrap.init(Bootstrap.java:360) [elasticsearch-5.4.0.jar:5.4.0]
at org.elasticsearch.bootstrap.Elasticsearch.init(Elasticsearch.java:123) [elasticsearch-5.4.0.jar:5.4.0]
at org.elasticsearch.bootstrap.Elasticsearch.execute(Elasticsearch.java:114) [elasticsearch-5.4.0.jar:5.4.0]
at org.elasticsearch.cli.EnvironmentAwareCommand.execute(EnvironmentAwareCommand.java:67) [elasticsearch-5.4.0.jar:5.4.0]
at org.elasticsearch.cli.Command.mainWithoutErrorHandling(Command.java:122) [elasticsearch-5.4.0.jar:5.4.0]
at org.elasticsearch.cli.Command.main(Command.java:88) [elasticsearch-5.4.0.jar:5.4.0]
at org.elasticsearch.bootstrap.Elasticsearch.main(Elasticsearch.java:91) [elasticsearch-5.4.0.jar:5.4.0]
at org.elasticsearch.bootstrap.Elasticsearch.main(Elasticsearch.java:84) [elasticsearch-5.4.0.jar:5.4.0]
Issue Solution:
To overcome this issue, add the flag below in your config/elasticsearch.yml file so that Elasticsearch still runs even if your system does not fulfill the exact boot criteria.
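To the best of my knowledge, for Elasticsearch 5.x the setting in question is the system call filter bootstrap check, disabled with a line like this:
bootstrap.system_call_filter: false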
Generally people face this issue in Elasticsearch when trying to access an Elasticsearch URL from a browser by IP address while everything works fine with localhost.
Suppose your system IP is 10.0.0.31 and you try to run the query below in a browser. You will get the screens below in different browsers.
http://10.0.0.31:9200/_cluster/health?pretty
IE:
IE Browser
Chrome:
Chrome Browser
Solution:
Go to the command prompt and use the command below to get your IPv4 address, as mentioned in the screen below.
Windows:
ipconfig
Unix/Linux:
ifconfig
Update your IPv4 address in the config/elasticsearch.yml file for the property
network.host: 10.0.0.31
If your IP is a static IP then you can set it as above; if it is dynamic then set it as below.
network.host: 0.0.0.0
To test the above configuration, restart Elasticsearch and try the Elasticsearch URL in your browser address bar after updating the server IP. You will get a result like below.
http://localhost:9200/_cluster/health?pretty
or, if network.host is configured, as below
http://elasticseverIp:9200/_cluster/health?pretty
Below is an example of getting index details into a Java array by using the Elasticsearch REST Java client. Here the client calls the endpoint "/_cat/indices?format=json" to retrieve the details of all indexes. It is the same as using GET with curl:
GET http://elasticsearchHost:9200/_cat/indices?format=json
Pre-requisite
Java 7 or later is required.
Add the dependencies below for the Elasticsearch REST client and JSON mapping to your pom.xml or add them to your class path.
Dependency
<!--Elasticsearch REST jar-->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>rest</artifactId>
<version>5.1.2</version>
</dependency>
<!--Jackson jar for mapping json to Java -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.8.5</version>
</dependency>
Sample Code
import java.io.IOException;
import java.util.Collections;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import com.fasterxml.jackson.databind.ObjectMapper;
public class ElasticsearchRESTIndexClient {
public static void main(String[] args) {
IndexInfo []indexArr = null;
RestClient client = null;
try {
client = openConnection();
if (client != null) {
// performRequest GET method will retrieve all index detail list
// information from elastic server
Response response = client.performRequest("GET", "/_cat/indices?format=json",
Collections.singletonMap("pretty", "true"));
// GetEntity api will return content of response in form of json
// in Http Entity
HttpEntity entity = response.getEntity();
ObjectMapper jacksonObjectMapper = new ObjectMapper();
// Map json response to Java object in IndexInfo Array
// Cluster Info
indexArr = jacksonObjectMapper.readValue(entity.getContent(), IndexInfo[].class);
for(IndexInfo indexInfo:indexArr)
{
System.out.println(indexInfo);
}
}
} catch (Exception ex) {
System.out.println("Exception found while getting cluster detail");
ex.printStackTrace();
} finally {
closeConnnection(client);
}
}
// Get Rest client connection
private static RestClient openConnection() {
RestClient client = null;
try {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("userid", "password"));
client = RestClient.builder(new HttpHost("elasticHost", Integer.parseInt("9200")))
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
// Customize connection as per requirement
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder
// Credentials
.setDefaultCredentialsProvider(credentialsProvider)
// Proxy
.setProxy(new HttpHost("proxyServer", 8080));
}
}).setMaxRetryTimeoutMillis(60000).build();
} catch (Exception ex) {
ex.printStackTrace();
}
return client;
}
// Close Open connection
private static void closeConnnection(RestClient client) {
if (client != null) {
try {
client.close();
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
}
IndexInfo object to which the JSON index details are mapped
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
@JsonIgnoreProperties(ignoreUnknown = true)
public class IndexInfo {
@JsonProperty(value = "health")
private String health;
@JsonProperty(value = "index")
private String indexName;
@JsonProperty(value = "status")
private String status;
@JsonProperty(value = "pri")
private int shards;
@JsonProperty(value = "rep")
private int replica;
@JsonProperty(value = "pri.store.size")
private String dataSize;
@JsonProperty(value = "store.size")
private String totalDataSize;
@JsonProperty(value = "docs.count")
private String documentCount;
@Override
public String toString()
{
StringBuffer str=new StringBuffer(60);
str.append("{\n");
str.append(" \"").append("indexName").append("\":\"").append(indexName).append("\",\n");
str.append(" \"").append("health").append("\":\"").append(health).append("\",\n");
str.append(" \"").append("status").append("\":\"").append(status).append("\",\n");
str.append(" \"").append("shards").append("\":\"").append(shards).append("\",\n");
str.append(" \"").append("replica").append("\":\"").append(replica).append("\",\n");
str.append(" \"").append("dataSize").append("\":\"").append(dataSize).append("\",\n");
str.append(" \"").append("totalDataSize").append("\":\"").append(totalDataSize).append("\",\n");
str.append(" \"").append("documentCount").append("\":\"").append(documentCount).append("\"\n");
str.append(" \"");
return str.toString();
}
public String getIndexName() {
return indexName;
}
public void setIndexName(String indexName) {
this.indexName = indexName;
}
public int getShards() {
return shards;
}
public void setShards(int shards) {
this.shards = shards;
}
public int getReplica() {
return replica;
}
public void setReplica(int replica) {
this.replica = replica;
}
public String getDataSize() {
return dataSize;
}
public void setDataSize(String dataSize) {
this.dataSize = dataSize;
}
public String getTotalDataSize() {
return totalDataSize;
}
public void setTotalDataSize(String totalDataSize) {
this.totalDataSize = totalDataSize;
}
public String getDocumentCount() {
return documentCount;
}
public void setDocumentCount(String documentCount) {
this.documentCount = documentCount;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
public String getHealth() {
return health;
}
public void setHealth(String health) {
this.health = health;
}
}
Below is an example of getting cluster details into a Java object using the Elasticsearch REST Java client. Here the client calls the endpoint “/_cluster/health” to retrieve the complete cluster health detail. It is the same as using GET with curl:
GET http://elasticsearchHost:9200/_cluster/health
Pre-requisite
Java 7 or later is required.
Add the below dependencies for the Elasticsearch REST client and JSON mapping in your pom.xml, or add them to your classpath.
Dependency
<!--Elasticsearch REST jar-->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>rest</artifactId>
<version>5.1.2</version>
</dependency>
<!--Jackson jar for mapping json to Java -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.8.5</version>
</dependency>
Sample Code
import java.io.IOException;
import java.util.Collections;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import com.fasterxml.jackson.databind.ObjectMapper;
public class ElasticsearchRESTClusterClient {
public static void main(String[] args) {
ClusterInfo clusterHealth = null;
RestClient client = null;
try {
client = openConnection();
if (client != null) {
// performRequest GET method will retrieve all cluster health
// information from elastic server
Response response = client.performRequest("GET", "/_cluster/health",
Collections.singletonMap("pretty", "true"));
// GetEntity api will return content of response in form of json
// in Http Entity
HttpEntity entity = response.getEntity();
ObjectMapper jacksonObjectMapper = new ObjectMapper();
// Map json response to Java object in ClusterInfo
// Cluster Info
clusterHealth = jacksonObjectMapper.readValue(entity.getContent(), ClusterInfo.class);
System.out.println(clusterHealth);
}
} catch (Exception ex) {
System.out.println("Exception found while getting cluster detail");
ex.printStackTrace();
} finally {
closeConnnection(client);
}
}
// Get Rest client connection
private static RestClient openConnection() {
RestClient client = null;
try {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("userId", "password"));
client = RestClient.builder(new HttpHost("elasticHost", Integer.parseInt("9200")))
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
// Customize connection as per requirement
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder
// Credentials
.setDefaultCredentialsProvider(credentialsProvider)
// Proxy
.setProxy(new HttpHost("ProxyServer", 8080));
}
}).setMaxRetryTimeoutMillis(60000).build();
} catch (Exception ex) {
ex.printStackTrace();
}
return client;
}
// Close Open connection
private static void closeConnnection(RestClient client) {
if (client != null) {
try {
client.close();
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
}
ClusterInfo Java object to which the retrieved JSON response will be mapped.
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
@JsonIgnoreProperties(ignoreUnknown = true)
public class ClusterInfo {
@JsonProperty(value = "cluster_name")
private String clusterName;
@JsonProperty(value = "status")
private String clusterStatus;
@JsonProperty(value = "active_primary_shards")
private int primaryActiveShards;
@JsonProperty(value = "active_shards")
private int activeShards;
@JsonProperty(value = "delayed_unassigned_shards")
private int delayedUnAssignedShards;
@JsonProperty(value = "unassigned_shards")
private int unAssignedShards;
@JsonProperty(value = "initializing_shards")
private int initializingShards;
@JsonProperty(value = "relocating_shards")
private int relocatingShards;
@JsonProperty(value = "number_of_nodes")
private int totalNodeCount;
@JsonProperty(value = "number_of_data_nodes")
private int dataNodeCount;
@Override
public String toString()
{
StringBuffer str=new StringBuffer(60);
str.append("{\n");
str.append(" \"").append("clusterName").append("\":\"").append(clusterName).append("\",\n");
str.append(" \"").append("clusterStatus").append("\":\"").append(clusterStatus).append("\",\n");
str.append(" \"").append("primaryActiveShards").append("\":\"").append(primaryActiveShards).append("\",\n");
str.append(" \"").append("activeShards").append("\":\"").append(activeShards).append("\",\n");
str.append(" \"").append("delayedUnAssignedShards").append("\":\"").append(delayedUnAssignedShards).append("\",\n");
str.append(" \"").append("unAssignedShards").append("\":\"").append(unAssignedShards).append("\",\n");
str.append(" \"").append("initializingShards").append("\":\"").append(initializingShards).append("\",\n");
str.append(" \"").append("relocatingShards").append("\":\"").append(relocatingShards).append("\",\n");
str.append(" \"").append("totalNodeCount").append("\":\"").append(totalNodeCount).append("\",\n");
str.append(" \"").append("dataNode").append("\":\"").append(dataNodeCount).append("\"");
str.append(" \"");
return str.toString();
}
public String getClusterName() {
return clusterName;
}
public void setClusterName(String clusterName) {
this.clusterName = clusterName;
}
public String getClusterStatus() {
return clusterStatus;
}
public void setClusterStatus(String clusterStatus) {
this.clusterStatus = clusterStatus;
}
public int getPrimaryActiveShards() {
return primaryActiveShards;
}
public void setPrimaryActiveShards(int primaryActiveShards) {
this.primaryActiveShards = primaryActiveShards;
}
public int getActiveShards() {
return activeShards;
}
public void setActiveShards(int activeShards) {
this.activeShards = activeShards;
}
public int getDelayedUnAssignedShards() {
return delayedUnAssignedShards;
}
public void setDelayedUnAssignedShards(int delayedUnAssignedShards) {
this.delayedUnAssignedShards = delayedUnAssignedShards;
}
public int getUnAssignedShards() {
return unAssignedShards;
}
public void setUnAssignedShards(int unAssignedShards) {
this.unAssignedShards = unAssignedShards;
}
public int getInitializingShards() {
return initializingShards;
}
public void setInitializingShards(int initializingShards) {
this.initializingShards = initializingShards;
}
public int getRelocatingShards() {
return relocatingShards;
}
public void setRelocatingShards(int relocatingShards) {
this.relocatingShards = relocatingShards;
}
public int getDataNodeCount() {
return dataNodeCount;
}
public void setDataNodeCount(int dataNodeCount) {
this.dataNodeCount = dataNodeCount;
}
public int getTotalNodeCount() {
return totalNodeCount;
}
public void setTotalNodeCount(int totalNodeCount) {
this.totalNodeCount = totalNodeCount;
}
}
The Elasticsearch REST performRequest API returns the response for synchronous calls as a Response object and for asynchronous calls through a ResponseListener, which receives the Response object. The Response object exposes the following fields:
Host
getHost() returns the host that served the response.
requestLine
getRequestLine() returns information about the request that was performed.
statusLine
getStatusLine() returns the response status line, including the status code.
headers
getHeaders() returns all response headers; a specific header can be fetched with getHeader(String).
Entity
The entity holds the content of the response returned by the query or filters. It is retrieved by calling getEntity().
Failure Response or Exception
IOException
Raised for communication problems such as a socket timeout.
ResponseException
Raised when a response is received with a status code that is not 200 OK. A ResponseException does not occur for a 404, which simply indicates that the resource is not available. A minimal handling sketch follows this list.
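The snippet below is a minimal sketch of handling these failures with the low-level REST client; the host name and port are placeholder assumptions, not values prescribed by this article.
import java.io.IOException;
import org.apache.http.HttpHost;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
public class ResponseErrorHandlingSketch {
    public static void main(String[] args) throws IOException {
        // "elasticHost" and 9200 are placeholders; point them at your cluster.
        try (RestClient client = RestClient.builder(new HttpHost("elasticHost", 9200)).build()) {
            try {
                Response response = client.performRequest("GET", "/_cluster/health");
                System.out.println("Status: " + response.getStatusLine().getStatusCode());
            } catch (ResponseException re) {
                // The server answered with an error status; the Response is still available.
                System.out.println("Request failed with status: "
                        + re.getResponse().getStatusLine().getStatusCode());
            } catch (IOException ioe) {
                // Communication problem such as a socket timeout.
                ioe.printStackTrace();
            }
        }
    }
}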
The REST client can perform both synchronous and asynchronous requests. Synchronous APIs return a Response with the response code, while asynchronous APIs return void and accept an extra ResponseListener argument as a callback that is invoked on completion or failure.
Method: Elasticsearch supports all REST operations such as GET, POST, PUT and DELETE.
Endpoint: The URL of the Elasticsearch API to call, for example /_cat/indices to get the list of indexes on Elasticsearch.
Params: Optional; passed as query-string parameters.
Entity: Optional; passed as the request body for method types such as POST and PUT, or for filter query requests.
Headers: Optional; passed if the request needs header parameters. An example passing both Params and an Entity is sketched right below.
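As a rough illustration of passing both Params and an Entity, the sketch below POSTs a small JSON document; the index name, type, document content and host are illustrative assumptions only.
import java.util.Collections;
import org.apache.http.HttpHost;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NStringEntity;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
public class IndexDocumentSketch {
    public static void main(String[] args) throws Exception {
        RestClient client = RestClient.builder(new HttpHost("elasticHost", 9200)).build();
        // JSON document passed as the request entity.
        NStringEntity entity = new NStringEntity(
                "{\"app\":\"app1\",\"message\":\"sample log line\"}",
                ContentType.APPLICATION_JSON);
        // POST to a hypothetical index/type endpoint with one query-string parameter.
        Response response = client.performRequest("POST", "/app1-index/logs",
                Collections.singletonMap("pretty", "true"), entity);
        System.out.println(response.getStatusLine());
        client.close();
    }
}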
Additional parameters for asynchronous calls:
ResponseConsumerFactory: Optional; used to create an HttpAsyncResponseConsumer for the callback response of the request.
ResponseListener: The listener that returns the callback response when the request completes successfully or fails. A minimal asynchronous sketch follows this list.
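A minimal asynchronous sketch using performRequestAsync and a ResponseListener is shown below; the host, port and endpoint are placeholder assumptions, and the CountDownLatch is only there so the sample can wait for the callback before closing the client.
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import org.apache.http.HttpHost;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseListener;
import org.elasticsearch.client.RestClient;
public class AsyncRequestSketch {
    public static void main(String[] args) throws Exception {
        RestClient client = RestClient.builder(new HttpHost("elasticHost", 9200)).build();
        final CountDownLatch latch = new CountDownLatch(1);
        // performRequestAsync returns immediately; the listener is invoked on completion or failure.
        client.performRequestAsync("GET", "/_cluster/health",
                Collections.singletonMap("pretty", "true"),
                new ResponseListener() {
                    public void onSuccess(Response response) {
                        System.out.println(response.getStatusLine());
                        latch.countDown();
                    }
                    public void onFailure(Exception exception) {
                        exception.printStackTrace();
                        latch.countDown();
                    }
                });
        latch.await(); // wait for the callback before closing the client
        client.close();
    }
}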
For the Elasticsearch connection, the Elasticsearch REST Java API provides RestClient and RestClient.builder(...).build() to obtain a connection.
The connection client class below covers the common ways of connecting to Elasticsearch; depending on your Elasticsearch servers' configuration and accessibility, you can uncomment the relevant methods in customizeHttpClient. I made this class a singleton because the Elasticsearch client keeps the connection persistent.
import java.io.IOException;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
public class ElasticSearchConnectionRestTest {
private RestClient client = null;
private static ElasticSearchConnectionRestTest esc = null;
private ElasticSearchConnectionRestTest() {
}
public static synchronized ElasticSearchConnectionRestTest getInstance() {
if (esc == null) {
esc = new ElasticSearchConnectionRestTest();
}
return esc;
}
public RestClient getClient() {
if (client == null) {
getElasticSearchClient();
}
return client;
}
private RestClient getElasticSearchClient() {
//Basic credential settings
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("usrid", "password"));
client = RestClient.builder(new HttpHost("elasticserchhost1",
Integer.parseInt("elasticsearchport1")),
new HttpHost("elasticserchhost2",
Integer.parseInt("elasticsearchport2"))))
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
//Security Settings
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder
//disable preemptive authentication so that same request done when next
//request
.disableAuthCaching()
//Credentials
.setDefaultCredentialsProvider(credentialsProvider)
//Proxy server settings
.setProxy(new HttpHost("one.proxy.att.com", 8080))
//setting for key store for JKS SSL
//.setSSLContext(sslcontext)
//Number of threads will execute
//.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(1).build())
//connection timeout
//.setConnectTimeout(5000)
//socket connection timeout
//.setSocketTimeout(60000)
;
}
})
//Max retry timeout
.setMaxRetryTimeoutMillis(60000).build();
return client;
}
private void closeConnnection()
{
try
{
client.close();
}catch(IOException ex)
{
ex.printStackTrace();
}
}
}
Details of the APIs used by the Elasticsearch REST connection
setHttpClientConfigCallback
This callback allows modifying the HTTP client configuration, for example credentials, proxy, communication over SSL and socket timeout. Using customizeHttpClient we can configure all these values.
setDefaultHeaders
We can set default headers if some value needs to be sent with every request.
setMaxRetryTimeoutMillis
The timeout value used when the same request is attempted multiple times.
setFailureListener
A listener that is notified whenever a node fails, in case any action needs to be taken. A short sketch wiring these hooks together follows this list.
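The sketch below shows one way these builder hooks can be wired together; the host, header name and log message are illustrative assumptions.
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.RestClient;
public class RestClientBuilderHooksSketch {
    public static RestClient build() {
        // Default headers are sent with every request.
        Header[] defaultHeaders = new Header[] { new BasicHeader("X-App-Name", "demo-app") };
        return RestClient.builder(new HttpHost("elasticHost", 9200))
                .setDefaultHeaders(defaultHeaders)
                // Notified whenever a node fails, e.g. to log it or raise an alert.
                .setFailureListener(new RestClient.FailureListener() {
                    @Override
                    public void onFailure(HttpHost host) {
                        System.out.println("Node failed: " + host);
                    }
                })
                .setMaxRetryTimeoutMillis(60000)
                .build();
    }
}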
Methods of CustomizeHttpClient Builder
Timeout Configuration
connectTimeout: Default value is 1 second.
socketTimeout: Default is 30 seconds.
maxRetryTimeoutMillis: Default is 30 seconds. A sketch of tuning these timeouts follows this list.
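As a sketch, the connect and socket timeouts are typically tuned through setRequestConfigCallback; the host and the timeout values below are examples under assumed requirements, not recommendations.
import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
public class TimeoutConfigSketch {
    public static RestClient build() {
        return RestClient.builder(new HttpHost("elasticHost", 9200))
                .setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
                    @Override
                    public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder builder) {
                        return builder
                                .setConnectTimeout(5000)   // connection establishment timeout
                                .setSocketTimeout(60000);  // socket (read) timeout
                    }
                })
                // Keep the retry timeout at least as high as the socket timeout.
                .setMaxRetryTimeoutMillis(60000)
                .build();
    }
}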
Thread Configuration
ioThreadCount: The client starts by default with one dispatcher thread and a number of worker threads used by the connection manager, as many as the number of locally detected processors.
Authentication Configuration
setDefaultCredentialsProvider: This method requires basic credentials for authentication.
httpClientBuilder.disableAuthCaching(): We can disable preemptive authentication caching, so that each request is first sent without the authentication header to see if it is accepted and, if it fails with an HTTP 401 response, the exact same request is resent with the basic authentication header.
Encrypted Communication
setSSLContext: Set this value to an SSLContext for encrypted communication; a rough sketch follows below.
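For setSSLContext, the sketch below builds an SSLContext from a JKS truststore and passes it through customizeHttpClient; the truststore path, password, host and port are placeholder assumptions.
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.KeyStore;
import javax.net.ssl.SSLContext;
import org.apache.http.HttpHost;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.ssl.SSLContexts;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
public class SslContextSketch {
    public static RestClient build() throws Exception {
        // Load the truststore that contains the certificates to trust.
        KeyStore truststore = KeyStore.getInstance("jks");
        try (InputStream is = Files.newInputStream(Paths.get("/path/to/truststore.jks"))) {
            truststore.load(is, "truststore-password".toCharArray());
        }
        final SSLContext sslContext = SSLContexts.custom()
                .loadTrustMaterial(truststore, null)
                .build();
        return RestClient.builder(new HttpHost("elasticHost", 9443))
                .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                        // Encrypted communication over HTTPS using the custom SSL context.
                        return httpClientBuilder.setSSLContext(sslContext);
                    }
                })
                .build();
    }
}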
Elasticsearch 5 provides a low-level client API to communicate with an Elasticsearch cluster over HTTP from Java. Because it is based on REST and JSON, it can communicate with every version of Elasticsearch and also works across firewalls.
The Elasticsearch REST client sends HTTP requests internally by using the Apache HTTP Async Client.
If the version of the Elasticsearch cluster changes, no client change is needed and communication is not impacted.
Initial Release Version : 5.0.0-alpha4
Elasticsearch REST Features
Keeps connections persistent.
Logs requests and responses through the Elasticsearch REST API.
Load balancing across all available nodes.
Failover in case of node failures and upon specific response codes.
Sniffing support for discovering cluster nodes.
Minimal dependencies.
Pre-Requisite
Elasticsearch REST requires a minimum Java version of 1.7.
Elasticsearch REST Java API Configuration
Maven Configuration
Add the below dependency in your pom.xml file to import the required library.
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>rest</artifactId>
<version>5.1.2</version>
</dependency>
Gradle Configuration
Add the below dependency in your build.gradle file to import the required library.
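Assuming the same artifact coordinates as the Maven example above, the build.gradle entry would look roughly like this:
dependencies {
    compile group: 'org.elasticsearch.client', name: 'rest', version: '5.1.2'
}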
We face a connection refused issue when trying to send Logstash output data to Elasticsearch over HTTP; this happens because of proxy configuration or when Elasticsearch is in a cloud environment.
Generally we see the exception below:
[2017-04-24T10:45:32,933][WARN ][logstash.outputs.elasticsearch] UNEXPECTED POOL ERROR {:e=>#}
[2017-04-24T10:45:32,934][ERROR][logstash.outputs.elasticsearch] Attempted to send a bulk request to elasticsearch, but no there are no living connections in the connection pool. Perhaps Elasticsearch is unreachable or down? {:error_message=>"No Available connections", :class=>"LogStash::Outputs::ElasticSearch::HttpClient::Pool::NoConnectionAvailableError", :will_retry_in_second
Logstash allows a proxy declaration in the configuration file for the Elasticsearch output, in the proxy field as shown below. If the user id or password contains a special symbol, you have to use its URL-encoded (ASCII) value. For example, if my password is music@123, the encoded value of @ is %40, so the password becomes music%40123. Refer to the link ASCII CODE to get the value corresponding to each character.
proxy => "http://userid:password@proxyhost:8080"
For example, with user id "smart" and password "music@123", the proxy configuration looks like:
proxy => "http://smart:music%40123@proxyhost:8080"
Filebeat.yml file with Prospectors, Multiline, Elasticsearch Output and Logging Configuration
You can copy the file below into filebeat.yml and run it after making the changes below for your environment's directory structure, following the steps mentioned for Filebeat Download, Installation and Start/Run:
Change the prospectors section to your log files' directory and file names
Configure the multiline pattern as per your log format; the pattern set here is generic and should work with most formats
Change the Elasticsearch output section for host, port and other settings if required
Change the logging directory as per your machine's directory structure.
Sample filebeat.yml file
#=============Filebeat prospectors ===============
filebeat.prospectors:
# Here we can define multiple prospectors and shipping methods and rules as per
# requirement; if we need to read logs from multiple files in the same pattern/directory
# location, a regular-expression pattern can also be used.
# Filebeat supports only two input_type values: log and stdin
##############input type logs configuration#####################
- input_type: log
# Paths of the files from which logs will be read; use a regular expression if you need to
# read from multiple files
paths:
- /opt/app/app1/logs/app1-debug*.log*
# Set fields_under_root to true if you want the custom fields to be stored at the top level of
# the Filebeat JSON output instead of under a "fields" sub-dictionary.
fields_under_root: true
### Multiline configuration for handling stack traces, objects, XML etc.; if multiline is
### enabled with the below configuration, such lines are shipped as a single multiline event
# The regexp pattern that has to be matched. The example pattern matches all lines starting
# with a log level such as DEBUG, ALERT, TRACE or WARNING; it can be customized
# according to your log line format
#multiline.pattern: '^\[([Aa]lert|ALERT|[Tt]race|TRACE|[Dd]ebug|DEBUG|[Nn]otice|NOTICE|[Ii]nfo|INFO|[Ww]arn?(?:ing)?|WARN?(?:ING)?|[Ee]rr?(?:or)?|ERR?(?:OR)?|[Cc]rit?(?:ical)?|CRIT?(?:ICAL)?|[Ff]atal|FATAL|[Ss]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?)'
# Defines whether the pattern match should be negated or not. Default is false.
#multiline.negate: true
# multiline.match defines where lines that do not match the above pattern should be
# appended. Possible values are "after" or "before".
#multiline.match: after
# Maximum number of lines that can be combined into one multiline event; additional lines are ignored
#multiline.max_lines: 50
#============= Elasticsearch Output Configuration ===============
output.elasticsearch:
# We can enable or disable this output module with this flag.
#enabled: true
# Define the Elasticsearch HTTP server host and port. The default port for Elasticsearch
# is 9200
hosts: ["elasticsearver:9200"]
# Filebeat provides a gzip compression level which varies from 1 to 9. As the compression
# level increases, processing speed reduces but network usage decreases. By default
# compression is disabled and the value is 0.
compression_level: 0
# Optional protocol, HTTP by default. If required, set https and basic auth credentials.
#protocol: "https"
#username: "userid"
#password: "pwd"
# We can configure the number of workers per host publishing events to Elasticsearch,
# which will do load balancing.
#worker: 1
# Optional index name. The default is "filebeat" plus date and generates filebeat-{YYYY.MM.DD} keys.
index: "app1-%{+yyyy.MM.dd}"
# Optional ingest node pipeline. By default no pipeline will be used.
#pipeline: ""
# Optional HTTP Path
#path: "/elasticsearch"
# Proxy server url
#proxy_url: http://proxy:3128
# Default value is 3. When the maximum number of retries is reached and events are still
# not published, all events are dropped. Filebeat also provides an option to retry until all
# events are published by setting the value to less than 0.
#max_retries: 3
# Default value is 50. If Filebeat generates more events than the configured batch max size,
# it splits them into batches of the configured size and sends them to Elasticsearch. A larger
# batch size improves performance but requires more buffering, and can cause other issues
# such as connection errors or request timeouts.
#bulk_max_size: 50
# Default value is 90 seconds. If there is no response, the HTTP request times out.
#timeout: 90
# Waiting time for new events for bulk requests. If a bulk request of maximum size is sent
# before this specified time, a new bulk index request is created.
#flush_interval: 1s
# We can update the Elasticsearch index template from Filebeat, which defines settings
# and mappings to determine field analysis.
# Set to false to disable template loading.
#template.enabled: true
# Template name. By default the template name is filebeat.
#template.name: "app1"
# Path to template file
#template.path: "${path.config}/app1.template.json"
# Set template.overwrite to true and, if the template file needs to be updated to version 2.x,
# set the path of the latest template file with the below configuration.
#template.overwrite: false
#template.versions.2x.enabled: true
#template.versions.2x.path: "${path.config}/filebeat.template-es2x.json"
# Configure SSL settings if required for the Elasticsearch server
#ssl.enabled: true
# Optional SSL configuration options. SSL is off by default.
# List of root certificates for HTTPS server verifications
# SSL configuration is optional and off by default. It is required for server verification if an
# HTTPS root certificate is used.
#ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]
# Default value is full. The SSL verification mode is required if SSL is configured. The value
# 'none' can be used for testing purposes, but in that mode any certificate is accepted.
#ssl.verification_mode: full
# List of supported/valid TLS versions. By default all TLS versions from 1.0 up to 1.2 are
# enabled. Specific versions can also be listed in the array below.
#ssl.supported_protocols: [TLSv1.0, TLSv1.1, TLSv1.2]
# Define path for certificate for SSL
#ssl.certificate: "/etc/pki/client/cert.pem"
# Define path for Client Certificate Key
#ssl.key: "/etc/pki/client/cert.key"
# If data is shipped in encrypted form, a passphrase is needed for decrypting the certificate
# key; otherwise this is optional
#ssl.key_passphrase: ''
# Configure encryption cipher suites to be used for SSL connections
#ssl.cipher_suites: []
# Configure encryption curve types for ECDHE based cipher suites
#ssl.curve_types: []
#====================Logging ==============================
# The default log level is info. Setting a level also records everything above it in the
# hierarchy automatically. Available log levels are: critical, error, warning, info, debug
logging.level: debug
# Possible values for selectors are "beat", "publish" and "service"; to enable all of them use
# the value "*". The selectors can also be set on the command line when starting Filebeat.
logging.selectors: ["*"]
# The default value is false. If set to true, output is also sent to syslog.
logging.to_syslog: false
# The default is true. All non-zero metrics readings are output on shutdown.
logging.metrics.enabled: true
# Period of metrics for log reading counts from log files; a complete report is sent when
# Filebeat shuts down
logging.metrics.period: 30s
# Set this flag as true to enable logging in files if not set that will disable.
logging.to_files: true
logging.files:
# Path of the directory where log files will be written; if not set, the default is the home
# directory.
path: /tmp
# Name of files where logs will write
name: filebeat-app.log
# The log file rotates when it reaches the maximum size and a new file is created. Default
# value is 10MB
rotateeverybytes: 10485760 # = 10MB
# Keeps this maximum number of recent log files in the directory for rotation and removes
# the oldest ones.
keepfiles: 7
# Enables logging for this level only. Available log levels are: critical, error, warning, info,
# debug
level: debug
If we need to ship server log lines directly to Elasticsearch over HTTP via Filebeat, we have to set the below fields for the Elasticsearch output according to your Elasticsearch server configuration and follow these steps.
Uncomment output.elasticsearch in the filebeat.yml file
Set host and port in the hosts line
Set the index name as you want. If it is not set, Filebeat creates the default index "filebeat-%{+yyyy.MM.dd}".
Configure Elasticsearch server credentials, if any:
Set user name and password
Set the protocol to https if needed, because the default protocol is http
username:userid
password:pwd
Elasticsearch Index Template Configuration: We can update the Elasticsearch index template from Filebeat, which defines settings and mappings to determine field analysis.
Auto Index Template Loading: The Filebeat package loads the default template filebeat.template.json to Elasticsearch if no template configuration is given, and it will not overwrite an existing template.
Customize Index Template Loading: We can upload a user-defined template and also update the template version by using the below configuration.
#(if set as false template need to upload manually)
template.enabled:true
#default value is filebeat
template.name:"app1"
#default value is filebeat.template.json.
template.path:"app1.template.json"
#default value is false
template.overwrite:false
By default, template.overwrite is false, so an index template that already exists on Elasticsearch will not be overwritten. To overwrite an existing index template, set this flag to true in the filebeat.yml configuration file.
Latest Template Version Loading from Filebeat: Set template.overwrite to true and, if the template file needs to be updated to version 2.x, set the path of the latest template file with the template.versions.2x.enabled and template.versions.2x.path settings shown in the sample file above.
Manual Index Template Loading: for manually loading an index template, please refer to Elasticsearch Index Template Management.
Compress Elasticsearch Output: Filebeat provides a gzip compression level which varies from 1 to 9. As the compression level increases, processing speed reduces but network usage decreases. By default compression is disabled and the value is 0.
compression_level: 0
Other configuration options:
bulk_max_size: Default value is 50. If Filebeat generates more events than the configured batch max size, it splits them into batches of the configured size and sends them to Elasticsearch. A larger batch size improves performance but requires more buffering, and can cause other issues such as connection errors or request timeouts.
Never set the bulk size to 0, because then there is no buffering for events and Filebeat sends each event directly to Elasticsearch.
timeout: Default value is 90 seconds. If there is no response, the HTTP request times out.
flush_interval: Waiting time for new events for bulk requests. If a bulk request of maximum size is sent before this specified time, a new bulk index request is created.
max_retries: Default value is 3. When the maximum number of retries is reached and events are still not published, all events are dropped. Filebeat also provides an option to retry until all events are published by setting the value to less than 0.
worker: We can configure the number of workers per host publishing events to Elasticsearch, which will do load balancing.
To read more on Filebeat topics, sample configuration files and integration with other systems with examples, follow the links Filebeat Tutorial and Filebeat Issues. To know more about YAML, follow the link YAML Tutorials.
Leave your feedback to enhance this topic further and make it more helpful for others.