目录

基本API概念
<https://blog.csdn.net/Mathieu66/article/details/82984770#%E5%9F%BA%E6%9C%ACAPI%E6%A6%82%E5%BF%B5>

1. Dataset和DataStream
<https://blog.csdn.net/Mathieu66/article/details/82984770#1.%20Dataset%E5%92%8CDataStream>

2. Flink程序构成(Anatomy of a Flink Program)
<https://blog.csdn.net/Mathieu66/article/details/82984770#2.%20Flink%E7%A8%8B%E5%BA%8F%E6%9E%84%E6%88%90%EF%BC%88Anatomy%20of%20a%20Flink%20Program%EF%BC%89>

3. 延迟计算(Lazy Evaluation)
<https://blog.csdn.net/Mathieu66/article/details/82984770#3.%20%E5%BB%B6%E8%BF%9F%E8%AE%A1%E7%AE%97%EF%BC%88Lazy%20Evaluation%EF%BC%89>

4. 指定键(Specifying Keys)
<https://blog.csdn.net/Mathieu66/article/details/82984770#4.%20%E6%8C%87%E5%AE%9A%E9%94%AE%EF%BC%88Specifying%20Keys%EF%BC%89>

4.1 为Tuple定义键(Define keys for Tuples)
<https://blog.csdn.net/Mathieu66/article/details/82984770#4.1%20%E4%B8%BATuple%E5%AE%9A%E4%B9%89%E9%94%AE(Define%20keys%20for%20Tuples)>

4.2 使用字段表达式定义键(Define keys using Field Expressions)
<https://blog.csdn.net/Mathieu66/article/details/82984770#4.2%20%E4%BD%BF%E7%94%A8%E5%AD%97%E6%AE%B5%E8%A1%A8%E8%BE%BE%E5%BC%8F%E5%AE%9A%E4%B9%89%E9%94%AE%EF%BC%88Define%20keys%20using%20Field%20Expressions%EF%BC%89>

4.3 使用键选择器函数定义键(Define keys using Key Selector Functions)
<https://blog.csdn.net/Mathieu66/article/details/82984770#4.3%20%E4%BD%BF%E7%94%A8%E9%94%AE%E9%80%89%E6%8B%A9%E5%99%A8%E5%87%BD%E6%95%B0%E5%AE%9A%E4%B9%89%E9%94%AE%EF%BC%88Define%20keys%20using%20Key%20Selector%20Functions%EF%BC%89>

5. 指定转换函数(Specifying Transformation Functions)
<https://blog.csdn.net/Mathieu66/article/details/82984770#5.%20%E6%8C%87%E5%AE%9A%E8%BD%AC%E6%8D%A2%E5%87%BD%E6%95%B0%EF%BC%88Specifying%20Transformation%20Functions%EF%BC%89>

5.1 Lambda函数(Lambda Functions)
<https://blog.csdn.net/Mathieu66/article/details/82984770#5.1%20Lambda%E5%87%BD%E6%95%B0%EF%BC%88Lambda%20Functions%EF%BC%89>

5.2 Rich Functions
<https://blog.csdn.net/Mathieu66/article/details/82984770#5.2%20Rich%20Functions>

6. 支持的数据类型(Supported Data Types)
<https://blog.csdn.net/Mathieu66/article/details/82984770#6.%20%E6%94%AF%E6%8C%81%E7%9A%84%E6%95%B0%E6%8D%AE%E7%B1%BB%E5%9E%8B%EF%BC%88Supported%20Data%20Types%EF%BC%89>

6.1 Tuples 和 Case 类
<https://blog.csdn.net/Mathieu66/article/details/82984770#6.1%20Tuples%20%E5%92%8C%20Case%20%E7%B1%BB>

6.2 POJOs
<https://blog.csdn.net/Mathieu66/article/details/82984770#6.2%20POJOs>

6.3 基本类型
<https://blog.csdn.net/Mathieu66/article/details/82984770#6.3%20%E5%9F%BA%E6%9C%AC%E7%B1%BB%E5%9E%8B>

6.4 一般通用类
<https://blog.csdn.net/Mathieu66/article/details/82984770#6.4%20%E4%B8%80%E8%88%AC%E9%80%9A%E7%94%A8%E7%B1%BB>

6.5 值
<https://blog.csdn.net/Mathieu66/article/details/82984770#6.5%20%E5%80%BC>

