环境准备

* 启动zookeeper集群和kafka集群,在kafka集群中打开一个消费者
[hadoop@hadoop-100 kafka]$ zkservers start
[hadoop@hadoop-100 kafka]$ zkservers status
[hadoop@hadoop-100 kafka]$ mykafka start
[hadoop@hadoop-100 kafka]$ myjps
[hadoop@hadoop-100 kafka]$ bin/kafka-console-consumer.sh --zookeeper hadoop-100:2181 --topic first
* 导入pom依赖
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.12</artifactId>
        <version>0.11.0.0</version>
    </dependency>
</dependencies>
Kafka生产者Java API

创建生产者(过时的API)
package com.zj.producer;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.List;
import java.util.Properties;

/**
 * Sends a single message to the "first" topic using the legacy (deprecated)
 * {@code kafka.javaapi.producer.Producer} API.
 */
public class OldProducer {

    /**
     * Configures a legacy producer, sends one message and releases the producer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Broker list the legacy producer is pointed at.
        properties.put("metadata.broker.list", "hadoop-100:9092");
        // Acknowledgement level requested for each send.
        properties.put("request.required.acks", "1");
        // Encoder used to turn the message into bytes.
        properties.put("serializer.class", "kafka.serializer.StringEncoder");

        Producer<Integer, String> producer = new Producer<>(new ProducerConfig(properties));
        try {
            KeyedMessage<Integer, String> message =
                    new KeyedMessage<Integer, String>("first", "hello api");
            producer.send(message);
        } finally {
            // Always release the producer's connections, even when send() throws.
            producer.close();
        }
    }
}
创建生产者(新API)
package com.zj.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Publishes the numbers 0..99 to the "first" topic with the new producer API,
 * using each number (as text) for both the record key and the record value.
 */
public class NewProducer {

    /**
     * Builds the producer, sends the 100 records and closes the producer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Producer<String, String> producer = new KafkaProducer<>(producerSettings());

        int sequence = 0;
        while (sequence < 100) {
            String text = Integer.toString(sequence);
            ProducerRecord<String, String> record =
                    new ProducerRecord<String, String>("first", text, text);
            producer.send(record);
            sequence++;
        }

        producer.close();
    }

    /** Assembles the producer configuration used by {@link #main(String[])}. */
    private static Properties producerSettings() {
        Properties settings = new Properties();
        // Host name and port of the Kafka broker
        settings.put("bootstrap.servers", "hadoop-100:9092");
        // Wait for the acknowledgement of all replica nodes
        settings.put("acks", "all");
        // Maximum number of send attempts
        settings.put("retries", 0);
        // Size of one batch of messages
        settings.put("batch.size", 16384);
        // Request delay
        settings.put("linger.ms", 1);
        // Size of the send buffer memory
        settings.put("buffer.memory", 33554432);
        // Key serializer
        settings.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Value serializer
        settings.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return settings;
    }
}
创建生产者带回调函数(新API)
package com.zj.producer;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

/**
 * Publishes 100 records to the "second" topic with the new producer API and
 * reports the outcome of every send through a {@link Callback}.
 */
public class NewProducerWithCallBack {

    /**
     * Builds the producer, sends the records with a completion callback and
     * closes the producer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        // Host name and port of the Kafka broker
        props.put("bootstrap.servers", "hadoop-100:9092");
        // Wait for the acknowledgement of all replica nodes
        props.put("acks", "all");
        // Maximum number of send attempts
        props.put("retries", 0);
        // Size of one batch of messages
        props.put("batch.size", 1);
        // Request delay
        props.put("linger.ms", 1);
        // Size of the send buffer memory
        props.put("buffer.memory", 33554432);
        // Key serializer
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Value serializer
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            producer.send(
                    new ProducerRecord<String, String>("second", Integer.toString(i),
                            "hello" + Integer.toString(i)),
                    new Callback() {
                        @Override
                        public void onCompletion(RecordMetadata metadata, Exception exception) {
                            if (exception != null) {
                                // Surface failed sends instead of dropping them silently.
                                System.err.println("Send failed: " + exception);
                                return;
                            }
                            if (metadata != null) {
                                System.out.println(metadata.partition() + "," + metadata.offset());
                            }
                        }
                    });
        }
        producer.close();
    }
}
自定义分区生产者

需求,将所有数据存储到topic的第0号分区上
定义一个类实现Partitioner接口,重写里面的方法(过时API)
package com.zj.producer.partition;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

/**
 * Legacy-API partitioner that assigns every message to partition 0,
 * regardless of its key or of the number of partitions.
 */
public class OldApiMyPartition implements Partitioner {

