基于Docker可以很轻松的搭建一个kafka集群,其他机器上的应用如何使用这个kafka集群服务呢?本次实战就来解决这个问题。

<>基本情况

整个实战环境一共有三台机器,各自的职责如下图所示:

IP地址 身份 备注
192.168.1.102 消息生产者 这是个spring boot应用,
应用名称是kafka01103producer,
01103代表kafka版本0.11.0.3
192.168.1.101 Docker server 此机器上安装了Docker,并且运行了两个容器:zookeeper和kafka
192.168.1.104 消息消费者 这是个spring boot应用,
应用名称是kafka01103consumer,
01103代表kafka版本0.11.0.3
整个环境的部署情况如下图:


<>版本信息

* 操作系统:Centos7
* docker:17.03.2-ce
* docker-compose:1.23.2
* kafka:0.11.0.3
* zookeeper:3.4.9
* JDK:1.8.0_191
* spring boot:1.5.9.RELEASE
* spring-kafka:1.3.8.RELEASE
<>重点介绍

本次实战有几处重点需要注意:

* spring-kafka和kafka的版本匹配问题,请关注官方文档:https://spring.io/projects/spring-kafka
<https://spring.io/projects/spring-kafka>
* kafka的kafka的advertised.listeners配置,应用通过此配置来连接broker;
* 应用所在服务器要配置host,才能连接到broker;
接下来开始实战吧;

<>配置host

为了让生产和消费消息的应用能够连接kafka成功,需要配置应用所在服务器的/etc/hosts文件,增加以下一行内容:
192.168.1.101 kafka1
192.168.1.101是docker所在机器的IP地址;

请注意,生产和消费消息的应用所在服务器都要做上述配置;


可能有的读者在此会有疑问:为什么要配置host呢?我把kafka配置的advertised.listeners配置成kafka的IP地址不就行了么?这样的配置我试过,但是用kafka-console-producer.sh和kafka-console-consumer.sh连接kafka的时候会报错"LEADER_NOT_AVAILABLE"。

<>在docker上部署kafka

* 在docker机器上编写docker-compose.yml文件,内容如下: version: '2' services: zookeeper:
image: wurstmeister/zookeeper ports: - "2181:2181" kafka1: image:
wurstmeister/kafka:2.11-0.11.0.3 ports: - "9092:9092" environment:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092 KAFKA_LISTENERS: PLAINTEXT:
//:9092 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_CREATE_TOPICS:
"topic001:2:1" volumes: - /var/run/docker.sock:/var/run/docker.sock
上述配置中有两处需要注意:

第一,KAFKA_ADVERTISED_LISTENERS的配置,这个参数会写到kafka配置的advertised.listeners这一项中,应用会用来连接broker;
第二,KAFKA_CREATE_TOPICS的配置,表示容器启动时会创建名为"topic001"的主题,并且partition等于2,副本为1;

* 在docker-compose.yml所在目录执行命令docker-compose up -d,启动容器;
* 执行命令docker ps,可见容器情况,kafka的容器名为temp_kafka1_1: [root@hedy temp]# docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES ba5374d6245c
wurstmeister/zookeeper"/bin/sh -c '/usr/..." About an hour ago Up About an hour
22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp temp_zookeeper_1
2c58f46bb772 wurstmeister/kafka:2.11-0.11.0.3"start-kafka.sh" About an hour ago
Up About an hour 0.0.0.0:9092->9092/tcp temp_kafka1_1
* 执行以下命令可以查看topic001的基本情况: docker exec temp_kafka1_1 \ kafka-topics.sh \
--describe \ --topic topic001 \ --zookeeper zookeeper:2181
看到的信息如下:
Topic:topic001 PartitionCount:2 ReplicationFactor:1 Configs: Topic: topic001
Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001 Topic: topic001 Partition: 1
Leader: 1001 Replicas: 1001 Isr: 1001
<>源码下载

接下来的实战是编写生产消息和消费消息的两个应用的源码,您可以选择直接从GitHub下载这两个工程的源码,地址和链接信息如下表所示:

名称 链接 备注
项目主页 https://github.com/zq2599/blog_demos
<https://github.com/zq2599/blog_demos> 该项目在GitHub上的主页
git仓库地址(https) https://github.com/zq2599/blog_demos.git
<https://github.com/zq2599/blog_demos.git> 该项目源码的仓库地址,https协议
git仓库地址(ssh) git@github.com <mailto:git@github.com>:zq2599/blog_demos.git
该项目源码的仓库地址,ssh协议
这个git项目中有多个文件夹,本章源码在kafka01103consumer和kafka01103producer这两个文件夹下,如下图红框所示:


接下来开始编码:

<>开发生产消息的应用

* 创建一个maven工程,pom.xml内容如下: <?xml version="1.0" encoding="UTF-8"?> <project
xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="
http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="
http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</
groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>
1.5.9.RELEASE</version> <relativePath/> <!-- lookup parent from repository -->
</parent> <groupId>com.bolingcavalry</groupId> <artifactId>kafka01103producer</
artifactId> <version>0.0.1-SNAPSHOT</version> <name>kafka01103producer</name> <
description>Demo project for Spring Boot</description> <properties> <
java.version>1.8</java.version> </properties> <dependencies> <dependency> <
groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</
artifactId> </dependency> <dependency> <groupId>org.springframework.boot</
groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency>
<groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</
artifactId> <version>1.3.8.RELEASE</version> </dependency> <dependency> <groupId
>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.28</
version> </dependency> <dependency> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </
dependency> </dependencies> <build> <plugins> <plugin> <groupId>
org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</
artifactId> </plugin> </plugins> </build> </project>
再次强调spring-kafka版本和kafka版本的匹配很重要;
2. 配置文件application.properties内容:
#kafka相关配置 spring.kafka.bootstrap-servers=kafka1:9092 #设置一个默认组
spring.kafka.consumer.group-id=0 #key-value序列化反序列化
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#每次批量发送消息的数量 spring.kafka.producer.batch-size=65536
spring.kafka.producer.buffer-memory=524288
* 发送消息的业务代码只有一个MessageController类: package com.bolingcavalry.
kafka01103producer.controller; import com.alibaba.fastjson.JSONObject; import
org.springframework.beans.factory.annotation.Autowired; import org.
springframework.kafka.core.KafkaTemplate; import org.springframework.util.
concurrent.ListenableFuture; import org.springframework.web.bind.annotation.*;
import java.text.SimpleDateFormat; import java.util.Date; import java.util.UUID;
/** * @Description: 接收web请求,发送消息到kafka * @author: willzhao E-mail:
zq2599@gmail.com * @date: 2019/1/1 11:44 */ @RestController public class
MessageController { @Autowired private KafkaTemplate kafkaTemplate;
@RequestMapping(value = "/send/{name}/{message}", method = RequestMethod.GET)
public @ResponseBody String send(@PathVariable("name") final String name,
@PathVariable("message") final String message) { SimpleDateFormat
simpleDateFormat=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String timeStr =
simpleDateFormat.format(new Date()); JSONObject jsonObject = new JSONObject();
jsonObject.put("name", name); jsonObject.put("message", message); jsonObject.put
("time", timeStr); jsonObject.put("timeLong", System.currentTimeMillis());
jsonObject.put("bizID", UUID.randomUUID()); String sendMessage = jsonObject.
toJSONString(); ListenableFuture future = kafkaTemplate.send("topic001",
sendMessage); future.addCallback(o -> System.out.println("send message success
: " + sendMessage), throwable -> System.out.println("send message fail : " +
sendMessage)); return "send message to [" + name + "] success (" + timeStr + ")"
; } }
* 编码完成后,在pom.xml所在目录执行命令mvn clean package -U -DskipTests
,即可在target目录下发现文件kafka01103producer-0.0.1-SNAPSHOT.jar,将此文件复制到192.168.1.102机器上;
* 登录192.168.1.102,在文件kafka01103producer-0.0.1-SNAPSHOT.jar所在目录执行命令java -jar
kafka01103producer-0.0.1-SNAPSHOT.jar,即可启动生产消息的应用;
<>开发消费消息的应用