6.6 Hadoop Writables
<https://blog.csdn.net/Mathieu66/article/details/82984770#6.6%20Hadoop%20Writables>

6.7 特殊类型
<https://blog.csdn.net/Mathieu66/article/details/82984770#6.7%20%E7%89%B9%E6%AE%8A%E7%B1%BB%E5%9E%8B>

7. 累加器和计数器(Accumulators & Counters)
<https://blog.csdn.net/Mathieu66/article/details/82984770#7.%20%E7%B4%AF%E5%8A%A0%E5%99%A8%E5%92%8C%E8%AE%A1%E6%95%B0%E5%99%A8%EF%BC%88Accumulators%20%26%20Counters%EF%BC%89>

7.1 怎样使用累加器
<https://blog.csdn.net/Mathieu66/article/details/82984770#7.1%20%E6%80%8E%E6%A0%B7%E4%BD%BF%E7%94%A8%E7%B4%AF%E5%8A%A0%E5%99%A8>

7.2 自定义累加器
<https://blog.csdn.net/Mathieu66/article/details/82984770#7.2%20%E8%87%AA%E5%AE%9A%E4%B9%89%E7%B4%AF%E5%8A%A0%E5%99%A8>

基本API概念

Flink 是实现了分布式集合转换操作(例如 filtering,mapping,updating
state,joining,grouping,defining windows,aggregating等)的一般程序。集合最开始从sources(例如,从
files,kafka
topics,或本地,内存中集合读取)创建。结果通过sinks返回,比如将数据写出到(分布式)文件,或标准输出(例如,终端的命令行)。Flink
程序可以在多种环境下运行:单节点,或嵌入到其他程序中。计算可以在本地JVM中进行,也可以在多台机器组成的集群上进行。

取决于数据源类型,例如有界或者无界数据源,您可以编写一个批处理或流处理程序。其中,批处理程序用DataSet API,流处理程序用DataStream
API。本篇指南将会介绍这两种API的通用概念,但是您可以查看Streaming Guide和Batch Guide来查看这两种API的详细信息。

注意:当展示API用法的实例时,我们将使用StreamingExecutionEnvironment和 DataStream
API。二者的概念是一样的,使用DataSet时只需要 替换 ExecutionEnvironment和 DataSet。

1. Dataset和DataStream


在程序中,Flink使用特殊的类DataSet和DataStream来表示数据。你可以把他们当做是可以包含副本的不可变数据集。DataSet数据是有限的,而DataStream中元素数量可以是无限个的。


在一些方面,这些数据集和常规的Java集合是不同的。首先,他们是不可变的,这意味着集合一旦创建,其中的元素就不可以在添加或者移除。甚至简单的检查里面的元素都不可以。

数据集最开始通过向Flink程序添加source来创建,而新的数据集可以通过转换(例如map,filter等等)这些数据集衍生而来。

2. Flink程序构成(Anatomy of a Flink Program)

Flink程序看起来和那些转换数据集的常规程序一样。每个程序都由下面几个相同的基本部分构成:

1)获取执行环境;

2)加载/创建初始数据;

3)指定转换操作;

4)指定计算结果输出方式;

5)开始执行。

现在我们大致介绍下这几个步骤。详细信息还请查看相应章节。请注意:Scala Data API的核心类在org.apache.flink.api.scala
<https://github.com/apache/flink/blob/master//flink-scala/src/main/scala/org/apache/flink/api/scala>
 包下,Scala DataStream API的核心类在org.apache.flink.streaming.api.scala
<https://github.com/apache/flink/blob/master//flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala>
包下。


StreamingExecutionEnvironment是所有Flink程序的基础,可以通过StreamingExecutionEnvironment的静态方法获得:
getExecutionEnvironment() createLocalEnvironment()
createRemoteEnvironment(host: String, port: Int, jarFiles: String*)

一般来说,你只需要使用getExecutionEnvironment()方法,因为这个方法将根据上下文返回正确的执行环境:比如说你使用IDE或者作为通用Java程序来运行你的程序,它将创建一个本地环境,在本地机器上执行你的程序。如果你把程序打成了jar包,并用过命令行启动,Flink集群管理器将会执行你的main方法,并且getExecutionEnvironment()方法会返回一个在集群上运行你的程序的执行环境。