    /** The only partition this partitioner ever selects. */
    private static final int FIXED_PARTITION = 0;

    /**
     * Creates the partitioner.
     *
     * @param props producer properties; not read by this implementation
     */
    public OldApiMyPartition(VerifiableProperties props) {
        // Nothing to configure.
    }

    /**
     * Chooses the target partition.
     *
     * @param key           message key; ignored
     * @param numPartitions partition count of the topic; ignored
     * @return always 0
     */
    @Override
    public int partition(Object key, int numPartitions) {
        return FIXED_PARTITION;
    }
}
自定义分区(新API)
package com.zj.producer.partition;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * New-API partitioner that assigns every record to partition 0, ignoring the
 * topic, key, value and cluster state.
 */
public class NewApiMyPartition implements Partitioner {

    /** The only partition this partitioner ever selects. */
    private static final int FIXED_PARTITION = 0;

    /** No configuration is consumed by this partitioner. */
    @Override
    public void configure(Map<String, ?> configs) {
        // Intentionally empty.
    }

    /**
     * Chooses the target partition.
     *
     * @return always 0; none of the arguments are inspected
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value,
                         byte[] valueBytes, Cluster cluster) {
        return FIXED_PARTITION;
    }

    /** No resources are held, so there is nothing to release. */
    @Override
    public void close() {
        // Intentionally empty.
    }
}
在代码中调用,新API
package com.zj.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Sends one record to the "second" topic through a new-API producer that is
 * configured with the custom partitioner
 * {@code com.zj.producer.partition.NewApiMyPartition}.
 */
public class ProducerWithPartition {

    /**
     * Builds the producer, sends a single record and closes the producer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties config = new Properties();

        // Connection: host name and port of the Kafka broker
        config.put("bootstrap.servers", "hadoop-102:9092");

        // Delivery: wait for all replica nodes, maximum number of send attempts
        config.put("acks", "all");
        config.put("retries", 0);

        // Batching: batch size, request delay, send buffer memory size
        config.put("batch.size", 16384);
        config.put("linger.ms", 1);
        config.put("buffer.memory", 33554432);

        // Serialization of key and value
        final String stringSerializer = "org.apache.kafka.common.serialization.StringSerializer";
        config.put("key.serializer", stringSerializer);
        config.put("value.serializer", stringSerializer);

        // Custom partitioner
        config.put("partitioner.class", "com.zj.producer.partition.NewApiMyPartition");

        Producer<String, String> producer = new KafkaProducer<>(config);
        ProducerRecord<String, String> record =
                new ProducerRecord<String, String>("second", "1", "hhh555");
        producer.send(record);
        producer.close();
    }
}
在代码中调用,旧API
package com.zj.producer;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;

/**
 * Sends one keyed message to the "first" topic through a legacy-API producer
 * that is configured with the custom partitioner
 * {@code com.zj.producer.partition.OldApiMyPartition}.
 */
public class OldApiProducerWithPartition {

    /**
     * Configures the legacy producer, sends one keyed message and releases the
     * producer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("metadata.broker.list", "hadoop-100:9092");
        properties.put("request.required.acks", "1");
        // No key.serializer.class is set, so keys are String here to match StringEncoder.
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("partitioner.class", "com.zj.producer.partition.OldApiMyPartition");

        Producer<String, String> producer = new Producer<>(new ProducerConfig(properties));
        try {
            // The message must carry a key: the legacy producer picks a partition
            // itself for null keys and does not call the configured partitioner
            // (see the legacy DefaultEventHandler), which would defeat this demo.
            KeyedMessage<String, String> message =
                    new KeyedMessage<String, String>("first", "key", "hello api");
            producer.send(message);
        } finally {
            // Always release the producer's connections, even when send() throws.
            producer.close();
        }
    }
}
Kafka消费者Java API

高级API

创建消费者(过时API)
package com.zj.consumer;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Consumes the "first" topic with the legacy (deprecated) high-level consumer
 * API and prints every message body to standard output.
 */
public class HighOldApiConsumer {

