Global ordering?

Business (produced order): 1 2 3 5
Consumed order: 1 5 2 3

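1. One topic with a single partition (3 replicas). This guarantees global ordering, but throughput drops in production (many companies still run it this way, or do not care much about the cost).
2. Ordering holds only within a single partition, so we find a way to write all records that share the same feature to the same partition.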

p0 p1 p2
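
The reordering shown at the top (produced 1 2 3 5, consumed 1 5 2 3) can be reproduced with a small simulation. This is only a sketch: it assumes records without a key are spread round-robin over p0, p1, p2, and that the consumer happens to drain one partition after another. A real consumer may interleave partitions in other ways. The class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class MultiPartitionOrderDemo {
    /** Spread records round-robin over numPartitions in-memory "partitions". */
    static List<List<Integer>> distribute(List<Integer> records, int numPartitions) {
        List<List<Integer>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        for (int i = 0; i < records.size(); i++) {
            partitions.get(i % numPartitions).add(records.get(i));
        }
        return partitions;
    }

    /** Read each partition to its end before moving on: one possible interleaving. */
    static List<Integer> consumePartitionByPartition(List<List<Integer>> partitions) {
        List<Integer> consumed = new ArrayList<>();
        for (List<Integer> partition : partitions) {
            consumed.addAll(partition);
        }
        return consumed;
    }

    public static void main(String[] args) {
        List<List<Integer>> partitions = distribute(Arrays.asList(1, 2, 3, 5), 3);
        System.out.println("partitions: " + partitions); // [[1, 5], [2], [3]]
        System.out.println("consumed:   " + consumePartitionByPartition(partitions)); // [1, 5, 2, 3]
    }
}
```

Order is preserved inside each partition (1 before 5 in p0), but not across partitions.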

Table t columns: id, money
Business system:
insert into t values(1,1)
update t set money = 200 where id = 1
update t set money = 400 where id = 1
update t set money = 1000000 where id = 1
delete from t where id = 1
The final result is 0 rows.
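
To see why the order of these five statements matters, here is a minimal sketch that replays them against an in-memory map standing in for table t. The `Change` type and the `replay` helper are hypothetical names used only for this illustration. Replayed in the original order the table ends up empty; if the delete is applied first, a stale row survives.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ChangeLogReplayDemo {
    enum Op { INSERT, UPDATE, DELETE }

    /** One change to table t(id, money); money is ignored for DELETE. */
    static final class Change {
        final Op op;
        final int id;
        final int money;

        Change(Op op, int id, int money) {
            this.op = op;
            this.id = id;
            this.money = money;
        }
    }

    /** The five statements from the example, in the order the business system issued them. */
    static List<Change> sampleChanges() {
        return Arrays.asList(
            new Change(Op.INSERT, 1, 1),
            new Change(Op.UPDATE, 1, 200),
            new Change(Op.UPDATE, 1, 400),
            new Change(Op.UPDATE, 1, 1000000),
            new Change(Op.DELETE, 1, 0));
    }

    /** Apply the changes to an empty in-memory table (id -> money) in the given order. */
    static Map<Integer, Integer> replay(List<Change> changes) {
        Map<Integer, Integer> table = new HashMap<>();
        for (Change c : changes) {
            switch (c.op) {
                case INSERT:
                    table.put(c.id, c.money);
                    break;
                case UPDATE:
                    // Like a SQL UPDATE: a missing row is simply not touched.
                    table.computeIfPresent(c.id, (id, old) -> c.money);
                    break;
                case DELETE:
                    table.remove(c.id);
                    break;
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Change> inOrder = sampleChanges();
        System.out.println(replay(inOrder)); // {} : 0 rows

        List<Change> deleteFirst = new ArrayList<>(inOrder);
        Collections.rotate(deleteFirst, 1); // the DELETE now arrives before the INSERT
        System.out.println(replay(deleteFirst)); // {1=1000000} : a stale row survives
    }
}
```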
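How do we write records that share a feature into the same partition?
The producer send API takes (key, value):
key: erp.t.1 (database.table.primary key), or null
value: the SQL statement
Business system: feature key
insert into t values(1,1)                   erp.t.1   e.g. hash = 5, 5 % 3 = 2 (quotient 1, remainder 2) --> p2
update t set money = 200 where id = 1       erp.t.1
update t set money = 400 where id = 1       erp.t.1
update t set money = 1000000 where id = 1   erp.t.1
delete from t where id = 1                  erp.t.1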
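
The mapping from feature key to partition can be sketched as follows. This is an illustration under assumptions, not Kafka's actual code: `featureKey` and `partitionFor` are hypothetical helpers, and `String.hashCode()` stands in for the hash Kafka applies to the serialized key, so the partition number printed here will generally differ from the one a real cluster picks. What carries over is the property that identical keys always land in the same partition.

```java
final class FeatureKeyDemo {
    /** Build the feature key as database.table.primaryKey, e.g. erp.t.1. */
    static String featureKey(String database, String table, long primaryKey) {
        return database + "." + table + "." + primaryKey;
    }

    /** Map a key to a partition; masking the sign bit keeps the result non-negative. */
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        String[] statements = {
            "insert into t values(1,1)",
            "update t set money = 200 where id = 1",
            "update t set money = 400 where id = 1",
            "update t set money = 1000000 where id = 1",
            "delete from t where id = 1"
        };
        String key = featureKey("erp", "t", 1);
        for (String sql : statements) {
            // Every statement for id = 1 carries the same key, hence the same partition.
            System.out.println(key + " -> p" + partitionFor(key, 3) + " : " + sql);
        }
    }
}
```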
Source code reference:
https://github.com/apache/kafka/blob/3cdc78e6bb1f83973a14ce1550fe3874f7348b05/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java

All 5 records are sent, in order, to the same partition, so they are not consumed out of order either.
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.clients.producer.internals;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

/**
 * The default partitioning strategy:
 * <ul>
 * <li>If a partition is specified in the record, use it
 * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
 * <li>If no partition or key is present choose a partition in a round-robin fashion
 */
public class DefaultPartitioner implements Partitioner {

    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    public void configure(Map<String, ?> configs) {}

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }
        return counter.getAndIncrement();
    }

    public void close() {}

}
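
The two branches of `partition` in the source above can be tried out without a Kafka dependency using the simplified sketch below. It is not the Kafka implementation: the class name is made up, `Arrays.hashCode` stands in for `Utils.murmur2`, the counter starts at 0 instead of a random value, there is a single counter rather than one per topic, and the available-partition check is left out.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

final class PartitionChoiceSketch {
    // Simplification: one counter starting at 0 (the quoted source keeps one per topic, randomly seeded).
    private final AtomicInteger counter = new AtomicInteger(0);
    private final int numPartitions;

    PartitionChoiceSketch(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    /** Choose a partition from the serialized key, or round-robin when there is no key. */
    int partition(byte[] keyBytes) {
        if (keyBytes == null) {
            // No key: consecutive records rotate over the partitions.
            return toPositive(counter.getAndIncrement()) % numPartitions;
        }
        // Key present: the same bytes always give the same partition.
        // Assumption: Arrays.hashCode is a stand-in for the murmur2 hash used in the quoted source.
        return toPositive(Arrays.hashCode(keyBytes)) % numPartitions;
    }

    /** Clear the sign bit so the modulo result is never negative. */
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    public static void main(String[] args) {
        PartitionChoiceSketch sketch = new PartitionChoiceSketch(3);
        byte[] key = "erp.t.1".getBytes(StandardCharsets.UTF_8);
        System.out.println("keyed:   p" + sketch.partition(key) + ", p" + sketch.partition(key));
        System.out.println("no key:  p" + sketch.partition(null) + ", p" + sketch.partition(null)
            + ", p" + sketch.partition(null));
    }
}
```

With a null key the records rotate over p0, p1, p2, which is exactly the situation where ordering across records is lost; with a key, every record for that key stays in one partition.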
Key point:

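It is enough to send all records for the same primary key of the same table to the same partition (with multiple databases, the database name must be part of the key as well).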

The partition function is defined as follows:
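// Requires: import java.util.concurrent.ThreadLocalRandom;
// numPartitions is a field of the enclosing class.
private int partitionDefine(String keyToPartition) {
    if (keyToPartition == null) {
        // No key: pick a random partition (no ordering guarantee).
        return ThreadLocalRandom.current().nextInt(numPartitions);
    } else {
        // Mask the sign bit instead of Math.abs, which stays negative for Integer.MIN_VALUE.
        return (keyToPartition.hashCode() & 0x7fffffff) % numPartitions;
    }
}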
The argument passed in is tableName + primary key.
This way, the data is consumed in order. Adapt the approach to each scenario as needed.
