5 Introducing the ML Package
在前面,我们使用了Spark中严格基于RDD的MLlib包。 在这里,我们将基于DataFrame使用MLlib包。
另外,根据Spark文档,现在主要的Spark机器学习API是spark.ml包中基于DataFrame的一套模型。
5.1 ML包的介绍
从顶层上看,ML包主要包含三大抽象类:转换器、预测器和工作流。

5.1.1 转换器(Transformer)
从Transformer抽象类派生出来的每一个新的Transformer都需要实现一个.transform(…)
方法,该方法可以将一个DataFrame转换成另一个DataFrame。
在spark.ml.feature中有许多Transformer:

* Binarizer :给定一个阈值,该方法需要一个连续的变量将其转换为二进制。
*
Bucketizer:分箱(分段处理):将连续数值转换为离散类别比如特征是年龄,是一个连续数值,需要将其转换为离散类别(未成年人、青年人、中年人、老年人),就要用到Bucketizer了。
* ChiSqSelector:对于分类目标变量(考虑到分类模型),此方法允许你预定义数量的特征(通过numTopFeatures参数指定)。
选择完成后,如方法的名称所示,使用卡方检验。 需要两步:首先,你需要.fit(…)
数据(为了这个方法可以计算卡方检验)。然后,调用.fit(…)方法(将你的DataFrame作为参数传递)返回一个可以用.transform(…)转换的ChiSqSelectorModel对象。
*
CountVectorizer:将文本文档转换为单词计数的向量。当不存在先验字典时,Countvectorizer作为Estimator提取词汇进行训练,并生成一个CountVectorizerModel用于存储相应的词汇向量空间。该模型产生文档关于词语的稀疏表示,其表示可以传递给其他算法,
* HashingTF : 生成词频率向量。它采用词集合并将这些集合转换成固定长度的特征向量。在文本处理中,“一组词”可能是一袋词。
HashingTF使用散列技巧。通过应用散列函数将原始要素映射到索引,然后基于映射的索引来计算项频率。
* IDF : 此方法计算逆文档频率。需要注意的是文本首先要用向量表示,可以用HashingTF 或者 CountVectorizer。
*
MinMaxScaler:最大-最小规范化,将所有特征向量线性变换到用户指定最大-最小值之间。但注意在计算时还是一个一个特征向量分开计算的。通常将最大,最小值设置为1和0,这样就归一化到[0,1]。Spark中可以对min和max进行设置,默认就是[0,1]。(不移动中心点)
* MaxAbsScaler:同样对某一个特征操作,各特征值除以最大绝对值,因此缩放到[-1,1]之间。且不移动中心点。不会将稀疏矩阵变得稠密。
* NGram :返回 n-grams,两个连续的序列词,或三个或更多。例如 [‘good’,’morning’, ‘Robin’,
‘Williams’] ,返回两个即 [‘good morning’, ‘morning Robin’, ‘Robin Williams’] 。
* Normalizer :
将某个特征向量(由所有样本某一个特征组成的向量)计算其p-范数,然后对该每个元素除以p-范数。将原始特征Normalizer以后可以使得机器学习算法有更好的表现。(默认是L2)。
* PCA : 数据降维。
* RegexTokenizer:使用正则表达式。
* StandardScaler
:将某个特征向量(由所有样本某一个特征组成的向量)进行标准化,使数据均值为0,方差为1。Spark中可以选择是带或者不带均值和方差。
* StopWordsRemover :移除停用词。
* Tokenizer:这是默认的标记器,它将字符串转换为小写,然后在空间上分割。
* VectorAssembler:这是一个非常有用的转换器,将多个数值(包含向量)列整合到一个用向量表示的列中。
* VectorSlicer:适用于特征向量,无论是稠密还是稀疏:给定一个索引列表,它从特征向量中提取值。
* Word2Vec :该方法将一个句子(字符串)作为输入,并将其转换为{string,vector}格式的映射,这种格式在自然语言处理中非常有用。
String<->Index 相互转换:

* VectorIndexer:提高决策树或随机森林等ML方法的分类效果。VectorIndexer是对数据集特征向量中的类别(离散值)特征(index
categorical features categorical features
)进行编号。它能够自动判断哪些特征是离散值型的特征,并对他们进行编号,具体做法是通过设置一个maxCategories,特征向量中某一个特征不重复取值个数小于maxCategories,则被重新编号为0~K(K<=maxCategories-1)。某一个特征不重复取值个数大于maxCategories,则该特征视为连续值,不会重新编号(不会发生任何改变)。
* StringIndexer:按label出现的频次,转换成0~num numOfLabels-1(分类个数),频次最高的转换为0,
*
IndexToString:有StringIndexer,就应该有IndexToString。在应用StringIndexer对labels进行重新编号后,带着这些编号后的label对数据进行了训练,并接着对其他数据进行了预测,得到预测结果,预测结果的label也是重新编号过的,因此需要转换回来。
5.1.2 预测器(Estimators)

预测器可以被认为是需要评估的统计模型,来进行预测或对观测结果进行分类。
如果派生自抽象的Estimator类,则新模型必须实现.fit(…)方法,该方法给DataFrame中的数据以及一些默认或用户指定的参数泛化模型。

一、分类
ML包提供了七种分类模型,这里介绍四种常用的模型。

