Integrate Java with Kafka


Below examples are for Kafka Logs Producer and Consumer by Kafka Java API. Where Producer is sending logs from file to Topic1 on Kafka server and same logs Consumer is subscribing from Topic1. While Kafka Consumer can subscribe logs from multiple servers.

Pre-Requisite:

  • Kafka client work with Java 7 + versions.
  • Add Kafka library to your application class path from Installation directory

Kafka Logs Producer

Below Producer Example will create new topic as Topic1 in Kafka server if not exist and push all the messages in topic from below Test.txt file.

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaLogsProducer {

	public static void main(String[] args) throws Exception{

	    //Topic Name where logs message events need to publish
	    String topicName = "Topic1";
	    // create instance for properties to access producer configs
	    Properties props = new Properties();

	    //Kafka server host and port
	    props.put("bootstrap.servers", "kafkahost:9092");

	    //Will receive acknowledgemnt of requests
	    props.put("acks", "all");

	   //Buffer size of events
	    props.put("batch.size", 16384);

	   //Total available buffer memory to the producer .
	    props.put("buffer.memory", 33553333);

	    //request less than zero
	    props.put("linger.ms", 1);

	    //If the request get fails, then retry again,
	    props.put("retries", 0);

	    props.put("key.serializer",
	       "org.apache.kafka.common.serialization.StringSerializer");

	    props.put("value.serializer",
	       "org.apache.kafka.common.serialization.StringSerializer");

	    //Thread.currentThread().setContextClassLoader(null);
	    Producer<String, String> producer = new KafkaProducer
	       <String, String>(props);
	    File in = new File("C:\\Users\\Saurabh\\Desktop\\Test.txt");
	    try (BufferedReader br = new BufferedReader(new FileReader(in))) {
		    String line;
		    while ((line = br.readLine()) != null) {
		    	 producer.send(new ProducerRecord<String, String>(topicName,
		    	          "message", line));
		    }
		}
	             System.out.println("All Messages sent successfully");
	             producer.close();
	 }
	}

Input File from Directory

C:\Users\Saurabh\Desktop\Test.txt

Hi
This is kafka Producer Test.
Now will check for Response.

Kafka Logs Consumer

Below Kafka Consumer will read from Topic1 and display output to console with offset value. Consumer can be read messages from multiple topics on same time.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaLogsConsumer {

	public static void main(String[] args) {
		//Topics from where message need to consume
		 List<String> topicsList=new ArrayList<String>();
		 topicsList.add("Topic1");
		 //topicsList.add("Topic2");		

		  Properties props = new Properties();
	      props.put("bootstrap.servers", "kafkahost:9092");
	      props.put("group.id", "test");
	      props.put("enable.auto.commit", "true");
	      props.put("auto.commit.interval.ms", "1000");
	      props.put("session.timeout.ms", "30000");
	      props.put("key.deserializer",
	         "org.apache.kafka.common.serialization.StringDeserializer");
	      props.put("value.deserializer",
	         "org.apache.kafka.common.serialization.StringDeserializer");
	      KafkaConsumer<String, String> consumer = new KafkaConsumer
	         <String, String>(props);

	      //Kafka consumer subscribe to all these topics
	      consumer.subscribe(topicsList);

	      System.out.println("Subscribed to topic " + topicsList.get(0));

	      while (true) {
	    	 //Below poll setting will poll to kafka server in every 100 milliseconds
	    	 //and get logs mssage from there
	         ConsumerRecords<String, String> records = consumer.poll(100);
	         for (ConsumerRecord<String, String> record : records)
	         {
	        	//Print offset value of Kafka partition where logs message store and value for it
	        	 System.out.println(record.offset()+"-"+record.value());

	         }
	      }

	}

}

Kafka Consumer Output

1-Hi
2-This is kafka Producer Test.
3-Now will check for Response.

Read More on Kafka

Integration

Integrate Filebeat, Kafka, Logstash, Elasticsearch and Kibana

About Saurabh Gupta

My Name is Saurabh Gupta, I have approx. 10 Year of experience in Information Technology World manly in Java/J2EE. During this time I have worked with multiple organization with different client, so many technology, frameworks etc.
This entry was posted in Kafka and tagged , , , , , . Bookmark the permalink.