* 创建一个maven工程,pom.xml内容如下: <?xml version="1.0" encoding="UTF-8"?> <project
xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="
http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="
http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</
groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>
1.5.9.RELEASE</version> <relativePath/> <!-- lookup parent from repository -->
</parent> <groupId>com.bolingcavalry</groupId> <artifactId>kafka01103consumer</
artifactId> <version>0.0.1-SNAPSHOT</version> <name>kafka01103consumer</name> <
description>Demo project for Spring Boot</description> <properties> <
java.version>1.8</java.version> </properties> <dependencies> <dependency> <
groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</
artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</
groupId> <artifactId>spring-kafka</artifactId> <version>1.3.8.RELEASE</version>
</dependency> <dependency> <groupId>org.springframework.boot</groupId> <
artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </
dependency> </dependencies> <build> <plugins> <plugin> <groupId>
org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</
artifactId> </plugin> </plugins> </build> </project>
再次强调spring-kafka版本和kafka版本的匹配很重要;
2. 配置文件application.properties内容:
#kafka相关配置 spring.kafka.bootstrap-servers=192.168.1.101:9092 #设置一个默认组
spring.kafka.consumer.group-id=0 #key-value序列化反序列化
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#每次批量发送消息的数量 spring.kafka.producer.batch-size=65536
spring.kafka.producer.buffer-memory=524288
* 消费消息的业务代码只有一个Consumer类,收到消息后,会将内容内容和消息的详情打印出来: @Component public class
Consumer { @KafkaListener(topics = {"topic001"}) public void listen(
ConsumerRecord<?, ?> record) { Optional<?> kafkaMessage = Optional.ofNullable(
record.value()); if (kafkaMessage.isPresent()) { Object message = kafkaMessage.
get(); System.out.println("----------------- record =" + record); System.out.
println("------------------ message =" + message); } } }
* 编码完成后,在pom.xml所在目录执行命令mvn clean package -U -DskipTests
,即可在target目录下发现文件kafka01103consumer-0.0.1-SNAPSHOT.jar,将此文件复制到192.168.1.104机器上;
* 登录192.168.1.104,在文件kafka01103consumer-0.0.1-SNAPSHOT.jar所在目录执行命令java -jar
kafka01103consumer-0.0.1-SNAPSHOT.jar,即可启动消费消息的应用,控制台输出如下: 2019-01-01
13:41:41.747 INFO 1422 ---[ main] o.a.kafka.common.utils.AppInfoParser : Kafka
version: 0.11.0.2 2019-01-01 13:41:41.748 INFO 1422 --- [ main]
o.a.kafka.common.utils.AppInfoParser: Kafka commitId : 73be1e1168f91ee2
2019-01-01 13:41:41.787 INFO 1422 ---[ main] o.s.s.c.ThreadPoolTaskScheduler :
Initializing ExecutorService 2019-01-01 13:41:41.912 INFO 1422 ---[ main]
c.b.k.Kafka01103consumerApplication: Started Kafka01103consumerApplication in
11.876 seconds(JVM running for 16.06) 2019-01-01 13:41:42.699 INFO 1422 --- [
ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : Discovered
coordinator kafka1:9092 (id: 2147482646 rack: null) for group 0. 2019-01-01
13:41:42.721 INFO 1422 ---[ntainer#0-0-C-1]
o.a.k.c.c.internals.ConsumerCoordinator : Revoking previously assigned
partitions [] for group 0 2019-01-01 13:41:42.723 INFO 1422 --- [ntainer
#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[]
2019-01-01 13:41:42.724 INFO 1422 ---[ntainer#0-0-C-1]
o.a.k.c.c.internals.AbstractCoordinator : (Re-)joining group 0 2019-01-01
13:41:42.782 INFO 1422 ---[ntainer#0-0-C-1]
o.a.k.c.c.internals.AbstractCoordinator : Successfully joined group 0 with
generation 5 2019-01-01 13:41:42.788 INFO 1422 --- [ntainer#0-0-C-1]
o.a.k.c.c.internals.ConsumerCoordinator : Setting newly assigned partitions
[topic001-1, topic001-0] for group 0 2019-01-01 13:41:42.805 INFO 1422 --- [
ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions
assigned:[topic001-1, topic001-0] 2019-01-01 13:48:00.938 INFO 1422 --- [ntainer
#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Revoking previously
assigned partitions [topic001-1, topic001-0] for group 0 2019-01-01
13:48:00.939 INFO 1422 ---[ntainer#0-0-C-1]
o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[topic001-1,
topic001-0]
上述内容显示了当前应用消费了两个partition;

*
再启动一个同样的应用,这样每个应用负责一个parititon的消费,做法是在文件kafka01103consumer-0.0.1-SNAPSHOT.jar所在目录执行命令
java -jar kafka01103consumer-0.0.1-SNAPSHOT.jar --server.port=8081,看看控制台的输出:
2019-01-01 13:47:58.068 INFO 1460 ---[ main]
o.a.kafka.common.utils.AppInfoParser: Kafka version : 0.11.0.2 2019-01-01
13:47:58.069 INFO 1460 ---[ main] o.a.kafka.common.utils.AppInfoParser : Kafka
commitId: 73be1e1168f91ee2 2019-01-01 13:47:58.103 INFO 1460 --- [ main]
o.s.s.c.ThreadPoolTaskScheduler: Initializing ExecutorService 2019-01-01
13:47:58.226 INFO 1460 ---[ main] c.b.k.Kafka01103consumerApplication : Started
Kafka01103consumerApplicationin 11.513 seconds (JVM running for 14.442)
2019-01-01 13:47:59.007 INFO 1460 ---[ntainer#0-0-C-1]
o.a.k.c.c.internals.AbstractCoordinator : Discovered coordinator kafka1:9092
(id: 2147482646 rack: null) for group 0. 2019-01-01 13:47:59.030 INFO 1460 --- [
ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Revoking previously
assigned partitions [] for group 0 2019-01-01 13:47:59.031 INFO 1460 --- [
ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[]
2019-01-01 13:47:59.032 INFO 1460 ---[ntainer#0-0-C-1]
o.a.k.c.c.internals.AbstractCoordinator : (Re-)joining group 0 2019-01-01
13:48:00.967 INFO 1460 ---[ntainer#0-0-C-1]
o.a.k.c.c.internals.AbstractCoordinator : Successfully joined group 0 with
generation 6 2019-01-01 13:48:00.985 INFO 1460 --- [ntainer#0-0-C-1]
o.a.k.c.c.internals.ConsumerCoordinator : Setting newly assigned partitions
[topic001-0] for group 0 2019-01-01 13:48:01.015 INFO 1460 --- [ntainer
#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions
assigned:[topic001-0]
可见新的进程消费的是0号partition,此时再去看看先启动的进程的控制台,见到了新的日志,显示该进程只消费1号pairtition了:
2019-01-01 13:48:00.955 INFO 1422 --- [ntainer#0-0-C-1]
o.a.k.c.c.internals.AbstractCoordinator : Successfully joined group 0 with
generation 6 2019-01-01 13:48:00.960 INFO 1422 --- [ntainer#0-0-C-1]
o.a.k.c.c.internals.ConsumerCoordinator : Setting newly assigned partitions
[topic001-1] for group 0 2019-01-01 13:48:00.967 INFO 1422 --- [ntainer
#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions
assigned:[topic001-1]
<>验证消息的生产和消费

* 在浏览器输入以下地址:192.168.1.102:8080/send/Tom/hello
* 浏览器显示返回的结果是:send message to [Tom] success (2019-01-01 13:58:08),表示操作成功;
* 去检查两个消费者进程的控制台,发现其中一个成功的消费了消息,如下: ----------------- record =ConsumerRecord(
topic= topic001, partition = 0, offset = 0, CreateTime = 1546351226016,
serialized key size= -1, serialized value size = 133, headers = RecordHeaders(
headers= [], isReadOnly = false), key = null, value = {"timeLong":1546351225804,
"name":"Tom","bizID":"4f1b6cf6-78d4-455d-b530-3956723a074f","time":"2019-01-01
22:00:25","message":"hello"}) ------------------ message ={"timeLong"
:1546351225804,"name":"Tom","bizID":"4f1b6cf6-78d4-455d-b530-3956723a074f",
"time":"2019-01-01 22:00:25","message":"hello"}
至此,外部应用使用基于Docker的kafa服务实战就完成了,如果您也在用Docker部署kafka服务,给外部应用使用,希望本文能给您提供一些参考;

友情链接
ioDraw流程图
API参考文档
OK工具箱
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:ixiaoyang8@qq.com
QQ群:637538335
关注微信