至于指定数据源,执行环境有多种方法来读取文件:你可以一行行的读,读取CSV文件,或者使用完全自定义的输入数据格式。仅仅读取一个text文件为行队列,你可以使用下面的方法:
val env = StreamExecutionEnvironment.getExecutionEnvironment() val text:
DataStream[String] = env.readTextFile("file:///path/to/file")
这将返回一个DataStream,在这之上你可以应用转换操作来创建新的衍生的DataStreams。

你可以在Dataset是调用转换函数来转换DataSet(官网这里有问题)。比如map操作:
val input: DataSet[String] = ... val mapped = input.map { x => x.toInt }
通过将原始集合中的每一个字符串转换成整型,这将创建一个新的DataStream(很明显官网这里是有问题的,但是操作是一样的)。





得到包含最终结果的DataStream后,你可以创建一个sink来将结果写出到外部系统:
writeAsText(path: String) print()

完成整个程序的逻辑编码后,你需要调用StreamingExecutionEnvironment的execute()方法来触发程序执行。根据执行环境ExecutionEnvironment的不同,程序将在本地或者集群上运行。

execute()方法有一个返回值:JobExecutionResult,这个返回值包含了程序执行时间和累加器结果。


(JobExecutionResult继承自JobSubmissionResult,JobSubmissionResult有JobID属性,所以通过JobExecutionResult也可获得JobID属性。)



至于DataSet和DataStream的source和sink的详细信息,请参考相应的指导文档。

3. 延迟计算(Lazy Evaluation)


所有的Flink程序都是延迟执行的:当执行程序的主方法时,数据的加载和转化操作并没有直接发生,而是创建这些操作,并添加到程序计划。这些操作实际是在ExecutionEnvironment的execute()方法触发后执行的。程序是在本地执行还是在集群上运行,取决于ExecutionEnvironment的类型。

延迟计算使得你可以构建复杂的程序,而Flink只需把它当做一个整体的计划单元运行。

4. 指定键(Specifying Keys)


一些转换操作(join,coGroup,keyBy,groupBy)要求集合内的元素需要定义有键。另一些操作(Reduce,GroupReduce,Aggregate,Windows)需要在使用这些操作前将数据按key分组。

DataSet这样分组(官网不给力,没给出Scala版本):
val input: DataSet[String] = env.readTextFile("src/main/resources/kv.txt") val
reduced = input .groupBy(/*define key here*/) .reduceGroup(/*do something*/);
DataStream这样指定key:
val input: DataStream[String] = env.readTextFile("src/main/resources/kv.txt")
val windowed = input .keyBy(/*define key here*/) .window(/*window
specification*/)
Flink的数据模型不是基于键值对的,因此,你不需要手动的把数据集打包成键值对。键是虚拟的:他们被定义为实际数据上的函数,来引导分组操作。

注意:下面的讨论中,我们将使用DataStream API和KeyBy展示。对于DataSet API,只需要用DataSet和groupBy替换即可。

4.1 为Tuple定义键(Define keys for Tuples)

最简单的用例是根据Tuple的一个或多个字段对Tuple分组:
val input: DataStream[(Int, String, Long)] = // [...] val keyed =
input.keyBy(0)
Tuple以它的第一个字段分组(也就是示例中Int类型的那个字段)
val input: DataSet[(Int, String, Long)] = // [...] val grouped =
input.groupBy(0,1)
这里,我们使用了一个组合键来对Tuple分组。这个组合件由第一和第二个字段组成。

嵌套Tuple需要注意的一个点:如果你的DataStream内有嵌套的tuple,比如:
DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;

使用KeyBy(0)指定键,系统将会使用整个Tuple2作为键(整型和浮点型的)。如果想使用Tuple2内部字段作为键,你可以使用字段来表示键,这种方法会在后面阐述。

4.2 使用字段表达式定义键(Define keys using Field Expressions)

你可以使用基于字符串的字段表达式来引用嵌套字段,用这些字段来为grouping,sorting,joining或者coGroupping定义键。

字段表达式使选择嵌套(组合)类型数据(例如Tuple,POJO)中的字段变得非常容易。

在下面的例子中,我们有一个包含两个字段:word和count的 wc POJO。为了根据word字段分组,我们只需把字段名传给KeyBy()函数:
// some ordinary POJO (Plain old Java Object) class WC(var word: String, var
count: Int) { def this() { this("", 0L) } } val words: DataStream[WC] = //
[...] val wordCounts = words.keyBy("word").window(/*window specification*/) //
or, as a case class, which is less typing case class WC(word: String, count:
Int) val words: DataStream[WC] = // [...] val wordCounts =
words.keyBy("word").window(/*window specification*/)
字段表达式语法:

