Kafka is a distributed streaming platform built on the publish/subscribe messaging model. It supports multiple producers and multiple consumers, and as message middleware it decouples producers from consumers: the two sides are transparent to each other and need no direct connection, with data flowing from producers into Kafka and from Kafka on to consumers. Spark Streaming is an extension of the core Spark API that provides high-throughput, fault-tolerant processing of streaming data.





Supported data sources include Kafka, Flume, HDFS/S3, Kinesis, Twitter, and TCP sockets. After processing, data can be written to HDFS or a database, or displayed on a dashboard. Spark Streaming also combines seamlessly with other Spark components such as Spark MLlib and Spark GraphX, because all Spark components are built on the same Spark Core.


Below is a simple example of using Spark Streaming to process data in Kafka. The example runs against Kafka 0.8.x and integrates Spark Streaming with Kafka via the Direct Approach (no receivers) rather than the receiver-based approach; the differences and trade-offs between the two approaches are left for a later write-up. Spark's integration with Kafka 0.10 is similar to the 0.8 Direct Approach. This post mainly records the example itself; the details and theory will be organized later.


1. Environment


- JDK 1.8
- kafka_2.11-0.8.2.0
- spark-2.3.0-bin-hadoop2.7
2. Example code
import kafka.serializer.StringDecoder
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Example of Spark Streaming consuming from Kafka.
 * 2018/5/13
 */
object SparkKafkaTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("spark_kafka")
      .master("local[*]")
      .getOrCreate()
    val batchDuration = Seconds(5) // batch interval in seconds
    val streamContext = new StreamingContext(spark.sparkContext, batchDuration)
    streamContext.checkpoint("/Users/eric/SparkWorkspace/checkpoint")
    val topics = Set("behaviors")
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      streamContext, kafkaParams, topics)
    stream.foreachRDD(rdd => {
      rdd.foreach(line => {
        println("key=" + line._1 + " value=" + line._2)
      })
    })
    streamContext.start()            // start the streaming computation
    streamContext.awaitTermination() // wait for the computation to terminate
  }
}
3. pom file
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.eric.spark</groupId>
  <artifactId>spark-learning</artifactId>
  <version>1.0-SNAPSHOT</version>
  <inceptionYear>2008</inceptionYear>
  <properties>
    <scala.version>2.11.8</scala.version>
    <spark.version>2.3</spark.version>
    <spark.artifact>2.11</spark.artifact>
    <dependency.scope>compile</dependency.scope>
  </properties>
  <repositories>
    <repository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </repository>
  </repositories>
  <pluginRepositories>
    <pluginRepository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
  </pluginRepositories>
  <dependencies>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.3.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.3.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.11</artifactId>
      <version>2.3.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-mllib -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_2.11</artifactId>
      <version>2.3.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8 -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
      <version>2.3.0</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.4</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.specs</groupId>
      <artifactId>specs</artifactId>
      <version>1.2.5</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
          <args>
            <arg>-target:jvm-1.5</arg>
          </args>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <configuration>
          <downloadSources>true</downloadSources>
          <buildcommands>
            <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
          <additionalProjectnatures>
            <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
          </additionalProjectnatures>
          <classpathContainers>
            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
            <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
          </classpathContainers>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <reporting>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
        </configuration>
      </plugin>
    </plugins>
  </reporting>
</project>
4. Create a topic
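The original post showed this step as a screenshot. With Kafka 0.8.x, the topic can be created using the CLI script bundled in the Kafka distribution; the ZooKeeper address (localhost:2181) and the working directory (the Kafka install root) are assumptions to adjust for your setup:

```shell
# Create the "behaviors" topic used by the example
# (run from the root of the Kafka 0.8.x distribution).
bin/kafka-topics.sh --create \
  --zookeeper localhost:2181 \
  --replication-factor 1 \
  --partitions 1 \
  --topic behaviors

# Verify that the topic exists.
bin/kafka-topics.sh --list --zookeeper localhost:2181
```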




5. Run the test code, send messages to the topic, and check the printed output







The messages written to the behaviors topic via kafka-console-producer are simply printed by the example.
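The run screenshots are not preserved. To feed the stream, a console producer session might look like the following (assuming the broker on localhost:9092 from the example's kafkaParams); each line typed on stdin becomes one Kafka message, which the foreachRDD loop in the example then prints:

```shell
# Interactive producer for the "behaviors" topic
# (run from the root of the Kafka 0.8.x distribution).
bin/kafka-console-producer.sh \
  --broker-list localhost:9092 \
  --topic behaviors
```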
