I. Preparing the working environment

The Erlang Kafka client library used here is brod: https://github.com/klarna/brod

EMQ version v2.3.5 is used: https://github.com/emqtt/emq-relx

For setting up the Kafka runtime environment, see http://blog.csdn.net/libaineu2004/article/details/79202408

We implement this as an EMQ plugin. My plugin directory is /home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps

Make a copy of emq_plugin_template and rename it to emq_plugin_kafka_brod. Remember to rename the related configuration files and source files as well.

Go to the directory /home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/emq_plugin_kafka_brod


In this plugin's Makefile, add brod as a build dependency:

BUILD_DEPS = emqttd cuttlefish brod
dep_brod = git https://github.com/klarna/brod.git 3.4.0





II. Modifying the project files

1. Open the file /home/firecat/Prj/emq2.0/emq-relx-2.3.5/Makefile and add:

DEPS += emq_plugin_kafka_brod

2. In relx.config, add these two lines to the release section:

 brod,
 {emq_plugin_kafka_brod, load}



3. In /home/firecat/Prj/emq2.0/emq-relx-2.3.5/data/loaded_plugins, list the plugins that should start automatically:
emq_recon.
emq_modules.
emq_retainer.
emq_dashboard.
emq_plugin_kafka_brod.




4. The build fails with the following error:
DEP brod
make[2]: Entering directory `/home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/brod'
../../erlang.mk:1260: warning: overriding recipe for target `/home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/docopt'
../../erlang.mk:1235: warning: ignoring old recipe for target `/home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/docopt'
../../erlang.mk:1260: warning: overriding recipe for target `/home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/jsone'
../../erlang.mk:1235: warning: ignoring old recipe for target `/home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/jsone'
DEP supervisor3
Error: Unknown or invalid dependency: supervisor3.
make[2]: *** [/home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/supervisor3] Error 78
make[2]: Leaving directory `/home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/brod'
make[1]: *** [deps] Error 2
make[1]: Leaving directory `/home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/emq_plugin_kafka_brod'
make: *** [deps] Error 2
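Solution:

The dep_xxx_commit form only overrides the version of a package that erlang.mk already knows from its package index. The older erlang.mk bundled with emq-relx apparently has no entry for supervisor3, so the dependency has to be spelled out with its full git source.

In /home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/brod/Makefile, change lines 12 and 13 from:

dep_supervisor3_commit = 1.1.5
dep_kafka_protocol_commit = 1.1.2

to: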
dep_supervisor3 = git https://github.com/klarna/supervisor3.git 1.1.5


dep_kafka_protocol = git https://github.com/klarna/kafka_protocol.git 1.1.2




III. Source code implementation (full source download: https://download.csdn.net/download/libaineu2004/10284403)


1. /home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/emq_plugin_kafka_brod/etc/emq_plugin_kafka_brod.config
[
  {emq_plugin_kafka_brod, [
    {kafka, [
      {bootstrap_broker, [{"127.0.0.1", 9092}]}, %% "localhost" also works here
      {query_api_versions, false},
      {reconnect_cool_down_seconds, 10}
    ]}
  ]}
].
When the Kafka brokers run as a cluster, use the machines' real IP addresses rather than localhost or 127.0.0.1. Several endpoints can be listed, for example:

[{"172.16.6.170", 9092},{"172.16.6.170", 9093},{"172.16.6.170", 9094}]
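Note that brod_init in the source file below reads only bootstrap_broker from this config and passes ClientConfig = [] to brod:start_client/3, so query_api_versions and reconnect_cool_down_seconds have no effect as written. If they are meant to apply, a minimal sketch of a helper that picks those two brod client options out of the {kafka, [...]} list could look like this. The module name kafka_env and the function client_config/1 are hypothetical, not part of brod or EMQ.

```erlang
%% Hypothetical helper module (not part of brod or EMQ): builds the brod
%% ClientConfig from the {kafka, [...]} proplist in the plugin config.
-module(kafka_env).
-export([client_config/1]).

%% Keys in the plugin config that are brod client options.
%% bootstrap_broker is NOT one of them; it is passed separately.
-define(CLIENT_KEYS, [query_api_versions, reconnect_cool_down_seconds]).

%% Keep only the client-option entries, in their original order.
client_config(Kafka) when is_list(Kafka) ->
    [{K, V} || {K, V} <- Kafka, lists:member(K, ?CLIENT_KEYS)].
```

In brod_init, ClientConfig = kafka_env:client_config(Kafka) would then take the place of ClientConfig = [].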






2. /home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/emq_plugin_kafka_brod/src/emq_plugin_kafka_brod.erl

%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(emq_plugin_kafka_brod).

-include_lib("emqttd/include/emqttd.hrl").
-include_lib("brod/include/brod_int.hrl").

-define(TEST_TOPIC, <<"emqtest">>).

-export([load/1, unload/0]).

%% Hooks functions
-export([on_client_connected/3, on_client_disconnected/3]).
-export([on_client_subscribe/4, on_client_unsubscribe/4]).
-export([on_session_created/3, on_session_subscribed/4,
         on_session_unsubscribed/4, on_session_terminated/4]).
-export([on_message_publish/2, on_message_delivered/4, on_message_acked/4]).

%% Called when the plugin application starts
load(Env) ->
    brod_init([Env]),
    emqttd:hook('client.connected', fun ?MODULE:on_client_connected/3, [Env]),
    emqttd:hook('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]),
    emqttd:hook('client.subscribe', fun ?MODULE:on_client_subscribe/4, [Env]),
    emqttd:hook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4, [Env]),
    emqttd:hook('session.created', fun ?MODULE:on_session_created/3, [Env]),
    emqttd:hook('session.subscribed', fun ?MODULE:on_session_subscribed/4, [Env]),
    emqttd:hook('session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4, [Env]),
    emqttd:hook('session.terminated', fun ?MODULE:on_session_terminated/4, [Env]),
    emqttd:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]),
    emqttd:hook('message.delivered', fun ?MODULE:on_message_delivered/4, [Env]),
    emqttd:hook('message.acked', fun ?MODULE:on_message_acked/4, [Env]).

on_client_connected(ConnAck, Client = #mqtt_client{client_id = ClientId}, _Env) ->
    io:format("client ~s connected, connack: ~w~n", [ClientId, ConnAck]),
    Json = mochijson2:encode([{type, <<"connected">>},
                              {client_id, ClientId},
                              {cluster_node, node()},
                              {ts, emqttd_time:now_ms()}]),
    %% Synchronous alternative:
    %% ok = brod:produce_sync(brod_client_1, ?TEST_TOPIC, 0, <<"mykey_1">>, iolist_to_binary(Json)),
    {ok, CallRef} = brod:produce(brod_client_1, ?TEST_TOPIC, 0, <<"mykey_1">>,
                                 iolist_to_binary(Json)),
    %% brod:produce/5 is asynchronous: the ack arrives as a message
    receive
        #brod_produce_reply{call_ref = CallRef, result = brod_produce_req_acked} ->
            io:format("brod_produce_reply:ok~n"),
            ok
    after 5000 ->
        io:format("brod_produce_reply:exit~n"),
        erlang:exit(timeout)
    end,
    {ok, Client}.

on_client_disconnected(Reason, _Client = #mqtt_client{client_id = ClientId}, _Env) ->
    io:format("client ~s disconnected, reason: ~w~n", [ClientId, Reason]),
    Json = mochijson2:encode([{type, <<"disconnected">>},
                              {client_id, ClientId},
                              {reason, Reason},
                              {cluster_node, node()},
                              {ts, emqttd_time:now_ms()}]),
    %% ok = brod:produce_sync(brod_client_1, ?TEST_TOPIC, 0, <<"mykey_2">>, iolist_to_binary(Json)),
    {ok, CallRef} = brod:produce(brod_client_1, ?TEST_TOPIC, 0, <<"mykey_2">>,
                                 iolist_to_binary(Json)),
    receive
        #brod_produce_reply{call_ref = CallRef, result = brod_produce_req_acked} ->
            ok
    after 5000 ->
        %% ct:fail/1 comes from common_test, which is usually not part of
        %% the EMQ release; exit the same way as on_client_connected instead
        erlang:exit(timeout)
    end,
    ok.

on_client_subscribe(ClientId, Username, TopicTable, _Env) ->
    io:format("client(~s/~s) will subscribe: ~p~n", [Username, ClientId, TopicTable]),
    {ok, TopicTable}.

on_client_unsubscribe(ClientId, Username, TopicTable, _Env) ->
    io:format("client(~s/~s) unsubscribe ~p~n", [ClientId, Username, TopicTable]),
    {ok, TopicTable}.

on_session_created(ClientId, Username, _Env) ->
    io:format("session(~s/~s) created.", [ClientId, Username]).

on_session_subscribed(ClientId, Username, {Topic, Opts}, _Env) ->
    io:format("session(~s/~s) subscribed: ~p~n", [Username, ClientId, {Topic, Opts}]),
    {ok, {Topic, Opts}}.

on_session_unsubscribed(ClientId, Username, {Topic, Opts}, _Env) ->
    io:format("session(~s/~s) unsubscribed: ~p~n", [Username, ClientId, {Topic, Opts}]),
    ok.

on_session_terminated(ClientId, Username, Reason, _Env) ->
    io:format("session(~s/~s) terminated: ~p.", [ClientId, Username, Reason]).

%% transform message and return
on_message_publish(Message = #mqtt_message{topic = <<"$SYS/", _/binary>>}, _Env) ->
    {ok, Message};

on_message_publish(Message, _Env) ->
    io:format("publish ~s~n", [emqttd_message:format(Message)]),
    %% The shape of From differs depending on whether login is required or not
    From = Message#mqtt_message.from,
    Topic = Message#mqtt_message.topic,
    Payload = Message#mqtt_message.payload,
    Qos = Message#mqtt_message.qos,
    Dup = Message#mqtt_message.dup,
    Retain = Message#mqtt_message.retain,
    ClientId = c(From),
    Username = u(From),
    Json = mochijson2:encode([{type, <<"publish">>},
                              {client_id, ClientId},
                              {message, [{username, Username},
                                         {topic, Topic},
                                         {payload, Payload},
                                         {qos, i(Qos)},
                                         {dup, i(Dup)},
                                         {retain, i(Retain)}]},
                              {cluster_node, node()},
                              {ts, emqttd_time:now_ms()}]),
    %% ok = brod:produce_sync(brod_client_1, ?TEST_TOPIC, 0, <<"mykey_3">>, iolist_to_binary(Json)),
    {ok, CallRef} = brod:produce(brod_client_1, ?TEST_TOPIC, 0, <<"mykey_3">>,
                                 iolist_to_binary(Json)),
    receive
        #brod_produce_reply{call_ref = CallRef, result = brod_produce_req_acked} ->
            ok
    after 5000 ->
        erlang:exit(timeout) %% instead of ct:fail/1 (common_test)
    end,
    {ok, Message}.

on_message_delivered(ClientId, Username, Message, _Env) ->
    io:format("delivered to client(~s/~s): ~s~n",
              [Username, ClientId, emqttd_message:format(Message)]),
    {ok, Message}.

on_message_acked(ClientId, Username, Message, _Env) ->
    io:format("client(~s/~s) acked: ~s~n",
              [Username, ClientId, emqttd_message:format(Message)]),
    {ok, Message}.

%% Called when the plugin application stops
unload() ->
    %% application:stop(brod),
    emqttd:unhook('client.connected', fun ?MODULE:on_client_connected/3),
    emqttd:unhook('client.disconnected', fun ?MODULE:on_client_disconnected/3),
    emqttd:unhook('client.subscribe', fun ?MODULE:on_client_subscribe/4),
    emqttd:unhook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4),
    emqttd:unhook('session.created', fun ?MODULE:on_session_created/3),
    emqttd:unhook('session.subscribed', fun ?MODULE:on_session_subscribed/4),
    emqttd:unhook('session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4),
    emqttd:unhook('session.terminated', fun ?MODULE:on_session_terminated/4),
    emqttd:unhook('message.publish', fun ?MODULE:on_message_publish/2),
    emqttd:unhook('message.delivered', fun ?MODULE:on_message_delivered/4),
    emqttd:unhook('message.acked', fun ?MODULE:on_message_acked/4).

%% ===================================================================
%% brod_init  https://github.com/klarna/brod
%% ===================================================================

brod_init(_Env) ->
    {ok, _} = application:ensure_all_started(brod),
    {ok, Kafka} = application:get_env(?MODULE, kafka),
    KafkaBootstrapEndpoints = proplists:get_value(bootstrap_broker, Kafka),
    %% KafkaBootstrapEndpoints = [{"127.0.0.1", 9092}], %% localhost, 172.16.6.161
    %% KafkaBootstrapEndpoints = [{"localhost", 9092}], %% localhost, 172.16.6.161
    %% ClientConfig = [{reconnect_cool_down_seconds, 10}], %% socket error recovery
    ClientConfig = [],
    Topic = ?TEST_TOPIC,
    ok = brod:start_client(KafkaBootstrapEndpoints, brod_client_1, ClientConfig),
    ok = brod:start_producer(brod_client_1, Topic, _ProducerConfig = []),
    %% ok = brod:produce_sync(brod_client_1, Topic, 0, <<"key1">>, <<"value1">>),
    %% {ok, CallRef} = brod:produce(brod_client_1, Topic, 0, <<"key1">>, <<"value2">>),
    io:format("Init brod with ~p~n", [KafkaBootstrapEndpoints]).

i(true) -> 1;
i(false) -> 0;
i(I) when is_integer(I) -> I.

c({ClientId, _Username}) -> ClientId;
c(From) -> From.

u({_ClientId, Username}) -> Username;
u(From) -> From.
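The three producing hooks repeat the same produce-then-receive block, and on a timeout they end the client's process (or, with ct:fail/1, call into common_test). A minimal sketch of a shared helper that waits for the ack and returns an error tuple instead, so the hook can log and carry on. The module name brod_ack is hypothetical, and the local record definition is an assumption: it is meant to mirror the brod_produce_reply record (fields call_ref, base_offset, result) that brod 3.x ships in its include files. In the plugin itself, rely on brod's header rather than redefining the record.

```erlang
%% Hypothetical helper module (not part of brod): waits for the
%% asynchronous reply that brod:produce/5 sends to the calling process.
-module(brod_ack).
-export([await_ack/2]).

%% ASSUMPTION: same field order as brod 3.x's record; a mismatch would
%% make the receive patterns below never match real replies.
-record(brod_produce_reply, {call_ref, base_offset, result}).

%% ok              -> the broker acked the request
%% {error, Other}  -> a reply for this request with any other result
%% {error, timeout}-> no reply within Timeout milliseconds
await_ack(CallRef, Timeout) ->
    receive
        #brod_produce_reply{call_ref = CallRef,
                            result = brod_produce_req_acked} ->
            ok;
        #brod_produce_reply{call_ref = CallRef, result = Other} ->
            {error, Other}
    after Timeout ->
        {error, timeout}
    end.
```

In a hook, brod_ack:await_ack(CallRef, 5000) would follow {ok, CallRef} = brod:produce(...), with the {error, _} case logged, so a slow broker delays a client but does not kill its process.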
Please note:

(1) Before brod:start_producer is called, make sure the topic has already been created in Kafka; otherwise this line raises an exception:

Topic = ?TEST_TOPIC,
ok = brod:start_producer(brod_client_1, Topic, _ProducerConfig = []),

(2) For brod:start_client, the client id can also be defined through a macro:

-define(CLIENT_ID, ?MODULE).
ClientId       = ?CLIENT_ID,
%% Single node: BootstrapHosts = [{"localhost", 9092}],
BootstrapHosts = [{"172.16.6.170", 9092}, {"172.16.6.170", 9093}, {"172.16.6.170", 9094}], %% cluster
ClientConfig   = [],
ok = brod:start_client(BootstrapHosts, ClientId, ClientConfig),




3. Creating the Kafka topic

Notes:

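(1) When the Kafka brokers and ZooKeeper run as clusters, the topic must be created manually beforehand, passing the list of ZooKeeper nodes. It should be created on a single-node setup as well; otherwise the client reports errors.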

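(2) Avoid "." and "_" in topic names: because of limitations in Kafka's metric names, topics that differ only in these two characters can collide.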

(3) Create the topic:

./bin/kafka-topics.sh --create --zookeeper 172.16.6.170:2181,172.16.6.170:2182,172.16.6.170:2183 --replication-factor 3 --partitions 3 --topic emqtest

(4) Start a console consumer to check the messages:

./bin/kafka-console-consumer.sh --zookeeper 172.16.6.170:2181 --topic emqtest --from-beginning




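4. All the examples above hard-code the partition (Partition = 0). How can the client choose the partition itself? Since the Erlang brod library does not do automatic partitioning here, we compute a hash value ourselves.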
-define(NUM_PARTITIONS, 3).
-define(EMPTY(S), (S == <<"">> orelse S == undefined)).

getPartition(ClientId) when ?EMPTY(ClientId) ->
    %% Random partition in [0, NUM_PARTITIONS), the same range as
    %% crypto:rand_uniform(0, ?NUM_PARTITIONS), which is deprecated
    rand:uniform(?NUM_PARTITIONS) - 1;
getPartition(ClientId) ->
    Val = crypto:hash(md5, ClientId),  %% an MD5 digest is 16 bytes
    io:format("Value is ~w~n", [Val]),
    <<_:15/binary, LastByte>> = Val,   %% take the 16th (last) byte
    LastByte rem ?NUM_PARTITIONS.      %% and use its remainder

Usage example:

ClientId = <<"123456">>,
Partition = getPartition(ClientId),
brod:produce(?KAFKA_CLIENT1, ?EMQ_TOPIC_ONLINE, Partition, ClientId, list_to_binary(Json)).
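This scheme (take the MD5 of the client id, then the remainder) is my own simplified version, not the native approach based on Java's hashCode. For that, see the source of java.lang.String.hashCode().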
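For comparison with the MD5 scheme, here is a minimal sketch of Java's String.hashCode (h = 31*h + c over the characters, wrapped to a signed 32-bit int) combined with the abs(hash(key)) % numPartitions rule of Kafka's old Scala producer shown below. Assumptions, stated loudly: the key is UTF-8 text whose characters all lie in the Basic Multilingual Plane (Java hashes UTF-16 code units, so other characters would hash differently); and Utils.abs is taken to clear the sign bit (n & 0x7fffffff), which is what Utils.toPositive does in later Kafka versions, so check the version you target. The newer Java producer's default partitioner hashes the serialized key bytes with murmur2 instead, so this sketch does not reproduce it. The module name java_hash is hypothetical.

```erlang
%% Hypothetical module: Java String.hashCode() plus an old-Kafka-style
%% "abs(hash(key)) % numPartitions" partitioner, for BMP-only UTF-8 keys.
-module(java_hash).
-export([string_hash/1, partition/2]).

%% h = 31*h + c for each character, kept in the signed 32-bit range.
string_hash(Key) when is_binary(Key) ->
    Chars = unicode:characters_to_list(Key),   % UTF-8 bytes -> code points
    lists:foldl(fun(C, H) -> to_int32(31 * H + C) end, 0, Chars).

%% Wrap an arbitrary integer to a Java int (two's complement, 32 bits).
to_int32(N) ->
    U = N band 16#FFFFFFFF,
    if
        U >= 16#80000000 -> U - 16#100000000;
        true -> U
    end.

%% ASSUMPTION: Utils.abs clears the sign bit; then take the remainder.
partition(Key, NumPartitions) when NumPartitions > 0 ->
    (string_hash(Key) band 16#7FFFFFFF) rem NumPartitions.
```

With this, java_hash:partition(<<"123456">>, 3) could replace getPartition/1 wherever the partition should match a Java-hashCode-based producer.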




For reference, here is Kafka's own partitioning logic (from the old Scala producer):

Message-to-partition assignment
By default, Kafka assigns the partition from the key passed with the message, i.e. hash(key) % numPartitions:
def partition(key: Any, numPartitions: Int): Int = {
    Utils.abs(key.hashCode) % numPartitions
}
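This guarantees that messages with the same key are always routed to the same partition. If no key is given, how does Kafka decide which partition the message goes to?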

if (key == null) {  // no key specified
  val id = sendPartitionPerTopicCache.get(topic)  // first check whether a partition id is already cached
  id match {
    case Some(partitionId) =>
      partitionId  // if so, simply use that partition id
    case None => // if not,
      val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)  // keep only partitions that currently have a leader broker
      if (availablePartitions.isEmpty)
        throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
      val index = Utils.abs(Random.nextInt) % availablePartitions.size  // pick one at random
      val partitionId = availablePartitions(index).partitionId
      sendPartitionPerTopicCache.put(topic, partitionId) // update the cache for next time
      partitionId
  }
}

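As the code shows, for a message without a key Kafka essentially picks a random available partition and caches that partition number for later messages. Kafka also clears this cache itself (by default every 10 minutes, or whenever topic metadata is requested).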
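If key-less messages from the plugin should behave the same way, the Scala logic above can be sketched in Erlang. The module sticky_partition and its pick/3 function are hypothetical. The cache is a plain map that the caller passes in and keeps (for example in some process state), the list of partitions that currently have a leader is assumed to come from the caller, and the periodic cache clearing is left out.

```erlang
%% Hypothetical module: mimics the old Scala producer's handling of
%% key-less messages (random available partition, cached per topic).
-module(sticky_partition).
-export([pick/3]).

%% Topic:     topic name (any term, e.g. a binary)
%% Available: partition ids that currently have a leader
%% Cache:     map of Topic => PartitionId
%% Returns {ok, Partition, NewCache} or {error, leader_not_available}.
pick(Topic, Available, Cache) ->
    case maps:find(Topic, Cache) of
        {ok, Partition} ->
            %% cached: reuse it, like sendPartitionPerTopicCache
            {ok, Partition, Cache};
        error when Available =:= [] ->
            {error, leader_not_available};
        error ->
            %% rand:uniform/1 returns 1..N, matching lists:nth/2 indexing
            Index = rand:uniform(length(Available)),
            Partition = lists:nth(Index, Available),
            {ok, Partition, maps:put(Topic, Partition, Cache)}
    end.
```

The returned partition would then be passed as the Partition argument of brod:produce/5.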



Full source code download: https://download.csdn.net/download/libaineu2004/10284403