
Logstash Input and Output to/from Kafka Example


Logstash can consume messages from Kafka, parse the data, and send the parsed output back to Kafka for streaming to other applications.

Kafka Input Configuration in Logstash

Below is a basic configuration for Logstash to consume messages from Kafka. For more information about the Logstash Kafka input configuration, refer to the Elasticsearch site Link.

input {
    kafka {
        bootstrap_servers => 'KafkaServer:9092'
        topics => ["TopicName"]
        codec => json {}
    }
}

bootstrap_servers: The default value is "localhost:9092". It takes a list of server connections in the form host1:port1,host2:port2, which is used to establish the initial connection to the cluster. If one server is down, it will connect to another.

topics: The list of topics to subscribe to; messages will be consumed from these topics.
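As a hedged example (the broker host names and topic names below are placeholders, not values from this setup), the input can also list several brokers and topics at once:

input {
    kafka {
        bootstrap_servers => 'kafka1:9092,kafka2:9092'
        topics => ["TopicA", "TopicB"]
        codec => json {}
    }
}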

Kafka Output Configuration in Logstash

Below is a basic configuration for Logstash to publish messages to Kafka. For more information about the Logstash Kafka output configuration, refer to the Elasticsearch site Link.

output {
    kafka {
        bootstrap_servers => "localhost:9092"
        topic_id => 'TopicName'
    }
}

bootstrap_servers: The default value is "localhost:9092". It takes a list of server connections in the form host1:port1,host2:port2, and the producer uses it only to fetch metadata (topics, partitions and replicas). The socket connections for sending the actual data are established based on the broker information returned in the metadata.

topic_id: The topic name to which messages will be published.
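Putting input and output together, a minimal end-to-end pipeline might look like the sketch below; the topic names are placeholders and the mutate filter is only an illustration of where your own parsing logic would go:

input {
    kafka {
        bootstrap_servers => 'KafkaServer:9092'
        topics => ["InputTopic"]
        codec => json {}
    }
}

filter {
    # Example only; replace with your own parsing/enrichment logic
    mutate {
        add_field => { "processed_by" => "logstash" }
    }
}

output {
    kafka {
        bootstrap_servers => 'KafkaServer:9092'
        topic_id => 'OutputTopic'
        codec => json {}
    }
}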

Read More on Kafka

Integration

Integrate Filebeat, Kafka, Logstash, Elasticsearch and Kibana


Integrate Filebeat with Kafka

Kafka can receive messages published by Filebeat based on the Kafka output configuration in the filebeat.yml file.

Filebeat Kafka Output Configuration

filebeat.yml requires the fields below to connect to Kafka and publish messages to the configured topic. Kafka will create topics dynamically as Filebeat requires them.

output.kafka:
  # The list of Kafka broker addresses from where to fetch the cluster metadata.
  # The cluster metadata contain the actual Kafka brokers events are published to.
  hosts: ["localhost:9092"]

  # The Kafka topic used for produced events. The setting can be a format string.
  topic: Topic-Name

  # Authentication details. Password is required if username is set.
  #username: ''
  #password: ''

For more information about the Filebeat Kafka output configuration options, refer to the links below.
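To quickly verify that events published by Filebeat are reaching Kafka, you can run the console consumer bundled with Kafka against the configured topic (assuming Kafka is installed locally and the topic name matches the one configured above):

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic Topic-Name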

Read More on Kafka

Integration

Integrate Filebeat, Kafka, Logstash, Elasticsearch and Kibana

Setup Kafka Cluster for Multi/Distributed Servers/Brokers

To set up a Kafka cluster with multiple brokers/servers on a single machine, follow the steps below:

In the example below we will create a Kafka cluster with three brokers on a single machine. All steps are the same as for a Kafka cluster with a single server on the same machine; additionally, we create two more configuration files for the extra brokers and run them on the same cluster.

Download and Installation

Download the latest version of Kafka from the download link, copy it to the installation directory, and run the command below to extract it.

tar -zxvf kafka_2.11-0.10.0.0.tgz

Configuration Changes for Zookeeper and Server

Make the changes below in the zookeeper.properties configuration file in the config directory.

config/zookeeper.properties

clientPort=2181

clientPort is the port where clients connect. The default port is 2181; if you change it in zookeeper.properties, you also have to update it in server.properties below.

Make the changes below in the server.properties configuration file in the config directory.

config/server.properties:

broker.id=0
listeners=PLAINTEXT://:9092
log.dir=/tmp/kafka-logs
zookeeper.connect=localhost:2181

By default, the server.properties file has the above fields with default values.

broker.id: A unique id for the broker, by which ZooKeeper recognizes brokers in the Kafka cluster. If the Kafka cluster has multiple servers, the broker ids are assigned in incremental order across the servers.

listeners: Each broker runs on a different port; the default port for a broker is 9092 and it can be changed.

log.dir: The path where Kafka will store the stream records (log segments). By default it points to /tmp/kafka-logs.

For more changes to the server.properties file, follow the link Kafka Server Properties Configuration.

Multiple Servers/Brokers:

To create three brokers, make two more copies of the server.properties configuration file as server1.properties and server2.properties, and make the changes below so that the configuration is ready with three brokers in the Kafka cluster.

Create the copies of the server.properties file:

cp config/server.properties config/server1.properties
cp config/server.properties config/server2.properties

Make the changes below in each corresponding configuration file.

config/server1.properties:

broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
zookeeper.connect=localhost:2181

config/server2.properties:

broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
zookeeper.connect=localhost:2181

Start Zookeeper and Servers

Run the commands below from the Kafka installation directory; screen runs each process in a detached session.

screen -d -m bin/zookeeper-server-start.sh config/zookeeper.properties
screen -d -m bin/kafka-server-start.sh config/server.properties
screen -d -m bin/kafka-server-start.sh config/server1.properties
screen -d -m bin/kafka-server-start.sh config/server2.properties

Check status of Zookeeper & Servers

The commands below will show the running ZooKeeper and Kafka server processes and their process IDs.

ps aux | grep zookeeper.properties
ps aux | grep server.properties
ps aux | grep server1.properties
ps aux | grep server2.properties
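As an optional extra check (a sketch assuming the ZooKeeper shell bundled with Kafka), you can confirm that all three brokers have registered themselves in ZooKeeper; the output should list the configured broker ids [0, 1, 2]:

bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids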

Kafka is now ready to create topics and to publish and subscribe messages.

Create a Topic and Check Status

Create a topic with a user-defined name by passing the replication factor and the number of partitions for the topic. For more information about how partitions are stored in a Kafka cluster environment, follow the link Kafka Introduction and Architecture.

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic multi-test

Result:
Created topic "multi-test".

The above command creates a topic multi-test with 1 partition and a replication factor of 3.

List Available Topics in ZooKeeper

Run the command below to get the list of topics:

bin/kafka-topics.sh --list --zookeeper localhost:2181

Result:
test
multi-test

Description of Topic

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic multi-test

Result:
Topic:multi-test    PartitionCount:1   ReplicationFactor:3     Configs:
Topic: multi-test   Partition: 0    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1

In the response to the above command, the first line gives a summary of all the partitions, and each additional line provides information about one partition. We have only one additional line for this topic because there is only one partition.

  • “leader” is the broker responsible for all reads and writes for the given partition. Each broker will be the leader for a randomly selected portion of the partitions.
  • “replicas” is the list of brokers that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.
  • “isr” is the set of “in-sync” replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader.

In the above example, broker 2 is the leader for the single partition of the topic, and replicas of this partition are stored on brokers 0 and 1. Any message published to the topic is stored in the leader partition on broker 2 first and then replicated to brokers 0 and 1.

All requests for this topic are handled by broker 2, and if that broker is busy or fails for some reason, such as a shutdown, broker 0 becomes the leader. In the example below I stopped broker 2 and ran the command again, and the leader is now shown as 0.

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic multi-test
Topic:multi-test     PartitionCount:1       ReplicationFactor:3     Configs:
Topic: multi-test    Partition: 0    Leader: 0       Replicas: 2,0,1 Isr: 0,1

Publish Messages to Topic

To test the topic, publish some messages to it by running the command below.

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic multi-test

Input Messages:
Hi Dear
How r u doing?
Where are u these days?

After being published to the topic, these messages are retained according to the log retention configured for the server, whether or not they have been read by a consumer. For information about the retention policy configuration, follow the link Kafka Server Properties Configuration.
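For reference, retention is controlled by properties like the following in config/server.properties; the values shown here are only an illustrative sketch, so check them against your Kafka version before relying on them:

# Retain log segments for 7 days (168 hours)
log.retention.hours=168
# Optional size-based retention per partition (-1 means no size limit)
log.retention.bytes=-1
# Maximum size of a single log segment file
log.segment.bytes=1073741824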

Subscribe Messages by Consumer from Topic

Run the command below to get all published messages from the multi-test topic. It will return all messages from the beginning.

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic multi-test

Output Messages:
Hi Dear
How r u doing?
Where are u these days?

Read More on Kafka

Integration

Integrate Filebeat, Kafka, Logstash, Elasticsearch and Kibana

Integrate Java with Kafka

Below are examples of a Kafka logs producer and consumer using the Kafka Java API. The producer sends logs from a file to Topic1 on the Kafka server, and the consumer subscribes to the same logs from Topic1. A Kafka consumer can also subscribe to logs from multiple topics.

Pre-Requisite:

  • The Kafka client works with Java 7 and later versions.
  • Add the Kafka client library to your application classpath from the installation directory, or pull it in through your build tool as sketched below.
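If your project uses Maven (an assumption; adjust the version to match your Kafka installation), the client library can be pulled in as a dependency instead of adding jars manually:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.0.0</version>
</dependency>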

Kafka Logs Producer

The producer example below creates a new topic Topic1 on the Kafka server if it does not already exist and publishes every line of the Test.txt file shown below as a message to the topic.

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaLogsProducer {

	public static void main(String[] args) throws Exception{

	    //Topic Name where logs message events need to publish
	    String topicName = "Topic1";
	    // create instance for properties to access producer configs
	    Properties props = new Properties();

	    //Kafka server host and port
	    props.put("bootstrap.servers", "kafkahost:9092");

	    //Wait for acknowledgement from all in-sync replicas
	    props.put("acks", "all");

	    //Batch size in bytes for records sent to the same partition
	    props.put("batch.size", 16384);

	    //Total buffer memory available to the producer
	    props.put("buffer.memory", 33553333);

	    //Time in milliseconds to wait for additional records before sending a batch
	    props.put("linger.ms", 1);

	    //Number of retries if a request fails (0 = no retries)
	    props.put("retries", 0);

	    props.put("key.serializer",
	       "org.apache.kafka.common.serialization.StringSerializer");

	    props.put("value.serializer",
	       "org.apache.kafka.common.serialization.StringSerializer");

	    //Thread.currentThread().setContextClassLoader(null);
	    Producer<String, String> producer = new KafkaProducer
	       <String, String>(props);
	    File in = new File("C:\\Users\\Saurabh\\Desktop\\Test.txt");
	    try (BufferedReader br = new BufferedReader(new FileReader(in))) {
		    String line;
		    while ((line = br.readLine()) != null) {
		    	 producer.send(new ProducerRecord<String, String>(topicName,
		    	          "message", line));
		    }
		}
	    System.out.println("All Messages sent successfully");
	    producer.close();
	}
}

Input File from Directory

C:\Users\Saurabh\Desktop\Test.txt

Hi
This is kafka Producer Test.
Now will check for Response.

Kafka Logs Consumer

The Kafka consumer below reads from Topic1 and prints each message to the console along with its offset value. A consumer can read messages from multiple topics at the same time.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaLogsConsumer {

	public static void main(String[] args) {
		//Topics from where message need to consume
		 List<String> topicsList=new ArrayList<String>();
		 topicsList.add("Topic1");
		 //topicsList.add("Topic2");		

		  Properties props = new Properties();
	      props.put("bootstrap.servers", "kafkahost:9092");
	      props.put("group.id", "test");
	      props.put("enable.auto.commit", "true");
	      props.put("auto.commit.interval.ms", "1000");
	      props.put("session.timeout.ms", "30000");
	      props.put("key.deserializer",
	         "org.apache.kafka.common.serialization.StringDeserializer");
	      props.put("value.deserializer",
	         "org.apache.kafka.common.serialization.StringDeserializer");
	      KafkaConsumer<String, String> consumer = new KafkaConsumer
	         <String, String>(props);

	      //Kafka consumer subscribe to all these topics
	      consumer.subscribe(topicsList);

	      System.out.println("Subscribed to topic " + topicsList.get(0));

	      while (true) {
	    	 //Poll the Kafka server, waiting up to 100 milliseconds
	    	 //for new log messages
	         ConsumerRecords<String, String> records = consumer.poll(100);
	         for (ConsumerRecord<String, String> record : records)
	         {
	        	//Print the record's offset within its Kafka partition and the record value
	        	 System.out.println(record.offset()+"-"+record.value());

	         }
	      }

	}

}

Kafka Consumer Output

1-Hi
2-This is kafka Producer Test.
3-Now will check for Response.

Read More on Kafka

Integration

Integrate Filebeat, Kafka, Logstash, Elasticsearch and Kibana

Kafka Introduction and Architecture

Kafka is an open-source distributed stream-processing and message-broker platform written in Java and Scala, developed by the Apache Software Foundation.

Kafka is widely used in enterprise infrastructure to process streaming data or transaction logs in real time. Kafka provides a unified, fault-tolerant, high-throughput, low-latency platform for handling real-time data feeds.

Important Points about Kafka:

  • Publish/subscribe messaging system.
  • Robust queue able to handle a high volume of data.
  • Works for online and offline message consumption.
  • In a Kafka cluster each server/node acts as a broker; each broker is responsible for published records and may have zero or more partitions per topic.
  • Each record in a partition consists of a key, a value and a timestamp.
  • Kafka uses the TCP protocol to communicate between clients and servers.
  • Kafka provides Producer, Consumer, Streams and Connector Java APIs to publish to and consume from topics.

Initial Release: January, 2011

Current Release: 0.10.2.0

Kafka Cluster Architecture

Before discussing the Kafka cluster architecture, let's introduce the Kafka terminology that will make the architecture and flow easier to understand.

 Broker

A broker is a stateless instance of the Kafka server in the cluster. We define a broker by giving each server instance a unique id. A Kafka cluster can have multiple broker instances, and each broker can handle hundreds of thousands of reads and writes per second, or terabytes of messages, without performance impact.

Zookeeper

A Kafka cluster uses ZooKeeper for managing and coordinating brokers. Producers and consumers are notified when a new broker is added to the cluster or when a broker fails, so that they can decide which available broker to point to.

 Topic

A topic is a category that keeps streams of records which are published to it. A topic can have zero, one or many consumers reading its data. We can create topics from an application or manually.

Topic data is stored in partitions and distributed over the servers based on the number of partitions configured per topic and the available brokers.

Partition

A partition stores records in sequential order and continually appends to them. Each record in a partition has a sequential id number called the offset. An individual log partition allows the records to scale up to the capacity of a single server.
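To see how many records have been appended to each partition of a topic (the current end offsets), Kafka ships a small tool; the sketch below runs it against the multi-test topic used elsewhere in this series, but any existing topic name works:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic multi-test --time -1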

How is a topic partitioned across brokers/servers/nodes?

Suppose we need to create a topic with N partitions on a Kafka cluster that has M brokers.

If (M==N): Each broker will have one partition.

If (M>N): The first N available brokers will take one partition each.

If (M<N): Some brokers will have more than one partition.
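For example, on the three-broker cluster set up earlier, a topic created with 3 partitions and a replication factor of 1 should end up with one partition on each broker; the topic name below is only an example, and the placement can be verified with the describe command:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic partition-demo
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic partition-demo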

The Kafka cluster retains these partitions according to the retention policy configured in the server.properties file, whether or not the messages have been consumed; by default retention is seven days (log.retention.hours=168), and we can modify it based on our storage capacity. Kafka performance is not affected by data size, because it reads and writes data based on offset values.

Kafka Cluster Architecture with Multiple Distributed Servers

Details about the above Kafka cluster for multiple/distributed servers:

Kafka Cluster: There are three servers, and each server has a corresponding broker with id 1, 2 and 3.

Zookeeper: ZooKeeper runs over the Kafka cluster; it keeps track of broker availability and updates producers and consumers.

Brokers: Brokers 1, 2 and 3 hold the topics T1, T2 and T3, stored in partitions.

Topics: Topics T1 and T2 are split into 3 partitions and distributed over servers 1, 2 and 3, while topic T3 has a single partition that is stored on server 3 only.

Partition: Each topic partition holds a different number of records, from offset 0 up to some value, where 0 represents the oldest record.

Producers: APP1, APP2 and APP3 write to the different topics T1, T2 and T3, which are created by applications or manually.

Consumers: Topic T3 is consumed by applications APP5 and APP6, while topic T1 is consumed by APP4 and T2 is consumed by APP5 only. One topic can be consumed by multiple applications.

How does the Kafka cluster flow work for producers and consumers?

I will divide the above architecture into two parts, "Producer to Kafka Cluster" and "Kafka Cluster to Consumer", because producers and consumers run in parallel and independently of each other.

Producer to Kafka Cluster

  • Create a topic manually or from an application, with the configuration for partitions and replicas.
  • The producer connects to the Kafka cluster with the topic name. The Kafka cluster checks in ZooKeeper for an available broker and sends the broker id to the producer.
  • The producer publishes messages to the available broker, which stores them in sequential order in the partition. If anything changes in the Kafka cluster servers, such as a server being added or failing, ZooKeeper updates the producer.
  • If replication is configured for the topic, a copy of the partition is kept on another server for fault tolerance.

Kafka Cluster to Consumer

  • The consumer points to a topic on the Kafka cluster as required by the application.
  • The consumer subscribes to records from the topic based on the required offset value (for example from the beginning, from now, or from the last committed offset).
  • If the consumer wants records from now, ZooKeeper sends the offset value to the consumer so it can start reading records from the broker partitions.
  • If the required offset does not exist in the broker partition the consumer was reading from, ZooKeeper returns an available broker id with partition details to the consumer.
  • If a broker goes down while the consumer is reading records from it, ZooKeeper sends an available broker id with partition details to the consumer.

Kafka Cluster with a Single Server: The configured number of partitions per topic is created on the same server.

Kafka Cluster with Multiple/Distributed Servers: Topic partition logs are distributed over all the servers in the Kafka cluster, and each server handles data and requests for its share of partitions. If replication is configured, the servers keep the configured number of copies of the partition logs distributed across servers for fault tolerance.

How does the Kafka cluster load balance across multiple or distributed servers?

For each topic partition log, one server/broker acts as the "leader" while the others are followers (in a multi-server/distributed setup). The leader handles all read and write requests from producers and consumers, while the followers replicate the leader's partition. If the leader fails or its server goes down, one of the followers becomes the leader and the remaining servers stay followers. For more detail, go to Kafka Cluster with multi server on same machine.

Read More on Kafka

Integration

Integrate Filebeat, Kafka, Logstash, Elasticsearch and Kibana

Filebeat, Kafka Output Configuration

If you need to ship server log lines directly to Kafka, follow the steps below:

Pre-Requisite :

  • Start Kafka before starting Filebeat so it can listen for published events, and configure Filebeat with the same Kafka server port.

Kafka Output Required Configuration:

  • Comment out the output.elasticsearch section and uncomment the output.kafka section.
  • Set enabled to true to enable the Kafka output.
  • Set the host of the server where Kafka is running and listening; the default port for Kafka is 9092, and if it has been changed use the same port value.
output.kafka:
  enabled: true
  # Configure the topic as per your application need
  hosts: ["kafkaserver:9092"]
  topic: QC-TEST

Kafka Credentials Settings: Set the credentials below if the Kafka broker requires authentication.

  username: "userid"
  password: "password"

Other Optional Configurations:

Kafka Output Compression Configuration:

The default value for compression is gzip. We can also set another compression codec such as snappy or none.

compression: gzip

Kafka Output Performance Configuration:

worker: The number of workers per configured host publishing events to Kafka; this is used for load balancing.

Kafka Broker Topic Partition Configuration:

key: No key is set by default, but we can use a formatted key setting (a format string resolved per event).

partition.hash: The default partition strategy is 'hash', using the configured key values. If no key is set, events are distributed randomly across partitions.

reachable_only: The default value is false. If reachable_only is enabled, events are published only to reachable Kafka brokers.

hash: The default value is an empty list. It configures alternative event field names used to compute the hash value; if empty, the output.kafka.key setting is used.
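As a hedged sketch of how these partitioning options fit together in filebeat.yml (the topic name and the beat.hostname field are only examples; verify the option names against your Filebeat version):

output.kafka:
  hosts: ["kafkaserver:9092"]
  topic: QC-TEST
  # Use a field value as the message key so related events hash to the same partition
  key: '%{[beat.hostname]}'
  partition.hash:
    reachable_only: false
    hash: ["beat.hostname"]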

version: The Kafka broker version to configure, so that Filebeat can check compatibility with it.

Metadata Configuration: Metadata information is required for broker event publishing so that Filebeat can take decisions based on the status of brokers.

metadata:

retry.max: The default value is 3 retries for selecting available brokers.

retry.backoff: The default value is 250ms. Filebeat will wait the specified time before making the next retry.

refresh_frequency: Metadata information is refreshed every 10 minutes.

max_retries: The default value is 3. If set to less than 0, Filebeat retries continuously as long as events are not published.

bulk_max_size: The default value is 2048. It is the maximum number of events published to Kafka in one batch request.

Kafka Reliability Setting:

required_acks: The default value is 1 (acknowledgement required for reliability). Possible values are:

  • 0 = no response; the message can be lost if an error happens.
  • 1 = wait for the local commit.
  • -1 = wait for all replicas to commit.

required_acks: 1
timeout: The default value is 30 seconds. The request times out if no response is heard from the Kafka broker within the specified time.

broker_timeout: The default value is 10 seconds. It is the maximum duration a broker will wait for the number of required acknowledgements.

channel_buffer_size: The default value is 256 buffered messages per Kafka broker.

keep_alive: The default value is 0 seconds, meaning keep-alive is disabled; if a value is set, the network connection is kept alive for that time.

max_message_bytes: The default value is 1000000 bytes. If the JSON-encoded event is larger than the configured maximum message bytes, the event is dropped.

flush_interval: The interval to wait for new events between two produce requests to Kafka.

client_id: The default value is beat. We can set a value for this field to help with analysis and auditing.
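As a consolidated sketch, an output.kafka section with several of the optional settings above might look like the following; the values simply mirror the defaults described in this post and should be checked against the Filebeat reference for your version:

output.kafka:
  enabled: true
  hosts: ["kafkaserver:9092"]
  topic: QC-TEST
  compression: gzip
  required_acks: 1
  max_retries: 3
  bulk_max_size: 2048
  max_message_bytes: 1000000
  # Example client id for auditing; the default is beat
  client_id: filebeat-app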

Sample configuration file

Sample filebeat.yml file for Kafka Output Configuration

Integration

Complete Integration Example Filebeat, Kafka, Logstash, Elasticsearch and Kibana

Read More

To read more on Filebeat topics, sample configuration files and integration with other systems with examples, follow the links Filebeat Tutorial and Filebeat Issues. To know more about YAML, follow the link YAML Tutorials.

Leave your feedback to enhance this topic further and make it more helpful for others.