最近很多人问我,sparkstreaming怎么消费多个topic的数据,自己维护offest,其实这个跟消费一个topic是一样的,但还是有很多问我,今天就简单的写一个demo,供大家参考,直接上代码吧,已经测试过了.我把offest存到redis里了,当然也可以保存在zk,kafka,mysql,hbase中都可以,看自己的选择.(
用了3个topic,每个topic5个partition.)
package spark import java.io.File import kafka.{PropertiesScalaUtils,
RedisKeysListUtils} import kafka.streamingRedisHive.{dbIndex} import
org.apache.kafka.clients.consumer.ConsumerRecord import
org.apache.kafka.common.serialization.StringDeserializer import
org.apache.log4j.{Level, Logger} import org.apache.spark.TaskContext import
org.apache.spark.sql.SparkSession import
org.apache.spark.streaming.dstream.InputDStream import
org.apache.spark.streaming.{Seconds, StreamingContext} import
org.apache.spark.streaming.kafka010._ import redis.RedisPool object moreTopic {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.INFO)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.INFO)
Logger.getLogger("org.apache.kafka.clients.consumer").setLevel(Level.INFO) val
warehouseLocation = new File("hdfs://cluster/hive/warehouse").getAbsolutePath
val spark = SparkSession.builder().appName("Spark
Jason").config("spark.sql.warehouse.dir",
warehouseLocation).enableHiveSupport().getOrCreate()
spark.conf.set("spark.streaming.concurrentJobs", 10)
spark.conf.set("spark.streaming.kafka.maxRetries", 50)
spark.conf.set("spark.streaming.stopGracefullyOnShutdown",true)
spark.conf.set("spark.streaming.backpressure.enabled",true)
spark.conf.set("spark.streaming.backpressure.initialRate",5000)
spark.conf.set("spark.streaming.kafka.maxRatePerPartition", 3000) @transient
val sc = spark.sparkContext val scc = new StreamingContext(sc, Seconds(2)) val
kafkaParams = Map[String, Object]( "auto.offset.reset" -> "latest",
"value.deserializer" -> classOf[StringDeserializer] , "key.deserializer" ->
classOf[StringDeserializer] , "bootstrap.servers" ->
PropertiesScalaUtils.loadProperties("broker") , "group.id" ->
PropertiesScalaUtils.loadProperties("groupId") , "enable.auto.commit" ->
(false: java.lang.Boolean) ) var stream: InputDStream[ConsumerRecord[String,
String]] = null val topics = Array("jason_20180519", "jason_0606","jason_test")
val maxTotal = 200 val maxIdle = 100 val minIdle = 10 val testOnBorrow = false
val testOnReturn = false val maxWaitMillis = 5000
RedisPool.makePool(PropertiesScalaUtils.loadProperties("redisHost"),
PropertiesScalaUtils.loadProperties("redisPort").toInt, maxTotal, maxIdle,
minIdle, testOnBorrow, testOnReturn, maxWaitMillis) val jedis =
RedisPool.getPool.getResource jedis.select(dbIndex) val keys =
jedis.keys(topics(0) + "*") val keys_2 = jedis.keys(topics(1) +"*") val keys_3
= jedis.keys(topics(2) +"*") if(keys.size() == 0 && keys_2.size() == 0 &&
keys_3.size() == 0){
println("第一次启动,从头开始消费数据-----------------------------------------------------------")
stream = KafkaUtils.createDirectStream[String, String]( scc,
LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String,
String](topics, kafkaParams) ) }else{
println("不是第一次启动,从上次的offest开始消费数据-----------------------------------------------")
stream = KafkaUtils.createDirectStream[String, String]( scc,
LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String,
String](topics, kafkaParams, RedisKeysListUtils.getRedisOffest(topics,jedis)))
} jedis.close() stream.foreachRDD(rdd=>{ if (!rdd.isEmpty()) { val offsetRanges
= rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.foreachPartition(partition=>{ val o =
offsetRanges(TaskContext.get.partitionId) println(s"${o.topic} ${o.partition}
${o.fromOffset} ${o.untilOffset}") val jedis_jason =
RedisPool.getPool.getResource jedis_jason.select(dbIndex)
partition.foreach(pair=>{ //自己的计算逻辑; }) offsetRanges.foreach { offsetRange =>
println("partition : " + offsetRange.partition + " fromOffset: " +
offsetRange.fromOffset + " untilOffset: " + offsetRange.untilOffset) val
topic_partition_key_new = offsetRange.topic + "_" + offsetRange.partition
jedis_jason.set(topic_partition_key_new, offsetRange.untilOffset + "") }
jedis_jason.close() }) } }) scc.start() scc.awaitTermination() } }
如果有写的不对的地方,欢迎大家指正,如果有什么疑问,可以加QQ群:340297350,谢谢


友情链接
KaDraw流程图
API参考文档
OK工具箱
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:ixiaoyang8@qq.com
QQ群:637538335
关注微信