The examples below show a Kafka logs producer and consumer built with the Kafka Java API. The producer sends log lines from a file to Topic1 on the Kafka server, and the consumer subscribes to the same logs from Topic1. A Kafka consumer can also subscribe to topics from multiple servers.
Pre-Requisite:
- The Kafka Java client works with Java 7 and later versions.
- Add the Kafka client libraries from the installation directory to your application classpath.
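As a sketch, the examples below can be compiled and run against the client jars shipped in the Kafka installation's libs directory (the /opt/kafka path is an assumption; substitute your own installation directory):

```shell
# assumed installation path; replace /opt/kafka with your Kafka directory
javac -cp "/opt/kafka/libs/*" KafkaLogsProducer.java KafkaLogsConsumer.java
java -cp "/opt/kafka/libs/*:." KafkaLogsProducer
```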
Kafka Logs Producer
The producer example below creates the topic Topic1 on the Kafka server if it does not already exist, and publishes every line of the Test.txt file below as a message to that topic.
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaLogsProducer {

    public static void main(String[] args) throws Exception {
        // Topic to which the log message events are published
        String topicName = "Topic1";

        // Properties instance holding the producer configuration
        Properties props = new Properties();
        // Kafka server host and port
        props.put("bootstrap.servers", "kafkahost:9092");
        // Wait for acknowledgement from all in-sync replicas
        props.put("acks", "all");
        // Batch size (in bytes) for grouping records per partition
        props.put("batch.size", 16384);
        // Total buffer memory (in bytes) available to the producer
        props.put("buffer.memory", 33553333);
        // Delay (in milliseconds) to allow batching before a send
        props.put("linger.ms", 1);
        // Number of retries if a request fails
        props.put("retries", 0);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        File in = new File("C:\\Users\\Saurabh\\Desktop\\Test.txt");
        try (BufferedReader br = new BufferedReader(new FileReader(in))) {
            String line;
            while ((line = br.readLine()) != null) {
                // Publish each line of the file as one message with key "message"
                producer.send(new ProducerRecord<>(topicName, "message", line));
            }
        }
        System.out.println("All messages sent successfully");
        producer.close();
    }
}
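If you prefer not to rely on automatic topic creation, the topic can be created up front with the kafka-topics tool bundled with Kafka. As a sketch, assuming Kafka's bin scripts are on your PATH (note: the --bootstrap-server flag applies to newer Kafka releases; older releases use --zookeeper instead):

```shell
# pre-create Topic1 explicitly; partition/replication counts are illustrative
kafka-topics.sh --create --bootstrap-server kafkahost:9092 \
  --topic Topic1 --partitions 1 --replication-factor 1
```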
Input File from Directory
C:\Users\Saurabh\Desktop\Test.txt
Hi
This is kafka Producer Test.
Now will check for Response.
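The producer's file handling is plain Java I/O: every line read becomes one message payload. A minimal, broker-free sketch of just that step using the standard library (the temporary file is a hypothetical stand-in for Test.txt):

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;

public class FileToMessages {

    // Each line of the input file becomes one Kafka message payload,
    // mirroring the readLine() loop in the producer above.
    static List<String> readMessages(Path file) throws IOException {
        return Files.readAllLines(file, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // Temporary stand-in for the Test.txt input file
        Path tmp = Files.createTempFile("kafka-demo", ".txt");
        Files.write(tmp, Arrays.asList(
                "Hi",
                "This is kafka Producer Test.",
                "Now will check for Response."), StandardCharsets.UTF_8);

        for (String line : readMessages(tmp)) {
            System.out.println(line); // producer.send(...) would go here
        }
        Files.delete(tmp);
    }
}
```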
Kafka Logs Consumer
The Kafka consumer below reads from Topic1 and prints each message to the console together with its offset value. A consumer can read messages from multiple topics at the same time.
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaLogsConsumer {

    public static void main(String[] args) {
        // Topics from which messages are consumed
        List<String> topicsList = new ArrayList<>();
        topicsList.add("Topic1");
        //topicsList.add("Topic2");

        Properties props = new Properties();
        props.put("bootstrap.servers", "kafkahost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // Subscribe the consumer to all topics in the list
        consumer.subscribe(topicsList);
        System.out.println("Subscribed to topic " + topicsList.get(0));

        while (true) {
            // Poll the Kafka server every 100 milliseconds for new log messages
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                // Print the offset within the Kafka partition and the message value
                System.out.println(record.offset() + "-" + record.value());
            }
        }
    }
}
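To verify the same messages outside Java, Kafka's bundled console consumer can read Topic1 directly. A sketch, assuming Kafka's bin scripts are on your PATH:

```shell
# read Topic1 from the beginning of the log
kafka-console-consumer.sh --bootstrap-server kafkahost:9092 \
  --topic Topic1 --from-beginning
```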
Kafka Consumer Output
1-Hi
2-This is kafka Producer Test.
3-Now will check for Response.
Read More on Kafka
- Kafka Introduction and Architecture
- Kafka Server Properties Configuration
- Setup Kafka Cluster for Single Server/Broker
- Setup Kafka Cluster for Multi/Distributed Servers/Brokers
- Integrate Logstash with Kafka
- Integrate Filebeat with Kafka
- Kafka and Zookeeper Common Issues
Integration
Integrate Filebeat, Kafka, Logstash, Elasticsearch and Kibana