*
通过字段名选择POJO的字段。例如 user 表示 一个POJO的user字段;

*
Tuple通过offset来选择,"_1"和"5"分别代表第一和第六个Scala Tuple字段

*
POJO和Tuple的嵌套字段也可以拿到。例如 "user.zip"可以表示POJO的user属性的zip属性。任意的嵌套和混合都是支持的,例如
"_2.user.zip"或"user._4.1.zip"

*
也可以选择全类型,使用通配符表达式"_"。这对不是POJO或者Tuple的类型也适用。

字段表达式示例:
class WC(var complex: ComplexNestedClass, var count: Int) { def this() {
this(null, 0) } } class ComplexNestedClass( var someNumber: Int, someFloat:
Float, word: (Long, Long, String), hadoopCitizen: IntWritable) { def this() {
this(0, 0, (0, 0, ""), new IntWritable(0)) } }
上述示例代码的有效字段表达式如下:

*
"count": wc 类的count字段;

*
"complex": 递归的选取ComplexNestedClass的所有字段;

*
"complex.word._3":ComplexNestedClass类中的tuple word的第三个字段;

*
"complex.hadoopCitizen":选择Hadoop IntWritable类型。

4.3 使用键选择器函数定义键(Define keys using Key Selector Functions)

还有一种定义键的方式叫做“键选择器”函数。键选择器函数需要一个元素作为入参,返回这个元素的键。这个键可以是任何类型的,也可从指定计算中生成。

下面的示例展示了一个键选择函数,这个函数仅仅返回了一个对象的字段。
// some ordinary case class case class WC(word: String, count: Int) val words:
DataStream[WC] = // [...] val keyed = words.keyBy( _.word )
5. 指定转换函数(Specifying Transformation Functions)

大多数的转换操作需要用户自己定义函数。这一章节列举了指定这些函数的几种不同方式。

5.1 Lambda函数(Lambda Functions)

之前见过的,所有的操作都能接受Lambda函数来描述操作:
val data: DataSet[String] = // [...] data.filter { _.startsWith("http://") }
val data: DataSet[Int] = // [...] data.reduce { (i1,i2) => i1 + i2 } // or
data.reduce { _ + _ }
5.2 Rich Functions

所有能把Lambda函数当做参数接收的转换操作都可以接收Rich函数来替换Lambda函数。例如:
data.map { x => x.toInt }
可以写成:
class MyMapFunction extends RichMapFunction[String, Int] { def map(in:
String):Int = { in.toInt } }; data.map(new MyMapFunction())
Rich函数也可以定义成匿名的:
data.map (new RichMapFunction[String, Int] { def map(in: String):Int = {
in.toInt } })

Rich函数除了提供用户自定义函数(map,reduce等),还提供了四种方法:open,close,getRuntimeContext和setRuntimecontext。这些功能在参数化函数、创建和确定本地状态、获取广播变量、获取运行时信息(例如累加器和计数器)和迭代信息时非常有帮助。

6. 支持的数据类型(Supported Data Types)

Flink对DataSet和DataStream中可使用的元素类型添加了一些约束。原因是系统可以通过分析这些类型来确定有效的执行策略。

有7中不同的数据类型:

*
Java Tuple 和 Scala Case类;

*
Java POJO;

*
基本类型;

*
通用类;

*
值;

*
Hadoop Writables;

*
特殊类型。

6.1 Tuples 和 Case 类

Scala的Case类(以及Scala的Tuple,实际是Case
class的特殊类型)是包含了一定数量多种类型字段的组合类型。Tuple字段通过他们的1-offset名称定位,例如 _1代表第一个字段。Case class
通过字段名称获得:
case class WordCount(word: String, count: Int) val input = env.fromElements(
WordCount("hello", 1), WordCount("world", 2)) // Case Class Data Set
input.keyBy("word")// key by field expression "word" val input2 =
env.fromElements(("hello", 1), ("world", 2)) // Tuple2 Data Set input2.keyBy(0,
1) // key by field positions 0 and 1
6.2 POJOs

Java和Scala的类在满足下列条件时将会被Flink视作特殊的POJO数据类型:

*
是公共类;

*
无参构造是公共的;

*
所有的参数都是可获得的(声明为公共的,或提供get,set方法);

*
字段的类型必须是Flink支持的。Flink会用Avro来序列化任意的对象(例如Date)

