Java and Kafka Integration

The examples below show a Kafka logs producer and consumer written with the Kafka Java API. The producer sends logs from a file to Topic1 on the Kafka server, and the consumer subscribes to Topic1 to read the same logs. A Kafka consumer can also subscribe to logs coming from multiple servers.


  • The Kafka client works with Java 7 and later versions.
  • Add the Kafka library from the Kafka installation directory to your application classpath.
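
One quick way to confirm the second point is to check that a Kafka client class can be loaded before running the examples. The sketch below is only an illustration: the ClasspathCheck class and its isOnClasspath helper are hypothetical names, and org.apache.kafka.clients.producer.KafkaProducer is the producer class imported later in this post.

```java
public class ClasspathCheck {

    // Returns true if the named class can be found on the current classpath.
    // The class is looked up without being initialized.
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Producer class used by the examples in this post
        String kafkaClass = "org.apache.kafka.clients.producer.KafkaProducer";
        if (isOnClasspath(kafkaClass)) {
            System.out.println("Kafka client library found");
        } else {
            System.out.println("Kafka client library is missing from the classpath");
        }
    }
}
```

If the second message is printed, the Kafka jars have not yet been added to the classpath.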

Kafka Logs Producer

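The producer example below creates a new topic named Topic1 on the Kafka server if it does not already exist (provided the broker allows automatic topic creation) and pushes every line of the Test.txt file shown below to that topic as a message.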

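import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaLogsProducer {

	public static void main(String[] args) throws Exception {

		// Topic to which the log messages are published
		String topicName = "Topic1";
		// Properties instance that holds the producer configs
		Properties props = new Properties();

		// Kafka server host and port
		props.put("bootstrap.servers", "kafkahost:9092");

		// Wait for acknowledgement from all in-sync replicas
		props.put("acks", "all");

		// Maximum size in bytes of a batch of records sent to one partition
		props.put("batch.size", 16384);

		// Total memory in bytes available to the producer for buffering
		props.put("buffer.memory", 33553333);

		// The key was missing in the source; linger.ms is assumed here
		// (wait up to 1 ms so that more records can share one request)
		props.put("linger.ms", 1);

		// Number of retries if a request fails; 0 means no retry
		props.put("retries", 0);

		// Serializers for record keys and values (required by the producer;
		// String keys and values are assumed here)
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

		File in = new File("C:\\Users\\Saurabh\\Desktop\\Test.txt");
		// try-with-resources closes the reader and the producer;
		// closing the producer also flushes any buffered records
		try (Producer<String, String> producer = new KafkaProducer<String, String>(props);
				BufferedReader br = new BufferedReader(new FileReader(in))) {
			String line;
			while ((line = br.readLine()) != null) {
				// Send each line of the file as one message with the key "message"
				producer.send(new ProducerRecord<String, String>(topicName, "message", line));
			}
		}
		System.out.println("All Messages sent successfully");
	}
}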

Input File from Directory


This is kafka Producer Test.
Now will check for Response.
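
Each line of this file becomes one Kafka message. As a small illustration of that step on its own, the sketch below reads the two sample lines from an in-memory string instead of the file. LogLineReader and readLines are hypothetical names used only for this example, and no Kafka connection is involved.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

public class LogLineReader {

    // Collects every line from the reader, in order, using the same
    // readLine loop the producer uses before sending each line.
    static List<String> readLines(Reader source) {
        List<String> lines = new ArrayList<String>();
        try (BufferedReader br = new BufferedReader(source)) {
            String line;
            while ((line = br.readLine()) != null) {
                lines.add(line);
            }
        } catch (IOException e) {
            // Rethrown unchecked to keep this sketch short
            throw new RuntimeException(e);
        }
        return lines;
    }

    public static void main(String[] args) {
        String sample = "This is kafka Producer Test.\nNow will check for Response.\n";
        List<String> lines = readLines(new StringReader(sample));
        // Each element would become the value of one ProducerRecord
        for (String line : lines) {
            System.out.println(line);
        }
    }
}
```

With the sample text, the list holds two entries, so the producer would send two messages.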

Kafka Logs Consumer

The Kafka consumer below reads from Topic1 and prints each message to the console together with its offset value. A consumer can read messages from multiple topics at the same time.

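import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaLogsConsumer {

	public static void main(String[] args) {
		// Topics from which messages are consumed
		List<String> topicsList = new ArrayList<String>();
		topicsList.add("Topic1");

		Properties props = new Properties();
		props.put("bootstrap.servers", "kafkahost:9092");
		// The next four keys were missing in the source; the standard
		// consumer settings that match these values are assumed
		props.put("group.id", "test");
		props.put("enable.auto.commit", "true");
		props.put("auto.commit.interval.ms", "1000");
		props.put("session.timeout.ms", "30000");
		// Deserializers for record keys and values (required by the consumer;
		// String keys and values are assumed here)
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

		// Kafka consumer subscribes to all topics in the list
		consumer.subscribe(topicsList);

		System.out.println("Subscribed to topic " + topicsList.get(0));

		while (true) {
			// Each poll waits up to 100 milliseconds for new log messages.
			// Client versions 2.0 and later prefer poll(Duration.ofMillis(100)).
			ConsumerRecords<String, String> records = consumer.poll(100);
			for (ConsumerRecord<String, String> record : records) {
				// Print the partition offset where the log message is stored, then its value
				System.out.println(record.offset() + "-" + record.value());
			}
		}
	}
}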


Kafka Consumer Output

2-This is kafka Producer Test.
3-Now will check for Response.
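
The consumer takes its topics from a list, so reading several topics at once only needs more names in that list. The sketch below builds such a list from a comma-separated string, for example a command-line argument. TopicListParser, parseTopics, and the topic name Topic2 are hypothetical, and the resulting list is what would be passed to consumer.subscribe(...).

```java
import java.util.ArrayList;
import java.util.List;

public class TopicListParser {

    // Splits a comma-separated string into topic names,
    // trimming whitespace and skipping empty entries.
    static List<String> parseTopics(String csv) {
        List<String> topics = new ArrayList<String>();
        for (String name : csv.split(",")) {
            String trimmed = name.trim();
            if (!trimmed.isEmpty()) {
                topics.add(trimmed);
            }
        }
        return topics;
    }

    public static void main(String[] args) {
        // Falls back to two example topics when no argument is given
        String input = args.length > 0 ? args[0] : "Topic1, Topic2";
        List<String> topicsList = parseTopics(input);
        System.out.println("Topics to subscribe: " + topicsList);
    }
}
```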

Let me know your thoughts on this post.

Happy Learning!!!