    /**
     * Connects through ZooKeeper as group "test1", opens one stream on the
     * "first" topic and prints messages for as long as the iterator yields them.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", "hadoop-100:2181");
        properties.put("group.id", "test1");
        // NOTE(review): 500 ms is a very short ZooKeeper session timeout - confirm it is intended.
        properties.put("zookeeper.session.timeout.ms", "500");
        properties.put("zookeeper.sync.time.ms", "250");
        properties.put("auto.commit.interval.ms", "1000");

        // Create the consumer connector
        ConsumerConnector consumer =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
        try {
            // Request a single stream for the "first" topic.
            Map<String, Integer> topicCount = new HashMap<>();
            topicCount.put("first", 1);

            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                    consumer.createMessageStreams(topicCount);
            KafkaStream<byte[], byte[]> stream = consumerMap.get("first").get(0);

            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                // Decode explicitly as UTF-8 instead of relying on the platform default charset.
                System.out.println(
                        new String(it.next().message(), java.nio.charset.StandardCharsets.UTF_8));
            }
        } finally {
            // Release the connector even when consumption ends with an exception.
            consumer.shutdown();
        }
    }
}
低级API

实现使用低级API读取指定topic,指定partition,指定offset的数据

消费者使用低级API 的主要步骤

* 根据指定的分区从主题元数据中找到主副本
* 获取分区最新的消费进度
* 从主副本拉取分区的消息
* 识别主副本的变化,重试
方法描述
findLeader(),客户端向种子节点发送主题元数据请求,并将副本集加入备用节点
getLastOffset(),消费者客户端发送偏移量请求,获取分区最近的偏移量
run(),消费者低级API拉取消息的主要方法
findNewLeader(),当分区的主副本节点发生故障时,客户端需要找出新的主副本
package com.zj.consumer;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.BrokerEndPoint;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

/**
 * Reads a bounded number of messages from one partition of one topic using the
 * legacy low-level {@link SimpleConsumer} API: it locates the partition leader,
 * resolves a starting offset, fetches messages and looks up a new leader when a
 * fetch fails.
 */
public class LowerConsumer {

    /** Hosts of the replicas of the partition, refreshed by every successful leader lookup. */
    private final List<String> mReplicaBrokers = new ArrayList<>();

    public LowerConsumer() {
    }

    /**
     * Reads up to three messages from partition 0 of topic "first" via three
     * hard-coded seed brokers on port 9092.
     *
     * @param args unused
     */
    public static void main(String args[]) {
        LowerConsumer lowerConsumer = new LowerConsumer();
        // Maximum number of messages to read
        long maxReads = Long.parseLong("3");
        // Topic to read from
        String topic = "first";
        // Partition to read from
        int partition = Integer.parseInt("0");
        // IP addresses of the broker nodes
        List<String> seeds = new ArrayList<>();
        seeds.add("192.168.114.100");
        seeds.add("192.168.114.101");
        seeds.add("192.168.114.102");
        // Port
        int port = Integer.parseInt("9092");
        try {
            lowerConsumer.run(maxReads, topic, partition, seeds, port);
        } catch (Exception e) {
            System.out.println("Oops:" + e);
            e.printStackTrace();
        }
    }