Flink会分析POJO类型的结构,它会获知POJO的字段。POJO类型要比一般类型好用。此外,Flink访问POJO要比一般类型更高效。
class WordWithCount(var word: String, var count: Int) { def this() {
this(null, -1) } } val input = env.fromElements( new WordWithCount("hello", 1),
new WordWithCount("world", 2)) // Case Class Data Set input.keyBy("word")// key
by field expression "word"
6.3 基本类型

Flink支持Java和Scala所有的基本数据类型,比如 Integer,String,和Double.

6.4 一般通用类

Flink支持大多数的Java,Scala类(API和自定义)。包含不能序列化字段的类在增加一些限制后也可支持。遵循Java Bean规范的类一般都可以使用。

所有不能视为POJO的类Flink都会当做一般类处理。这些数据类型被视作黑箱,其内容是不可见的。通用类使用Kryo进行序列/反序列化。

6.5 值


值类型需要自己描述他们的序列化/反序列化方式。他们通过实现org.apache.flinktypes.Value接口的read和write方法提供自定义代码来进行序列化/反序列化,而不是使用通用的序列化框架。当通用序列化非常低效的时候可以使用值类型。举个列子:将一个元素稀疏的向量表示为数组。知道了数组的元素几乎都是0,我们可以对非零元素进行特殊编码,而通用序列化器却会简单的写出所有元素。

以类似的方式,org.apache.flinktypes.CopyableValue接口支持人工内置克隆逻辑(然而并不知道什么意思)。


对应于基本数据类型,Flink提供了预定义数值类型。(ByteValue,ShortValue,IntValue,LongValue,FloatValue,DoubleValue,StringValue,CharValue,BooleanValue)。这些数值类型充当这些基本数据类型的可变变量:他们的值是可变的,允许程序员复用对象,以减小垃圾回收器的压力。

6.6 Hadoop Writables


可以使用实现了org.apache.hadoop.Writable接口的类型。定义在write()和ReadFields(0方法中的序列化逻辑,将被用来序列化。

6.7 特殊类型

也可以使用特殊类型,包括Scala的 Either,Option和Try。Java
ApI有它自己的Either实现。类似于Scala的Either,代表两个类型中的一个类型的值,左或者右类型。Either在处理异常或者需要输出两种不同类型记录的时候非常有用。

7. 累加器和计数器(Accumulators & Counters)

累加器构造很简单, 只需要一个add操作和最终累加结果,这个在程序结束之后才能获得。

最直接的累加器是个计数器:你可以使用Accumulator.add(v
value)方法使之增加。作业结束后,Flink会对所有的部分结果求和,并返回结果给客户端。

在Debugg或者想快速知道数据更多信息的时候比较有用。

Flink当前有下列这些内置的累加器。这些累加器都实现了Accumulator接口

*
IntCounter,LongCounter和DoubleCounter.示例在后面

*

Histogram:一个离散数量容器的柱状图实现。其内部,只是一个Integer到Ingteger的Map.你可以用它来计算值得分布。例如对于一个wordCount程序,可以计算每行的单词的分布。

7.1 怎样使用累加器

首先你需要在用户自定义的转换操作里创建一个累加器对象(这里使用Counter)(这里又没有Scala版本)。
val counter = new IntCounter()
然后你需要注册这个累加器。
getRuntimeContext.addAccumulator("intCounter",intCounter)
现在可以再任何操作函数中使用这个,包括在open()和close()方法里.
intCounter.add(1)
所有的结构都存储在JobExecutionResult对象中(由执行环境的execute()方法返回,并且只有job运行结束后可用)。
val counter = env.execute("AccumulatorDemo")
.getAccumulatorResult("intCounter")
每个job的所有累加器都共享一个命名空间,因此你在同一个job中的不同操作函数里可以使用同一个累加器。Flink会在内部聚合所有名称相同的累加器。


关于累加器和迭代主要注意的一点:当前累加器的结果只能在所有的job都结束后才可获取。我们计划让当前迭代的结果在下一个迭代中可用。你可以使用Aggregators来计算每个迭代的统计信息,并基于这些信息终止迭代。

7.2 自定义累加器

实现自己的累加器只需要实现Accumulator接口即可。你可以选择实现Accumulator或者SimpleAccumulator。


Accumulator<V,R>是最灵活的:它为add的值定义了一个v类型,结果值定义了R类型。对于柱状图,v是数值,R是柱子。SimpleAccumulator适用于两种类型相同的情况。