
Elasticsearch Overview


“Elasticsearch is an open source, cross-platform search engine developed completely in Java. It is built on top of Lucene, which provides full-text search over high volumes of data quickly, and it supports analysis based on indexing. It is schema-free and provides NRT (Near Real Time) search results.”

Advantages of Elasticsearch

Full Text Search 

Elasticsearch is built on top of Lucene, which provides a full-featured open source library for full-text search.

Schema Free 

Elasticsearch stores documents in JSON format and, based on the content, detects the words and field types to make them searchable.

RESTful API

Elasticsearch is easily accessible over a browser by using a URL and also supports a RESTful API to perform operations. To read more on Elasticsearch REST, follow the link Elasticsearch REST JAVA API Overview.
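
For example, a simple search can be run straight from the browser or any HTTP client. The host, index, and field names below are placeholders for illustration:

GET http://elasticsearchHost:9200/myindex/_search?q=title:elasticsearch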

Operation Persistence

An Elasticsearch cluster keeps a record of all transaction-level changes made to an index (schema as well as data) and tracks the availability of the nodes in the cluster, so that data remains easily available even if any node fails over.

Where is Elasticsearch used?

  • It is useful in applications that need to do analysis and statistics, and to find anomalies in data based on patterns.
  • It is useful where alerts need to be sent when a particular condition is matched, such as in stock markets or for exceptions from logs.
  • It is useful for log analysis and issue resolution, because it can run a full search over billions of records in milliseconds.
  • It is compatible with tools like Filebeat, Logstash, and Kibana for storing high volumes of data for analysis, and for visualizing it in the form of charts and dashboards.

Basic Concepts and Terminology

Cluster

A cluster is a collection of one or more nodes that together provide the capability to search text across data scattered over the nodes. It is identified by a unique name within the network, so that all associated nodes join together under that cluster name.

For more info on cluster configuration and queries, follow the link Elasticsearch Cluster.

Elasticsearch Cluster

In the screen above, the Elasticsearch cluster “FACING_ISSUE_IN_IT” has three master nodes and four data nodes.

Node

A node is an Elasticsearch server associated with a cluster. It stores data and helps the cluster with indexing data and serving search queries. It is identified by a unique name in the cluster; if a name is not provided, Elasticsearch generates a random Universally Unique Identifier (UUID) at server start time.

A cluster can have one or more nodes. The first node to start forms a cluster with a single node, and as other nodes start they join that cluster.

For more info on node configuration, master nodes, data nodes, and ingest nodes, follow the link Elasticsearch Node.

Data Node Documents Storage

The screen above represents the data of two indexes, I1 and I2. Index I1 has two types of documents, T1 and T2, while index I2 has only type T2, and these shards are distributed over all nodes in the cluster. This data node holds the documents of shard S1 for index I1 and shard S3 for index I2. It also keeps replicas of the documents of shards S2 of indexes I1 and I2, whose primaries are stored on other nodes in the cluster.

Index

An index is a collection of documents with similar characteristics, stored on nodes in a distributed fashion and identified by a unique name, which is used to perform operations such as search queries, updates, and deletes on its documents. A cluster can have as many indexes as needed, each with a unique name.

A document is stored in an index and assigned a type; an index can have multiple types of documents.

For more info on index creation, mapping templates, and CRUD operations, follow the link Elasticsearch Index.

Shards

Shards are partitions of an index scattered over nodes. They provide the capability to store a large number (billions) of documents for the same index in the cluster, even when a single node's disk is not capable of storing it all.

Replica

A replica is a copy of a shard that is stored on a different node. A shard can have zero or more replicas. If a shard is on one node, its replicas are stored on other nodes.

Benefits of Shards and Replica

  • Shards split an index into horizontal partitions to handle high volumes of data.
  • Operations execute in parallel across shards and replicas on multiple nodes for an index, which increases system performance and throughput.
  • The cluster recovers easily in case of node failure, because a replica of the data exists on another node: replicas are always stored on a different node than their shards.

Note

When we create an index, by default Elasticsearch configures it with 5 shards and 1 replica, but we can change this from the config/elasticsearch.yml file or by passing shard and replica values in the settings when the index is created.

Once an index is created, we can't change the shard configuration, only the replica count. If the number of shards needs to change, the only option is re-indexing.

Each shard is itself a Lucene index and can keep at most 2,147,483,519 (= Integer.MAX_VALUE − 128) documents. The merging of search results and failover are taken care of by the Elasticsearch cluster.
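
As an illustration of the note above, the shard and replica counts can be passed as settings when the index is created. Below is a minimal sketch using the same low-level REST client as the example later in this post; the host "elasticHost" and index name "my_index" are placeholders:

import java.util.Collections;

import org.apache.http.HttpHost;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NStringEntity;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

public class CreateIndexWithSettings {

	public static void main(String[] args) throws Exception {
		// "elasticHost" is a placeholder for your Elasticsearch server
		RestClient client = RestClient.builder(new HttpHost("elasticHost", 9200)).build();
		try {
			// Override the default 5 shards / 1 replica at index-creation time
			String settings = "{ \"settings\": { \"number_of_shards\": 3, \"number_of_replicas\": 2 } }";
			// PUT /my_index creates the index with the supplied settings
			Response response = client.performRequest("PUT", "/my_index",
					Collections.<String, String> emptyMap(),
					new NStringEntity(settings, ContentType.APPLICATION_JSON));
			System.out.println(response.getStatusLine());
		} finally {
			client.close();
		}
	}
}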

For more info on elasticsearch Shards and Replica follow Elasticsearch Shards and Replica configuration.

Document

Each record stored in an index is called a document, and it is stored as a JSON object. JSON is fast to exchange over the internet and easy to handle for display on the browser side.
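
For illustration, a document might look like the following (the index name "customer", type "info", and field values are made up); it can be stored with a single PUT call:

PUT http://elasticsearchHost:9200/customer/info/1
{
  "name": "John Doe",
  "age": 35,
  "city": "Bangalore"
}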

Read More

To read more on Elasticsearch configuration, sample Elasticsearch REST clients, and search query types with examples, follow the links Elasticsearch Tutorial and Elasticsearch Issues.

Hope this blog was helpful for you.

Leave your feedback to help enhance this topic and make it more helpful for others.


Elasticsearch REST JAVA Client for Cluster Detail


Below is an example of getting cluster details into a Java object by using the Elasticsearch REST Java client. Here the client calls the endpoint “/_cluster/health” to retrieve the cluster's health details. It is the same as the following GET with cURL:

GET http://elasticsearchHost:9200/_cluster/health

Pre-requisite

  • Java 7 or later is required.
  • Add the dependencies below for the Elasticsearch REST client and JSON mapping to your pom.xml, or add the jars to your classpath.

Dependency

<!-- Elasticsearch REST client jar -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>rest</artifactId>
    <version>5.1.2</version>
</dependency>
<!-- Jackson jar for mapping JSON to Java -->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.8.5</version>
</dependency>

Sample Code

import java.io.IOException;
import java.util.Collections;

import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;

import com.fasterxml.jackson.databind.ObjectMapper;

public class ElasticsearchRESTClusterClient {

	public static void main(String[] args) {
		ClusterInfo clusterHealth = null;
		RestClient client = null;
		try {
			client = openConnection();
			if (client != null) {
				// performRequest with GET retrieves the cluster health
				// information from the Elasticsearch server
				Response response = client.performRequest("GET", "/_cluster/health",
						Collections.singletonMap("pretty", "true"));
				// getEntity() returns the content of the response as JSON
				// wrapped in an HttpEntity
				HttpEntity entity = response.getEntity();
				ObjectMapper jacksonObjectMapper = new ObjectMapper();
				// Map the JSON response to the ClusterInfo Java object
				clusterHealth = jacksonObjectMapper.readValue(entity.getContent(), ClusterInfo.class);
				System.out.println(clusterHealth);
			}

		} catch (Exception ex) {
			System.out.println("Exception found while getting cluster detail");
			ex.printStackTrace();
		} finally {
			closeConnnection(client);
		}

	}

	// Get Rest client connection
	private static RestClient openConnection() {
		RestClient client = null;
		try {
			final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
			credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("userId", "password"));
			client = RestClient.builder(new HttpHost("elasticHost", 9200))
					.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
						// Customize connection as per requirement
						public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
							return httpClientBuilder
									// Credentials
									.setDefaultCredentialsProvider(credentialsProvider)
									// Proxy
									.setProxy(new HttpHost("ProxyServer", 8080));

						}
					}).setMaxRetryTimeoutMillis(60000).build();

		} catch (Exception ex) {
			ex.printStackTrace();
		}
		return client;
	}

	// Close Open connection
	private static void closeConnnection(RestClient client) {
		if (client != null) {
			try {
				client.close();
			} catch (IOException ex) {
				ex.printStackTrace();
			}
		}
	}

}

ClusterInfo is the Java object to which the retrieved JSON response will be mapped.

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

@JsonIgnoreProperties(ignoreUnknown = true)
public class ClusterInfo {

@JsonProperty(value = "cluster_name")
private String clusterName;
@JsonProperty(value = "status")
private String clusterStatus;
@JsonProperty(value = "active_primary_shards")
private int primaryActiveShards;
@JsonProperty(value = "active_shards")
private int activeShards;
@JsonProperty(value = "delayed_unassigned_shards")
private int delayedUnAssignedShards;
@JsonProperty(value = "unassigned_shards")
private int unAssignedShards;
@JsonProperty(value = "initializing_shards")
private int initializingShards;
@JsonProperty(value = "relocating_shards")
private int relocatingShards;
@JsonProperty(value = "number_of_nodes")
private int totalNodeCount;
@JsonProperty(value = "number_of_data_nodes")
private int dataNodeCount;

@Override
public String toString()
{
	StringBuffer str=new StringBuffer(60);
	str.append("{\n");
	str.append("    \"").append("clusterName").append("\":\"").append(clusterName).append("\",\n");
	str.append("    \"").append("clusterStatus").append("\":\"").append(clusterStatus).append("\",\n");
	str.append("    \"").append("primaryActiveShards").append("\":\"").append(primaryActiveShards).append("\",\n");
	str.append("    \"").append("activeShards").append("\":\"").append(activeShards).append("\",\n");
	str.append("    \"").append("delayedUnAssignedShards").append("\":\"").append(delayedUnAssignedShards).append("\",\n");
	str.append("    \"").append("unAssignedShards").append("\":\"").append(unAssignedShards).append("\",\n");
	str.append("    \"").append("initializingShards").append("\":\"").append(initializingShards).append("\",\n");
	str.append("    \"").append("relocatingShards").append("\":\"").append(relocatingShards).append("\",\n");
	str.append("    \"").append("totalNodeCount").append("\":\"").append(totalNodeCount).append("\",\n");
	str.append("    \"").append("dataNode").append("\":\"").append(dataNodeCount).append("\"");
	str.append("    \"");
	return str.toString();
}

public String getClusterName() {
	return clusterName;
}
public void setClusterName(String clusterName) {
	this.clusterName = clusterName;
}
public String getClusterStatus() {
	return clusterStatus;
}
public void setClusterStatus(String clusterStatus) {
	this.clusterStatus = clusterStatus;
}
public int getPrimaryActiveShards() {
	return primaryActiveShards;
}
public void setPrimaryActiveShards(int primaryActiveShards) {
	this.primaryActiveShards = primaryActiveShards;
}
public int getActiveShards() {
	return activeShards;
}
public void setActiveShards(int activeShards) {
	this.activeShards = activeShards;
}
public int getDelayedUnAssignedShards() {
	return delayedUnAssignedShards;
}
public void setDelayedUnAssignedShards(int delayedUnAssignedShards) {
	this.delayedUnAssignedShards = delayedUnAssignedShards;
}
public int getUnAssignedShards() {
	return unAssignedShards;
}
public void setUnAssignedShards(int unAssignedShards) {
	this.unAssignedShards = unAssignedShards;
}
public int getInitializingShards() {
	return initializingShards;
}
public void setInitializingShards(int initializingShards) {
	this.initializingShards = initializingShards;
}
public int getRelocatingShards() {
	return relocatingShards;
}
public void setRelocatingShards(int relocatingShards) {
	this.relocatingShards = relocatingShards;
}
public int getDataNodeCount() {
	return dataNodeCount;
}
public void setDataNodeCount(int dataNodeCount) {
	this.dataNodeCount = dataNodeCount;
}
public int getTotalNodeCount() {
	return totalNodeCount;
}
public void setTotalNodeCount(int totalNodeCount) {
	this.totalNodeCount = totalNodeCount;
}
}
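
When run against a reachable cluster, the output of the sample code looks something like the following (all values are illustrative and depend on your cluster; the cluster name matches the screenshot earlier in this post):

{
    "clusterName":"FACING_ISSUE_IN_IT",
    "clusterStatus":"green",
    "primaryActiveShards":"5",
    "activeShards":"10",
    "delayedUnAssignedShards":"0",
    "unAssignedShards":"0",
    "initializingShards":"0",
    "relocatingShards":"0",
    "totalNodeCount":"7",
    "dataNode":"4"
}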

Read More on Elasticsearch REST

Integration

Integrate Filebeat, Kafka, Logstash, Elasticsearch and Kibana

Kafka Introduction and Architecture


Kafka is an open source, distributed, stream-processing message broker platform written in Java and Scala, developed by the Apache Software Foundation.

Kafka is massively used in enterprise infrastructure to process stream data or transaction logs in real time. Kafka provides a unified, fault-tolerant, high-throughput, low-latency platform for dealing with real-time data feeds.

Important Points about Kafka:

  • Publish/subscribe messaging system.
  • Robust queue able to handle high volumes of data.
  • Works for online and offline message consumption.
  • In a Kafka cluster, each server/node works as a broker, and each broker is responsible for published records and may have zero or more partitions per topic.
  • Each record in a partition consists of a key, a value, and a timestamp.
  • Kafka uses the TCP protocol to communicate between clients and servers.
  • Kafka provides Producer, Consumer, Streams, and Connector Java APIs to publish to and consume from topics (a minimal producer sketch follows this list).
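
As an illustration of the Producer API, below is a minimal sketch that publishes one record to a topic. It assumes the kafka-clients jar is on the classpath; the broker address "kafkaHost:9092", topic "T1", key, and value are placeholders:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducer {

	public static void main(String[] args) {
		Properties props = new Properties();
		// Broker(s) to bootstrap from; "kafkaHost:9092" is a placeholder
		props.put("bootstrap.servers", "kafkaHost:9092");
		// Keys and values are plain strings in this sketch
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

		Producer<String, String> producer = new KafkaProducer<String, String>(props);
		// Publish one record to topic T1; records with the same key land on the same partition
		producer.send(new ProducerRecord<String, String>("T1", "key1", "Hello Kafka"));
		producer.close();
	}
}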

Initial Release: January 2011

Current Release: 0.10.2.0

Kafka Cluster Architecture

Before discussing the Kafka cluster architecture, let's introduce the Kafka terminology that will make the architecture and flow easier to understand.

 Broker

A broker is a stateless instance of a Kafka server in the cluster. We define a broker by giving each server instance a unique id. A Kafka cluster can have multiple broker instances, and each broker can handle hundreds of thousands of read and write requests, or TBs of messages per second, without any performance impact.

Zookeeper

A Kafka cluster uses Zookeeper for managing and coordinating brokers. Producers and consumers are notified if a new broker is added to the cluster or if one fails, so that they can decide which available broker to point to.

 Topic

A topic is a category that keeps streams of records that are published to it. A topic can have zero, one, or many consumers reading its data. We can create topics from an application or manually.

A topic's data is stored in partitions and distributed over the servers based on the number of partitions configured per topic and the available brokers.

Partition

A partition stores records in sequential order and is continually appended to. Each record in a partition has a sequential id number called the offset. An individual log partition allows the records to scale up to the available capacity of a single server.

How is a topic partitioned across brokers/servers/nodes?

Suppose we need to create a topic with N partitions for a Kafka cluster having M brokers.

If (M == N): each broker will have one partition.

If (M > N): the first N available brokers will take one partition each.

If (M < N): some brokers will have more than one partition.
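
The round-robin idea behind this distribution can be sketched as follows. This is purely illustrative and is a simplification, not Kafka's actual assignment code:

// Illustrative round-robin assignment of N partitions to M brokers
public class PartitionAssignmentSketch {

	public static void main(String[] args) {
		int brokers = 3;    // M
		int partitions = 5; // N
		for (int p = 0; p < partitions; p++) {
			// Wrap around to the first broker when N > M
			int broker = p % brokers;
			System.out.println("Partition " + p + " -> Broker " + (broker + 1));
		}
	}
}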

The Kafka cluster retains these partitions as configured by the retention policy in the server.properties file, whether or not they have been consumed. By default retention is seven days (log.retention.hours=168), and we can modify it based on our storage capacity. Kafka performance is not impacted by data size, because it reads and writes data based on offset values.

Kafka Cluster Architecture with Multiple/Distributed Servers

Details about the above Kafka cluster with multiple/distributed servers:

Kafka Cluster: has three servers, and each server has a corresponding broker with id 1, 2, or 3.

Zookeeper: runs over the Kafka cluster; it keeps details of the availability of brokers and updates producers and consumers.

Brokers: 1, 2, and 3, which have topics T1, T2, and T3 stored in partitions.

Topics: topics T1 and T2 are partitioned into 3 and distributed over servers 1, 2, and 3, while topic T3 has a single partition that is stored on server 3 only.

Partitions: each partition of a topic has a different number of records, with offsets from 0 to some value, where 0 represents the oldest record.

Producers: APP1, APP2, and APP3 write to the different topics T1, T2, and T3, which were created by applications or manually.

Consumers: topic T3 is consumed by applications APP5 and APP6, while topic T1 is consumed by APP4 and T2 is consumed by APP5 only. One topic can be consumed by multiple apps.

How does the Kafka cluster flow work for producers and consumers?

I will divide the above architecture into two parts, “Producer to Kafka Cluster” and “Kafka Cluster to Consumer”, because producers and consumers run in parallel and independently of each other.

Producer to Kafka Cluster

  • Create the topic manually or from an application, with configuration for partitions and replicas.
  • The producer connects to the Kafka cluster with a topic name. The Kafka cluster checks in Zookeeper for an available broker and sends the broker id to the producer.
  • The producer publishes messages to the available broker, which stores them in sequential order in a partition. If anything changes among the Kafka cluster servers, such as a server being added or failing, Zookeeper updates the producer.
  • If replication is configured for the topic, a copy of each partition is kept on another server for fault tolerance.

Kafka Cluster to Consumer

  • The consumer points to a topic on the Kafka cluster, as required by the application.
  • The consumer subscribes to records from the topic based on the required offset value (e.g. from the beginning, from now, or from the last committed position).
  • If the consumer wants records from now on, Zookeeper sends the offset value to the consumer so that it can start reading records from the brokers’ partitions.
  • If the required offset does not exist in the broker partition the consumer was reading from, Zookeeper returns an available broker id with partition details to the consumer.
  • If a broker goes down while the consumer is reading records from it, Zookeeper sends an available broker id with partition details to the consumer (a minimal consumer sketch follows this list).
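
As an illustration of the consumer side, below is a minimal sketch using the Consumer API. The broker address, group id, and topic name are placeholders; setting auto.offset.reset to "earliest" corresponds to reading from the beginning when no committed offset exists:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SimpleConsumer {

	public static void main(String[] args) {
		Properties props = new Properties();
		props.put("bootstrap.servers", "kafkaHost:9092"); // placeholder host
		props.put("group.id", "demo-group");              // consumer group id
		props.put("auto.offset.reset", "earliest");       // start from the beginning if no stored offset
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

		KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
		// Subscribe to topic T1 and poll records in a loop
		consumer.subscribe(Collections.singletonList("T1"));
		try {
			while (true) {
				ConsumerRecords<String, String> records = consumer.poll(1000);
				for (ConsumerRecord<String, String> record : records) {
					System.out.println("offset=" + record.offset() + ", value=" + record.value());
				}
			}
		} finally {
			consumer.close();
		}
	}
}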

Kafka Cluster with a Single Server: creates the configured number of partitions per topic on the same server.

Kafka Cluster with Multiple/Distributed Servers: topic partition logs are distributed over all the servers in the Kafka cluster, and each server handles data and requests for its share of partitions. If replication is configured, servers keep the configured number of copies of partition logs distributed over the servers for fault tolerance.

How does the Kafka cluster load-balance over multiple/distributed servers?

For each topic partition log, one server/broker acts as the “leader” while the others are followers (in a multi-server/distributed setup). The leader handles all read and write requests from producers and consumers, while the followers keep replicas of the leader’s partition. If the leader fails or its server goes down, one of the followers becomes the leader and the remaining servers stay followers. For more detail, go to Kafka Cluster with multi server on same machine.

Read More on Kafka

Integration

Integrate Filebeat, Kafka, Logstash, Elasticsearch and Kibana