I have recently been learning about Spark Streaming; here is a summary of what I have picked up so far.

The main code is as follows:
import kafka.serializer.StringDecoder
import org.apache.hadoop.io.Text
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

// Build a fresh StreamingContext; getOrCreate calls this only when no checkpoint exists.
def createStreamingContext(): StreamingContext = {
  val sparkConf = new SparkConf().setAppName("myStreamingText").setMaster(ConfigInfo.MasterConfig)
  sparkConf.set("spark.streaming.kafka.maxRetries", "100")
  sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "1000") // original misspelled this as "maxRatePerParititon"
  val ssc = new StreamingContext(sparkConf, Seconds(ConfigInfo.durationConfig))
  ssc.checkpoint(ConfigInfo.checkpointConfig)
  ssc
}

// Create a direct (receiver-less) Kafka DStream of (key, value) message pairs.
def createKafkaDstream(ssc: StreamingContext, kafkaParams: Map[String, String],
    topics: Set[String]): InputDStream[(String, String)] = {
  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
}

// Recover the context from the checkpoint if one exists, otherwise build a new one.
val ssc = StreamingContext.getOrCreate(ConfigInfo.checkpointConfig, createStreamingContext)
val kafkaParams = Map[String, String](
  "metadata.broker.list" -> ConfigInfo.brokerListConfig, // original had "metadata.brokers.list"
  "group.id" -> ConfigInfo.groupIdConfig)
// The original passed ConfigInfo.groupIdConfig as the topic set, which looks like a
// copy-paste slip; topicsConfig is an assumed field name for the topic list.
val topics = Set[String](ConfigInfo.topicsConfig)
val dStream = createKafkaDstream(ssc, kafkaParams, topics)

// Write one batch's RDD to HDFS as Snappy block-compressed (Text, Text) records.
def saveToHDFS(rdd: RDD[String]): Unit = {
  // Turn each line into a (key, value) pair keyed by SparkUtil.fetchKey.
  def convertData(line: String) = {
    val key = SparkUtil.fetchKey(line)
    (new Text(key), new Text(line))
  }
  val dataToSaveHDFS: RDD[(Text, Text)] = rdd.map(convertData)
  val hadoopConf = rdd.context.hadoopConfiguration
  hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true")
  hadoopConf.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec")
  hadoopConf.set("mapreduce.output.fileoutputformat.compress.type", "BLOCK")
  dataToSaveHDFS.saveAsNewAPIHadoopFile(ConfigInfo.saveHdfsPathConfig,
    classOf[Text], classOf[Text], classOf[StreamingDataOutputFormat[Text, Text]])
}
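
The code above defines saveToHDFS but never attaches it to the stream. Below is a minimal driver sketch that is not part of the original post: it assumes the Kafka message value is the line we want to persist, strips the key with a map, hands each batch to saveToHDFS via foreachRDD, and then starts the context.

// Hypothetical wiring (not in the original): persist each batch's message values.
dStream
  .map { case (_, value) => value } // keep only the Kafka message payload
  .foreachRDD(rdd => saveToHDFS(rdd))

ssc.start()            // begin consuming from Kafka
ssc.awaitTermination() // block until the job stops or fails

One caveat: for checkpoint recovery through StreamingContext.getOrCreate to work, all DStream creation and transformations (including the foreachRDD above) must happen inside the creating function, createStreamingContext here. When the context is restored from a checkpoint, the recovered job graph is used and setup code outside the factory is not re-applied.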