* LogisticRegression:逻辑回归是分类的基本模型。逻辑回归使用logit函数来计算观测到属于特定类别的概率。
* DecisionTreeClassifier
:构建一棵决策树以预测观察类别的分类器。maxDepth指定参数限制树的生长深度,minInstancePerNode确定进一步拆分所需的树节点中观察值的最小数目,maxBins参数指定连续变量将被分割的最大数量的区间,
impurity 指定测量和计算来自分割的信息增益的度量。
* RandomForestClassifier:这个模型产生多个决策树(因此称为森林),并使用这些决策树的模式输出分类结果。
RandomForestClassifier支持二元和多元标签。
* NaiveBayes:基于贝叶斯定理,这个模型使用条件概率来分类观测。 PySpark ML中的NaiveBayes模型支持二元和多元标签。
二、回归

PySpark ML包中有七种模型可用于回归任务。这里只介绍两种模型,如后续需要用可查阅官方手册。

* LinearRegression:最简单的回归模型,它假定了特征和连续标签之间的线性关系,以及误差项的正态性。
* DecisionTreeRegressor:与分类模型类似,标签是连续的而不是二元或多元的。
三、聚类

聚类是一种无监督的模型。PySpark ML包提供了四种模型。

* BisectingKMeans :k-means 聚类和层次聚类的组合。该算法以单个簇中的所有观测值开始,并将数据迭代地分成k个簇。
* KMeans :
将数据分成k个簇,随机生成k个初始点作为质心,将数据集中的数据按照距离质心的远近分到各个簇中,将各个簇中的数据求平均值,作为新的质心,重复上一步,直到所有的簇不再改变。
* GaussianMixture:这个方法使用k个未知的高斯分布参数来剖析数据集。使用期望最大化算法,通过最大化对数似然函数来找到高斯参数。
* LDA:此模型用于自然语言处理应用程序中的主题建模。
5.1.3 管道/工作流(Pipeline)

一个管道串起多个转换器和预测器,明确一个机器学习工作流。
预测上一篇中的婴儿存活率。

* 载入数据: import pyspark.sql.types as typ labels = [ ('INFANT_ALIVE_AT_REPORT',
typ.IntegerType()), ('BIRTH_PLACE', typ.StringType()), ('MOTHER_AGE_YEARS',
typ.IntegerType()), ('FATHER_COMBINED_AGE', typ.IntegerType()), ('CIG_BEFORE',
typ.IntegerType()), ('CIG_1_TRI', typ.IntegerType()), ('CIG_2_TRI',
typ.IntegerType()), ('CIG_3_TRI', typ.IntegerType()), ('MOTHER_HEIGHT_IN',
typ.IntegerType()), ('MOTHER_PRE_WEIGHT', typ.IntegerType()), (
'MOTHER_DELIVERY_WEIGHT', typ.IntegerType()), ('MOTHER_WEIGHT_GAIN',
typ.IntegerType()), ('DIABETES_PRE', typ.IntegerType()), ('DIABETES_GEST',
typ.IntegerType()), ('HYP_TENS_PRE', typ.IntegerType()), ('HYP_TENS_GEST',
typ.IntegerType()), ('PREV_BIRTH_PRETERM', typ.IntegerType()) ] # Specifying
the schema of the DataFrame schema = typ.StructType([ typ.StructField(e[0], e[1
],False) for e in labels ]) births = spark.read.csv('births_transformed.csv.gz'
, header=True, schema=schema)
* 创建转换器
因为统计模型只能对数值数据进行操作,我们必须对birth_place变量进行编码。
import pyspark.ml.feature as ft # Casting the column to an IntegerType births
= births \ .withColumn('BIRTH_PLACE_INT', births['BIRTH_PLACE'] \
.cast(typ.IntegerType())) #Using the OneHotEncoder to encode encoder =
ft.OneHotEncoder( inputCol='BIRTH_PLACE_INT', outputCol='BIRTH_PLACE_VEC') #
Using the VectorAssembler to create a single column with all the features
collated together. featuresCreator = ft.VectorAssembler( inputCols=[col[0] for
colin labels[2:]] + \ [encoder.getOutputCol()], outputCol='features' )
* 创建预测器 import pyspark.ml.classification as cl logistic =
cl.LogisticRegression(maxIter=10, regParam=0.01, labelCol
='INFANT_ALIVE_AT_REPORT')
* 创建一个工作流
from pyspark.ml import Pipeline pipeline = Pipeline(stages=[ encoder,
featuresCreator, logistic ])
* 训练模型 births_train, births_test = births \ .randomSplit([0.7, 0.3], seed=666
) model = pipeline.fit(births_train) test_model = model.transform(births_test)
* 评估 import pyspark.ml.evaluation as ev evaluator =
ev.BinaryClassificationEvaluator( rawPredictionCol='probability', labelCol=
'INFANT_ALIVE_AT_REPORT') print(evaluator.evaluate(test_model,
{evaluator.metricName:'areaUnderROC'})) print(evaluator.evaluate(test_model,
{evaluator.metricName:'areaUnderPR'}))


* 保存模型 pipelinePath = './infant_oneHotEncoder_Logistic_Pipeline'
pipeline.write().overwrite().save(pipelinePath) # You canload it up later and
use it straight away to .fit(...)and predict: loadedPipeline = Pipeline.load
(pipelinePath) loadedPipeline \ .fit(births_train)\ .transform(births_test)