Kafka Java API实现的简单Producer和Consumer

from:
原文阅读


关键字:Kafka Java API、producer、consumer前面的文章《Kafka安装配置测试》中安装配置了分布式的Kafka集群,并且使用自带的kafka-console-producer.sh和kafka-console-consumer.sh模拟测试了发送消息和消费消息。

本文使用简单的Java API模拟Kafka的producer和consumer,其中,procuder从一个文本文件中逐行读取内容,然后发送到Kafka,consumer则从Kafka中读取内容并在控制台打印。

Java API Producer


  1. package com.lxw1234.kafka;

  2.  

  3. import java.io.BufferedReader;

  4. import java.io.File;

  5. import java.io.FileReader;

  6. import java.io.IOException;

  7. import java.util.Properties;

  8.  

  9. import kafka.javaapi.producer.Producer;

  10. import kafka.producer.KeyedMessage;

  11. import kafka.producer.ProducerConfig;

  12.  

  13. public class MyProducer {

  14.  

  15. public static void main(String[] args) {

  16. Properties props = new Properties();

  17. props.put("serializer.class", "kafka.serializer.StringEncoder");

  18. props.put("metadata.broker.list", "172.16.212.17:9091,172.16.212.17:9092,172.16.212.102:9091,172.16.212.102:9092");

  19. Producer<Integer, String> producer = new Producer<Integer, String>(new ProducerConfig(props));

  20. String topic = "lxw1234.com";


  21. File file = new File("E:/track-log.txt");

  22. BufferedReader reader = null;

  23. try {

  24. reader = new BufferedReader(new FileReader(file));

  25. String tempString = null;

  26. int line = 1;

  27. while ((tempString = reader.readLine()) != null) {

  28. producer.send(new KeyedMessage<Integer, String>(topic,line + "---" + tempString));

  29. System.out.println("Success send [" + line + "] message ..");

  30. line++;

  31. }

  32. reader.close();

  33. System.out.println("Total send [" + line + "] messages ..");

  34. } catch (Exception e) {

  35. e.printStackTrace();

  36. } finally {

  37. if (reader != null) {

  38. try {

  39. reader.close();

  40. } catch (IOException e1) {}

  41. }

  42. }

  43. producer.close();


  44. }

  45. }

程序从E:/track-log.txt文件中读取内容,发送至Kafka。

Java API Consumer



  1. package com.lxw1234.kafka;

  2.  

  3. import java.util.HashMap;

  4. import java.util.List;

  5. import java.util.Map;

  6. import java.util.Properties;

  7.  

  8. import kafka.consumer.Consumer;

  9. import kafka.consumer.ConsumerConfig;

  10. import kafka.consumer.ConsumerIterator;

  11. import kafka.consumer.KafkaStream;

  12. import kafka.javaapi.consumer.ConsumerConnector;

  13.  

  14. public class MyConsumer {

  15. public static void main(String[] args) {

  16. String topic = "lxw1234.com";

  17. ConsumerConnector consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());

  18. Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

  19. topicCountMap.put(topic, new Integer(1));

  20. Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);

  21. KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);

  22. ConsumerIterator<byte[], byte[]> it = stream.iterator();

  23. while(it.hasNext())

  24. System.out.println("consume: " + new String(it.next().message()));

  25. }


  26. private static ConsumerConfig createConsumerConfig() {

  27. Properties props = new Properties();

  28. props.put("group.id","group1");

  29. props.put("zookeeper.connect","zk1:2181,zk2:2181,zk3:2181");

  30. props.put("zookeeper.session.timeout.ms", "400");

  31. props.put("zookeeper.sync.time.ms", "200");

  32. props.put("auto.commit.interval.ms", "1000");

  33. return new ConsumerConfig(props);

  34. }

  35. }

  36.  

Consumer从Kafka中消费数据,并在控制台中打印消息内容。

运行和结果


先运行Consumer,之后再运行Producer,运行时候将$KAFKA_HOME/lib/下的所有jar包依赖进去。

Producer运行结果如下:

Kafka Producer

文件中只有50000行记录,因为最后又把行号加了一次,因此最后打印出是50001.

Consumer运行结果如下:

Kafka Consumer



Consumer成功获取了5000条数据。

关于Kafka,还有很多疑问,继续尝试和学习吧,enjoy it!
您可以关注 lxw的大数据田地 ,或者 加入邮件列表 ,随时接收博客更新的通知邮件。

如果觉得本博客对您有帮助,请 赞助作者

转载请注明:lxw的大数据田地 » Kafka Java API实现的简单Producer和Consumer