The examples below show a Kafka logs producer and consumer written with the Kafka Java API. The producer reads log lines from a file and sends them to Topic1 on the Kafka server, and the consumer subscribes to Topic1 and reads the same lines back. A Kafka consumer can also subscribe to logs from multiple servers.
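Prerequisites:
- The Kafka client used here works with Java 7 and later versions (recent Kafka releases require a newer Java version, so check the documentation for your release).
- Add the Kafka client library from the installation directory to your application class path.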
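A quick way to confirm the second prerequisite is to try loading a Kafka client class by name. The sketch below is a minimal check and not part of the Kafka API: the class name ClasspathCheck and the method isOnClasspath are hypothetical names chosen for illustration, and the class it looks for is the KafkaProducer class imported by the producer example.

```java
final class ClasspathCheck {

    // Hypothetical helper: returns true if the named class can be loaded
    // from the current class path, false otherwise.
    static boolean isOnClasspath(String className) {
        try {
            // 'false' means: locate the class without running its static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String kafkaClass = "org.apache.kafka.clients.producer.KafkaProducer";
        if (isOnClasspath(kafkaClass)) {
            System.out.println("Kafka client library found on the class path");
        } else {
            System.out.println("Kafka client library is missing from the class path");
        }
    }
}
```

If the second message is printed, the Kafka client jar is most likely missing from the class path used to start the application.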
Kafka Logs Producer
The producer example below creates the topic Topic1 on the Kafka server if it does not already exist (provided the broker allows automatic topic creation) and publishes every line of the Test.txt file shown below as a message to that topic.
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaLogsProducer {
    public static void main(String[] args) throws Exception {
        // Name of the topic the log messages are published to
        String topicName = "Topic1";
        // Properties object that holds the producer configuration
        Properties props = new Properties();
        // Kafka server host and port
        props.put("bootstrap.servers", "kafkahost:9092");
        // Wait for all in-sync replicas to acknowledge each record
        props.put("acks", "all");
        // Maximum size in bytes of a batch of records sent to one partition
        props.put("batch.size", 16384);
        // Total memory in bytes the producer can use to buffer unsent records (32 MB)
        props.put("buffer.memory", 33554432);
        // Wait up to 1 ms for more records so they can be sent together in one batch
        props.put("linger.ms", 1);
        // Number of retries for a failed send; 0 means a failed send is not retried
        props.put("retries", 0);
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        File in = new File("C:\\Users\\Saurabh\\Desktop\\Test.txt");
        try (BufferedReader br = new BufferedReader(new FileReader(in))) {
            String line;
            while ((line = br.readLine()) != null) {
                // Each line is sent as one record with the fixed key "message"
                producer.send(new ProducerRecord<String, String>(topicName,
                        "message", line));
            }
        }
        // send() is asynchronous; close() blocks until previously sent records have completed
        producer.close();
        System.out.println("All Messages sent successfully");
    }
}
Input File from Directory
C:\Users\Saurabh\Desktop\Test.txt
Hi
This is kafka Producer Test.
Now will check for Response.
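The producer sends one message per line of the input file, so the three-line file above yields three messages. The sketch below isolates that file-reading step using only the standard library. The class LogFileReader and the method readMessages are hypothetical names, and reading the file as UTF-8 is an assumption of this sketch (the producer example uses the platform default encoding).

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

final class LogFileReader {

    // Hypothetical helper: returns every line of the file, in order,
    // as the list of message values a producer would send.
    static List<String> readMessages(Path file) throws IOException {
        List<String> messages = new ArrayList<String>();
        // Assumption: the log file is UTF-8 encoded
        try (BufferedReader reader = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
            String line;
            while ((line = reader.readLine()) != null) {
                messages.add(line);
            }
        }
        return messages;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("Usage: LogFileReader <path-to-log-file>");
            return;
        }
        List<String> messages = readMessages(Paths.get(args[0]));
        for (int i = 0; i < messages.size(); i++) {
            System.out.println("message " + i + ": " + messages.get(i));
        }
    }
}
```

Keeping the file reading separate from the send loop makes it possible to check what will be published before any Kafka server is involved.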
Kafka Logs Consumer
The Kafka consumer below reads from Topic1 and prints each message to the console along with its offset. A consumer can read messages from multiple topics at the same time.
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaLogsConsumer {
    public static void main(String[] args) {
        // Topics to consume messages from
        List<String> topicsList = new ArrayList<String>();
        topicsList.add("Topic1");
        //topicsList.add("Topic2");
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafkahost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        // Subscribe the consumer to every topic in the list
        consumer.subscribe(topicsList);
        System.out.println("Subscribed to topic " + topicsList.get(0));
        while (true) {
            // poll(100) waits up to 100 milliseconds for new records to arrive.
            // Newer client versions replace this overload with poll(java.time.Duration).
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                // Print the record's offset within its partition, followed by the message value
                System.out.println(record.offset() + "-" + record.value());
            }
        }
    }
}
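Because subscribe accepts a list of topic names, reading from several topics only requires a longer list. The sketch below builds such a list from a comma-separated string, for example one taken from a command-line argument. The class TopicListParser and the method parseTopics are hypothetical names used for illustration; the resulting list would be passed to consumer.subscribe in place of topicsList.

```java
import java.util.ArrayList;
import java.util.List;

final class TopicListParser {

    // Hypothetical helper: splits "Topic1, Topic2" into ["Topic1", "Topic2"],
    // trimming whitespace and ignoring empty entries.
    static List<String> parseTopics(String commaSeparated) {
        List<String> topics = new ArrayList<String>();
        for (String name : commaSeparated.split(",")) {
            String trimmed = name.trim();
            if (!trimmed.isEmpty()) {
                topics.add(trimmed);
            }
        }
        return topics;
    }

    public static void main(String[] args) {
        List<String> topicsList = parseTopics("Topic1, Topic2");
        // In the consumer example this list would be passed to consumer.subscribe(topicsList)
        System.out.println("Topics to subscribe to: " + topicsList);
        // prints "Topics to subscribe to: [Topic1, Topic2]"
    }
}
```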
Kafka Consumer Output
1-Hi
2-This is kafka Producer Test.
3-Now will check for Response.
Let me know your thoughts on this post.
Happy Learning!!!