    /**
     * Fetches and prints at most {@code aMaxReads} messages, starting from the
     * earliest available offset of the partition.
     *
     * @param aMaxReads    maximum number of messages to print
     * @param aTopic       topic to read
     * @param aPartition   partition to read
     * @param aSeedBrokers brokers used for the initial leader lookup
     * @param aPort        port used for every broker connection
     * @throws Exception if no new leader can be found after a fetch failure, or a
     *                   broker call fails
     */
    public void run(long aMaxReads, String aTopic, int aPartition, List<String> aSeedBrokers,
                    int aPort) throws Exception {
        // Look up the metadata of the requested topic partition
        PartitionMetadata metadata = findLeader(aSeedBrokers, aPort, aTopic, aPartition);
        if (metadata == null) {
            System.out.println("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            System.out.println("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + aTopic + "_" + aPartition;

        SimpleConsumer consumer =
                new SimpleConsumer(leadBroker, aPort, 100000, 64 * 1024, clientName);
        try {
            long readOffset = getLastOffset(consumer, aTopic, aPartition,
                    kafka.api.OffsetRequest.EarliestTime(), clientName);
            int numErrors = 0;
            while (aMaxReads > 0) {
                if (consumer == null) {
                    // The previous connection was dropped after a fetch error; reconnect.
                    consumer = new SimpleConsumer(leadBroker, aPort, 100000, 64 * 1024, clientName);
                }
                FetchRequest req = new FetchRequestBuilder()
                        .clientId(clientName)
                        .addFetch(aTopic, aPartition, readOffset, 100000)
                        .build();
                FetchResponse fetchResponse = consumer.fetch(req);

                if (fetchResponse.hasError()) {
                    numErrors++;
                    // Something went wrong!
                    short code = fetchResponse.errorCode(aTopic, aPartition);
                    System.out.println("Error fetching data from the Broker:" + leadBroker
                            + " Reason: " + code);
                    if (numErrors > 5) {
                        break;
                    }
                    if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                        // We asked for an invalid offset. For simple case ask for
                        // the last element to reset
                        readOffset = getLastOffset(consumer, aTopic, aPartition,
                                kafka.api.OffsetRequest.LatestTime(), clientName);
                        continue;
                    }
                    consumer.close();
                    consumer = null;
                    leadBroker = findNewLeader(leadBroker, aTopic, aPartition, aPort);
                    continue;
                }
                numErrors = 0;

                long numRead = 0;
                for (MessageAndOffset messageAndOffset
                        : fetchResponse.messageSet(aTopic, aPartition)) {
                    if (aMaxReads <= 0) {
                        // Honour the read limit even when the fetched batch holds more messages.
                        break;
                    }
                    long currentOffset = messageAndOffset.offset();
                    if (currentOffset < readOffset) {
                        System.out.println("Found an old offset: " + currentOffset
                                + " Expecting: " + readOffset);
                        continue;
                    }
                    readOffset = messageAndOffset.nextOffset();
                    ByteBuffer payload = messageAndOffset.message().payload();
                    // remaining() sizes the copy correctly whatever the buffer's position is.
                    byte[] bytes = new byte[payload.remaining()];
                    payload.get(bytes);
                    System.out.println(String.valueOf(messageAndOffset.offset()) + ": "
                            + new String(bytes, "UTF-8"));
                    numRead++;
                    aMaxReads--;
                }

                if (numRead == 0) {
                    // Nothing new yet; wait before polling again.
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException ie) {
                        // Preserve the interrupt for the caller and stop reading.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        } finally {
            // Close the connection on every exit path, including exceptions.
            if (consumer != null) {
                consumer.close();
            }
        }
    }

    /**
     * Asks the broker for one offset of the partition relative to {@code whichTime}.
     *
     * @param consumer   connection to query
     * @param topic      topic of the partition
     * @param partition  partition id
     * @param whichTime  time marker passed to the offset request (the callers in this
     *                   class pass {@code EarliestTime()} or {@code LatestTime()})
     * @param clientName client id sent with the request
     * @return the first offset returned by the broker, or 0 when the response
     *         reports an error or contains no offsets
     */
    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                     long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo,
                kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: "
                    + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        if (offsets == null || offsets.length == 0) {
            // Treat an empty answer like the error case instead of failing on offsets[0].
            return 0;
        }
        return offsets[0];
    }

    /**
     * Looks for the current leader among the known replicas, trying up to three times.
     *
     * @param a_oldLeader host of the leader that just failed
     * @param aTopic      topic of the partition
     * @param aPartition  partition id
     * @param aPort       port used for the broker connections
     * @return host of the leader to use next
     * @throws Exception if no leader is found within three attempts
     */
    private String findNewLeader(String a_oldLeader, String aTopic, int aPartition, int aPort)
            throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(mReplicaBrokers, aPort, aTopic, aPartition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // first time through if the leader hasn't changed give
                // ZooKeeper a second to recover
                // second time, assume the broker did recover before failover,
                // or it was a non-Broker issue
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                Thread.sleep(1000);
            }
        }
        System.out.println("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }

    /**
     * Queries the given brokers in order for the metadata of the partition and
     * stops at the first broker that returns it. On success the replica hosts of
     * the partition replace the contents of {@code mReplicaBrokers}.
     *
     * @param aSeedBrokers brokers to ask
     * @param aPort        port used for the broker connections
     * @param aTopic       topic of the partition
     * @param aPartition   partition id
     * @return the partition metadata, or {@code null} when no broker returned it
     */
    private PartitionMetadata findLeader(List<String> aSeedBrokers, int aPort, String aTopic,
                                         int aPartition) {
        PartitionMetadata returnMetaData = null;
        // Copy first: callers may pass mReplicaBrokers itself, which is rewritten below.
        List<String> brokers = new ArrayList<>(aSeedBrokers);
        loop:
        for (String seed : brokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, aPort, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(aTopic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == aPartition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed + "] to find Leader for ["
                        + aTopic + ", " + aPartition + "] Reason: " + e);
            } finally {
                if (consumer != null) {
                    consumer.close();
                }
            }
        }
        if (returnMetaData != null) {
            mReplicaBrokers.clear();
            for (BrokerEndPoint replica : returnMetaData.replicas()) {
                mReplicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }
}