Apache Flink 流处理实例

发布时间:2018-05-09 02:07  浏览次数:55



维基百科在 IRC 频道上记录 Wiki 被修改的日志,我们可以通过监听这个 IRC 频道,来实时监控给定时间窗口内的修改事件。Apache Flink
作为流计算引擎,非常适合处理流数据,并且,类似于 Hadoop MapReduce 等框架,Flink
提供了非常良好的抽象,使得业务逻辑代码编写非常简单。我们通过这个简单的例子来感受一下 Flink 的程序的编写。

通过 Flink Quickstart 构建 Maven 工程

Flink 提供了 flink-quickstart-java 和 flink-quickstart-scala 插件,允许使用 Maven
的开发者创建统一的项目模版,应用项目模板可以规避掉很多部署上的坑。

构建这次工程的命令如下
$ mvn archetype:generate \ -DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \ -DarchetypeCatalog=https:
//repository.apache.org/content/repositories/snapshots/ \ -DarchetypeVersion=1.6
-SNAPSHOT \ -DgroupId=wiki-edits \ -DartifactId=wiki-edits \ -Dversion=0.1 \
-Dpackage=wikiedits \ -DinteractiveMode=false
注意高版本的 Maven 不支持 -DarchetypeCatalog 参数,可以将第一行改为  mvn
org.apache.maven.plugins:maven-archetype-plugin:2.4::generate \ 或者去掉 
-DarchetypeCatalog 行,并将 .m2/settings.xml 修改如下,其中主要是在 
//profiles/profile/repositories 下设置好搜索 archetype 的仓库地址
<settings xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=
"http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/settings-1.0.0.xsd"> <profiles> <profile> <id>acme</
id> <repositories> <repository> <id>archetype</id> <name>Apache Development
Snapshot Repository</name> <url>
https://repository.apache.org/content/repositories/snapshots/</url> <releases> <
enabled>false</enabled> </releases> <snapshots> <enabled>true</enabled> </
snapshots> </repository> </repositories> </profile> </profiles> <activeProfiles>
<activeProfile>acme</activeProfile> </activeProfiles> </settings>
成功下载项目模板后,在当前目录下应当能看到 wiki-edit 目录。执行命令 rm
wiki-edits/src/main/java/wikiedits/*.java 清除模板自带的 Java 文件。

为了监听维基百科的 IRC 频道,在 pom.xml 文件下添加如下依赖,分别是 Flink 的客户端和 WikiEdit 的连接器
<dependency> <groupId>org.apache.flink</groupId> <artifactId>
flink-clients_${scala.binary.version}</artifactId> <version>${flink.version}</
version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <
artifactId>flink-connector-wikiedits_${scala.binary.version}</artifactId> <
version>${flink.version}</version> </dependency>
编写 Flink 程序

接下来的代码编写工作假定你是在 IDE 下编写的,主要是为了避免啰嗦的 import 语句。包含 import 等模板代码的全部代码在末尾给出。

首先我们创建用于运行的主程序代码 src/main/java/wikiedits/WikipediaAnalysis.java
package wikiedits; public class WikipediaAnalysis { public static void main
(String[] args) throws Exception { } }
流处理的 Flink 程序的第一步是创建流处理执行上下文 StreamExecutionEnvironment,它类似于其他框架内的
Configuration 类,用于配制 Flink 程序和运行时的各个参数,对应的语句如下
StreamExecutionEnvironment see = StreamExecutionEnvironment.
getExecutionEnvironment();
下一步我们以维基百科 IRC 频道的日志作为数据源创建连接
DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource
());
这个语句创建了填充 WikipediaEditEvent 的 DataStream,拿到数据流之后我们就可以对它做进一步的操作了。

我们的目标是统计给定时间窗口内,比如说五秒内,用户对维基百科的修改字节数。因此我们对每个 WikipediaEditEvent
 以用户名作为键来标记(keyed)。Flink 兼容 Java 1.6 版本,因此古老的版本中 Flink 提供 KeySelector 函数式接口来标记
KeyedStream<WikipediaEditEvent, String> keyedEdits = edits .keyBy(new
KeySelector<WikipediaEditEvent, String>() {@Override public String getKey
(WikipediaEditEvent event) { return event.getUser(); } });
当前版本的 Flink 主要支持的是 Java 8 版本,因此我们也可以用 Lambda 表达式来改写这段较为繁琐的代码
KeyedStream<WikipediaEditEvent, String> keyedEdits = edits .keyBy
(WikipediaEditEvent::getUser);
这个语句定义了 keyedEdits 变量,它是一个概念上形如(String, WikipediaEditEvent) 的数据流,即以字符串(用户名)为键,
WikipediaEditEvent 为值的数据的流。这一步骤类似于 MapReduce 的 Shuffle 过程,针对 keyedEdits
 的处理将自动按照键分组,因此我们可以直接对数据进行 fold 操作以折叠聚合同一用户名的修改字节数
DataStream<Tuple2<String, Long>> result = keyedEdits .timeWindow(Time.seconds(5
)) .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent,
Tuple2<String, Long>>() {@Override public Tuple2<String, Long> fold
(Tuple2<String, Long> acc, WikipediaEditEvent event) { acc.f0 = event.getUser
(); acc.f1 += event.getByteDiff(); return acc; } });
在新版的 Flink 中,FoldFunction 因为无法支持部分聚合被废弃了,如果对程序有强迫症,我们可以采用类似于 MapReduce
的办法来改写上边的代码,各个方法调用的作用与它们的名字一致,其中,为了绕过类型擦除导致的问题使用了 returns 函数
DataStream<Tuple2<String, Long>> result = keyedEdits .map((event) -> new
Tuple2<>(event.getUser(), Long.valueOf(event.getByteDiff()))) .returns(new
TypeHint<Tuple2<String, Long>>(){}) .timeWindowAll(Time.seconds(5)) .reduce
((acc, a) ->new Tuple2<>(a.f0, acc.f1+a.f1));
经过处理后的数据流 result 中就包含了我们所需要的信息,具体地说是填充了 Tuple2<String, Long>
,即(用户名,修改字节数)元组的流,我们可以使用 result.print() 来打印它。

程序至此主要处理逻辑就写完了,但是 Flink 还需要在 StreamExecutionEnvironment 类型的变量上调用 execute
 方法以实际执行整个 Flink 程序,该方法执行时将整个 Flink 程序转化为任务图并提交到 Flink 集群中。

整个程序的代码,包括模板代码,如下所示
package wikiedits; import org.apache.flink.api.common.typeinfo.TypeHint; import
org.apache.flink.streaming.api.datastream.DataStream; import
org.apache.flink.streaming.api.datastream.KeyedStream; import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import
org.apache.flink.streaming.api.windowing.time.Time; import
org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent; import
org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource; import
org.apache.flink.api.java.tuple.Tuple2; public class WikipediaAnalysis { public
static void main(String[] args) throws Exception { StreamExecutionEnvironment
see = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource
()); KeyedStream<WikipediaEditEvent, String> keyedEdits = edits .keyBy
(WikipediaEditEvent::getUser); DataStream<Tuple2<String, Long>> result =
keyedEdits .map((event) -> new Tuple2<>(event.getUser(), Long.valueOf(event.
getByteDiff()))) .returns(new TypeHint<Tuple2<String, Long>>(){}) .timeWindowAll
(Time.seconds(5)) .reduce((acc, a) -> new Tuple2<>(a.f0, acc.f1+a.f1)); result.
print(); see.execute(); } }
可以通过 IDE 运行程序,在控制台看到类似下面格式的输出,每一行前面的数字代表了这是由 print 的并行实例中的编号为几的实例运行的结果
1> (LilHelpa,1966) 2> (1.70.80.5,2066) 3> (Beyond My Ken,-6550) 4> (Aleksandr
Grigoryev,725) 1> (6.77.155.31,1943) 2> (Serols,1639) 3> (ClueBot NG,1907) 4> (
GSS,3155)http://www.bfc2814.cn/
http://news.vja2045.cn/
http://news.rlp0976.cn/
http://www.cce2139.cn/
http://news.xtl7183.cn/
http://www.kog1435.cn/
http://www.rik3314.cn/
http://news.zmj4226.cn/
http://news.igf1400.cn/
http://www.bcu6005.cn/
http://news.esz4596.cn/
http://news.dli5822.cn/
http://www.vsb9575.cn/
http://news.dto7731.cn/
http://news.eqn5017.cn/
http://news.lkg4662.cn/
http://www.xrr9518.cn/
http://www.qfk7654.cn/
http://www.txr3194.cn/
http://news.nkc4539.cn/
http://www.xjy3902.cn/
http://www.ciw9360.cn/
http://news.pzx0011.cn/
http://www.eqv5313.cn/
http://www.qdn5355.cn/
http://www.qri2046.cn/
http://www.wxm6819.cn/
http://news.bpj4889.cn/
http://www.blm3653.cn/
http://news.ixa0880.cn/
http://news.tvz4241.cn/
http://www.nkc4539.cn/
http://www.ehe5445.top/
http://www.axs9870.cn/
http://news.hiv8337.cn/
http://news.wvh4263.cn/
http://news.myy9223.cn/
http://www.hud3144.cn/
http://www.kdz0246.cn/
http://news.pck8038.cn/
http://news.mtr5072.cn/
http://news.mzj8672.cn/
http://www.ixa0880.cn/
http://www.gdk7028.cn/
http://news.blm3653.cn/
http://www.yif9712.cn/
http://news.sfx6922.cn/
http://news.ric5056.cn/
http://www.vii0197.cn/
http://www.ybc8953.cn/
http://news.nwf3326.cn/
http://news.nlc4773.cn/
http://news.uqt9445.cn/
http://news.zbb7727.cn/
http://news.ajj5951.cn/
http://news.arf0717.cn/
http://news.cqu4082.cn/
http://news.dsx1888.cn/
<http://www.cnblogs.com/wander4096/category/1196118.html>

标签

归档

阅读排行

支付宝搜索“559315787”,天天领红包