4 Introducing MLlib
MLlib 即Machine Learning Library。
4.1 载入数据并转换数据
数据集下载:births_train.csv.gz.
<http://%20www.tomdrabas.com/data/LearningPySpark/births_train.%20csv.gz.>。
创建数据集的schema:
import pyspark.sql.types as typ labels = [ ('INFANT_ALIVE_AT_REPORT',
typ.StringType()), ('BIRTH_YEAR', typ.IntegerType()), ('BIRTH_MONTH',
typ.IntegerType()), ('BIRTH_PLACE', typ.StringType()), ('MOTHER_AGE_YEARS',
typ.IntegerType()), ('MOTHER_RACE_6CODE', typ.StringType()), ('MOTHER_EDUCATION'
, typ.StringType()), ('FATHER_COMBINED_AGE', typ.IntegerType()), (
'FATHER_EDUCATION', typ.StringType()), ('MONTH_PRECARE_RECODE',
typ.StringType()), ('CIG_BEFORE', typ.IntegerType()), ('CIG_1_TRI',
typ.IntegerType()), ('CIG_2_TRI', typ.IntegerType()), ('CIG_3_TRI',
typ.IntegerType()), ('MOTHER_HEIGHT_IN', typ.IntegerType()), (
'MOTHER_BMI_RECODE', typ.IntegerType()), ('MOTHER_PRE_WEIGHT',
typ.IntegerType()), ('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()), (
'MOTHER_WEIGHT_GAIN', typ.IntegerType()), ('DIABETES_PRE', typ.StringType()), (
'DIABETES_GEST', typ.StringType()), ('HYP_TENS_PRE', typ.StringType()), (
'HYP_TENS_GEST', typ.StringType()), ('PREV_BIRTH_PRETERM', typ.StringType()), (
'NO_RISK', typ.StringType()), ('NO_INFECTIONS_REPORTED', typ.StringType()), (
'LABOR_IND', typ.StringType()), ('LABOR_AUGM', typ.StringType()), ('STEROIDS',
typ.StringType()), ('ANTIBIOTICS', typ.StringType()), ('ANESTHESIA',
typ.StringType()), ('DELIV_METHOD_RECODE_COMB', typ.StringType()), (
'ATTENDANT_BIRTH', typ.StringType()), ('APGAR_5', typ.IntegerType()), (
'APGAR_5_RECODE', typ.StringType()), ('APGAR_10', typ.IntegerType()), (
'APGAR_10_RECODE', typ.StringType()), ('INFANT_SEX', typ.StringType()), (
'OBSTETRIC_GESTATION_WEEKS', typ.IntegerType()), ('INFANT_WEIGHT_GRAMS',
typ.IntegerType()), ('INFANT_ASSIST_VENTI', typ.StringType()), (
'INFANT_ASSIST_VENTI_6HRS', typ.StringType()), ('INFANT_NICU_ADMISSION',
typ.StringType()), ('INFANT_SURFACANT', typ.StringType()), ('INFANT_ANTIBIOTICS'
, typ.StringType()), ('INFANT_SEIZURES', typ.StringType()), (
'INFANT_NO_ABNORMALITIES', typ.StringType()), ('INFANT_ANCEPHALY',
typ.StringType()), ('INFANT_MENINGOMYELOCELE', typ.StringType()), (
'INFANT_LIMB_REDUCTION', typ.StringType()), ('INFANT_DOWN_SYNDROME',
typ.StringType()), ('INFANT_SUSPECTED_CHROMOSOMAL_DISORDER', typ.StringType()),
('INFANT_NO_CONGENITAL_ANOMALIES_CHECKED', typ.StringType()), (
'INFANT_BREASTFED', typ.StringType()) ] schema = typ.StructType([
typ.StructField(e[0], e[1], False) for e in labels ])
.read.csv(…) 方法可以读取未压缩的或者压缩的逗号分隔值:
births = spark.read.csv('births_train.csv.gz', header=True, schema=schema)
header 参数指定为True 表示第一行包含头,我们用schema 明确数据的正确类型。

我们的数据集中有很多特征是字符串。这些大多是分类变量,我们需要以某种方式转换为数值形式。

字典映射:
recode_dictionary = { 'YNU': { 'Y': 1, 'N': 0, 'U': 0 } }
我们的目标是预测 ‘INFANT_ALIVE_AT_REPORT’ 是 1 or 0.。因此,我们要去除其他与婴儿相关的特征:
selected_features = [ 'INFANT_ALIVE_AT_REPORT', 'BIRTH_PLACE',
'MOTHER_AGE_YEARS', 'FATHER_COMBINED_AGE', 'CIG_BEFORE', 'CIG_1_TRI',
'CIG_2_TRI', 'CIG_3_TRI', 'MOTHER_HEIGHT_IN', 'MOTHER_PRE_WEIGHT',
'MOTHER_DELIVERY_WEIGHT', 'MOTHER_WEIGHT_GAIN', 'DIABETES_PRE', 'DIABETES_GEST',
'HYP_TENS_PRE', 'HYP_TENS_GEST', 'PREV_BIRTH_PRETERM' ] births_trimmed = births.
select(selected_features)
特征字典映射:
# 0意味着母亲在怀孕前或怀孕期间不抽烟;1-97表示抽烟的实际人数,98表示98或更多;而99表示未知,我们将假设未知是0并相应地重新编码。 import
pyspark.sql.functionsas func def recode(col, key): return
recode_dictionary[key][col]def correct_cig(feat): return func \
.when(func.col(feat) !=99, func.col(feat))\ .otherwise(0) rec_integer =
func.udf(recode, typ.IntegerType())
recode()方法从recode_dictionary中返回key对应的值,correct_cig() 方法
correct_cig方法检查特征feat的值何时不等于99,若不等于99,则返回特征的值;如果这个值等于99,则返回0。

我们不能直接在DataFrame上使用recode函数;它需要转换为Spark理解的UDF。User Define Function,
用户自定义函数,简称UDF,用户可以在Spark SQL 里自定义实际需要的UDF来处理数据。
rec_integer函数:通过传入我们指定的recode函数并指定返回值数据类型,我们可以使用rec_integer做字典映射。

首先纠正与吸烟数量有关的特征:
births_transformed = births_trimmed \ .withColumn('CIG_BEFORE', correct_cig(
'CIG_BEFORE'))\ .withColumn('CIG_1_TRI', correct_cig('CIG_1_TRI'))\ .withColumn(
'CIG_2_TRI', correct_cig('CIG_2_TRI'))\ .withColumn('CIG_3_TRI', correct_cig(
'CIG_3_TRI'))
.withColumn(…) 方法第一个参数是新列名,第二个参数是指定原数据的某列。

找出哪些特征是Yes/No/Unknown :
cols = [(col.name, col.dataType) for col in births_trimmed.schema] YNU_cols =
[] for i, sin enumerate(cols): if s[1] == typ.StringType(): dis = births.select
(s[0]) \ .distinct() \ .rdd \ .map(lambda row: row[0]) \ .collect() if 'Y' in
dis: YNU_cols.append(s[0])
DataFrames 可以在选择特征的同时批量转换特征:
births.select([ 'INFANT_NICU_ADMISSION', rec_integer(
'INFANT_NICU_ADMISSION', func.lit('YNU') ) \ .alias(
'INFANT_NICU_ADMISSION_RECODE')] ).take(5)
用一个列表转换所有的 YNU_cols :
exprs_YNU = [ rec_integer(x, func.lit('YNU')).alias(x) if x in YNU_cols else x
for x in births_transformed.columns ] births_transformed = births_transformed.
select(exprs_YNU)
4.2 熟悉数据
4.2.1 描述性统计
用.colStats(…)方法进行统计。需要注意的一点是,该方法是基于样本的描述性统计,如果你的数据集少于100个观测值,你可能会得到一些奇怪的结果。
import pyspark.mllib.stat as st import numpy as np numeric_cols = [
'MOTHER_AGE_YEARS','FATHER_COMBINED_AGE', 'CIG_BEFORE','CIG_1_TRI','CIG_2_TRI',
'CIG_3_TRI', 'MOTHER_HEIGHT_IN','MOTHER_PRE_WEIGHT', 'MOTHER_DELIVERY_WEIGHT',
'MOTHER_WEIGHT_GAIN' ] numeric_rdd = births_transformed\ .select(numeric_cols)\
.rdd \ .map(lambda row: [e for e in row]) mllib_stats =
st.Statistics.colStats(numeric_rdd)for col, m, v in zip(numeric_cols,
mllib_stats.mean(), mllib_stats.variance()): print('{0}: \t{1:.2f} \t {2:.2f}'
.format(col, m, np.sqrt(v)))

.colStats(…)的输入:RDD数据
该方法的函数:count()、max()、min()、mean()、 normL1()、normL2() 、numNonzeros() 、
variance()等。

计算分类变量的频率:
categorical_cols = [e for e in births_transformed.columns if e not in
numeric_cols] categorical_rdd = births_transformed\ .select(categorical_cols)\
.rdd \ .map(lambda row: [e for e in row]) for i, col in
enumerate(categorical_cols): agg = categorical_rdd \ .groupBy(lambda row:
row[i]) \ .map(lambda row: (row[0], len(row[1]))) print(col,
sorted(agg.collect(), key=lambda el: el[1], reverse=True))


4.2.2 相关性
计算特征间的相关性:
corrs = st.Statistics.corr(numeric_rdd) for i, el in enumerate(corrs > 0.5):
correlated = [ (numeric_cols[j], corrs[i][j])for j, e in enumerate(el) if e ==
1.0 and j != i] if len(correlated) > 0: for e in correlated: print('{0}-to-{1}:
{2:.2f}' \ .format(numeric_cols[i], e[0], e[1]))

丢弃相关性很高的特征,保留部分特征:
features_to_keep = [ 'INFANT_ALIVE_AT_REPORT', 'BIRTH_PLACE',
'MOTHER_AGE_YEARS', 'FATHER_COMBINED_AGE', 'CIG_1_TRI', 'MOTHER_HEIGHT_IN',
'MOTHER_PRE_WEIGHT', 'DIABETES_PRE', 'DIABETES_GEST', 'HYP_TENS_PRE',
'HYP_TENS_GEST', 'PREV_BIRTH_PRETERM' ] births_transformed = births_transformed.
select([e for e in features_to_keep])
4.2.3 统计检验
我们不能计算分类特征的相关性。不过,我们可以进行卡方检验,以确定是否存在显著差异。
import pyspark.mllib.linalg as ln for cat in categorical_cols[1:]: agg =
births_transformed \ .groupby('INFANT_ALIVE_AT_REPORT') \ .pivot(cat) \
.count() agg_rdd = agg \ .rdd\ .map(lambda row: (row[1:])) \ .flatMap(lambda
row: [0 if e == None else e for e in row]) \ .collect() row_length =
len(agg.collect()[0]) - 1 agg = ln.Matrices.dense(row_length, 2, agg_rdd) test
= st.Statistics.chiSqTest(agg) print(cat, round(test.pValue,4))


首先遍历所有的分类变量,并通过’INFANT_ALIVE_AT_REPORT“特征获取计数。接下来,将它们转换成一个RDD,然后再使用pyspark.mllib.linalg模块将它们转换成矩阵。
.Matrices.dense(…)
方法的第一个参数指定矩阵中的行数,在我们的例子中,它是分类特征的不同值的长度。第二个参数指定列数:“INFANT_ALIVE_AT_REPORT”目标变量只有两个值。最后一个参数是要转换成矩阵的值列表。
一个小例子:
>>> print(ln.Matrices.dense(3,2, [1,2,3,4,5,6])) DenseMatrix([[ 1., 4.], [ 2.,
5.], [ 3., 6.]])
测试表明,所有的特征应该是显著不同的,应该可以帮助我们预测婴儿的生存机会。

4.3 创建最终的数据集

现在创建我们将用来构建模型的最终数据集,将DataFrame转换成LabeledPoints的RDD。LabeledPoint是用来训练机器学习的一个MLlib结构。它由两个属性组成:标签和特征。标签是我们的目标变量,特征可以是NumPy数组,列表,pyspark.mllib.linalg.SparseVector,pyspark.mllib.linalg.DenseVector或scipy.sparse列矩阵。
4.3.1 创建一个LabeledPoints的RDD

在我们建立最终的数据集之前,我们首先需要处理一个最后的问题:我们的“BIRTH_PLACE”特征仍然是一个字符串。而任何其他的分类变量可以按原样使用(因为它们现在是虚拟变量),我们将使用哈希编码“BIRTH_PLACE”特征:
import pyspark.mllib.feature as ft import pyspark.mllib.regression as reg
hashing = ft.HashingTF(7) births_hashed = births_transformed \ .rdd \ .map(
lambda row: [ list(hashing.transform(row[1]).toArray()) if col == 'BIRTH_PLACE'
else row[i] for i, col in enumerate(features_to_keep)]) \ .map(lambda row: [[e]
if type(e) == int else e for e in row]) \ .map(lambda row: [item for sublist in
rowfor item in sublist]) \ .map(lambda row: reg.LabeledPoint( row[0],
ln.Vectors.dense(row[1:])) )

首先,我们创建哈希模型。我们的特征有7个级别,所以散列值选择7。接下来,如果你的数据集有很多列,但在一行中只有少部分非零值,SparseVector数据结构是首选的。因此使用模型将我们的“BIRTH_PLACE”特征转换为SparseVector,然后,将所有特征组合在一起创建一个LabeledPoint。
4.3.2 切割训练集与测试集

RDD有一个简单的方法,.randomSplit(…)。该方法采用一个比例列表随机分割数据集:
births_train, births_test = births_hashed.randomSplit([0.6, 0.4])
4.4 预测
在这里,我们将构造两种模型:线性分类器 - 逻辑回归和非线性分类器 - 随机森林。对于前者,我们将使用所有的特征,而对于后者,我们将使用
ChiSqSelector(…)方法选择前四个特征。
4.4.1 MLlib之逻辑回归
Spark 2.0中使用LogisticRegressionWithLBFGS模型,该模型使用拟牛顿(BFGS) 优化算法。
训练模型:
from pyspark.mllib.classification \ import LogisticRegressionWithLBFGS
LR_Model = LogisticRegressionWithLBFGS \.train(births_train, iterations=10)
要求的参数 是LabeledPoints的RDD。
预测分类:
LR_results = ( births_test.map(lambda row: row.label) \ .zip(LR_Model \
.predict(births_test\ .map(lambda row: row.features))) ).map(lambda row: (row[0
], row[1] * 1.0))
评估模型:
import pyspark.mllib.evaluation as ev LR_evaluation =
ev.BinaryClassificationMetrics(LR_results) print('Area under PR: {0:.2f}' \
.format(LR_evaluation.areaUnderPR)) print('Area under ROC: {0:.2f}' \
.format(LR_evaluation.areaUnderROC)) LR_evaluation.unpersist()


模型表现相当好,Precision-Recall曲线下的85%面积表示很适合。(ROC)下的区域可以被理解为与随机选择的负实例相比,模型等级的概率高于随机选择的正实例。
63%的值可以被认为是可以接受的。

MLlib允许我们用卡方选择器(Chi-Square)选择最好的特征,ChiSqSelector(…)
方法只能用于数值特征;分类变量需要在选择之前进行散列或虚拟编码才可以使用。
selector = ft.ChiSqSelector(4).fit(births_train) topFeatures_train = (
births_train.map(lambda row: row.label) \ .zip(selector \ .transform
(births_train \.map(lambda row: row.features))) ).map(lambda row: reg
.LabeledPoint(row[0], row[1])) topFeatures_test = ( births_test.map(lambda row:
row.label) \ .zip(selector \ .transform(births_test \ .map(lambda row: row
.features))) ).map(lambda row: reg.LabeledPoint(row[0], row[1]))
4.4.2 MLlib之随机森林
训练模型:
from pyspark.mllib.tree import RandomForest RF_model = RandomForest \
.trainClassifier(data=topFeatures_train, numClasses=2,
categoricalFeaturesInfo={}, numTrees=6, featureSubsetStrategy='all', seed=666)

.trainClassifier(…)方法的第一个参数指定了训练数据集。numClasses表示我们的目标变量有多少类。至于第三个参数,你可以传递一个字典,其中的key是RDD中分类特征的索引,key的值表示分类特征具有的级别数。
numTrees指定森林中树的数量。下一个参数告诉模型使用我们数据集中的所有特征,而不是只保留最具描述性的特征,而最后一个参数指定模型随机部分的种子。
评估模型:
RF_results = ( topFeatures_test.map(lambda row: row.label) \ .zip(RF_model \
.predict(topFeatures_test \ .map(lambda row: row.features))) ) RF_evaluation =
ev.BinaryClassificationMetrics(RF_results)print('Area under PR: {0:.2f}' \ .
format(RF_evaluation.areaUnderPR)) print('Area under ROC: {0:.2f}' \ .format
(RF_evaluation.areaUnderROC)) RF_evaluation.unpersist()


可以看见,随机森林用更少的特征要优于逻辑回归模型。让我们看看逻辑回归减少特征后性能如何:
LR_Model_2 = LogisticRegressionWithLBFGS \ .train(topFeatures_train,
iterations=10) LR_results_2 = ( topFeatures_test.map(lambda row: row.label) \
.zip(LR_Model_2 \ .predict(topFeatures_test \ .map(lambda row: row.features)))
).map(lambda row: (row[0], row[1] * 1.0)) LR_evaluation_2 =
ev.BinaryClassificationMetrics(LR_results_2)print('Area under PR: {0:.2f}' \ .
format(LR_evaluation_2.areaUnderPR)) print('Area under ROC: {0:.2f}' \ .format
(LR_evaluation_2.areaUnderROC)) LR_evaluation_2.unpersist()

友情链接
ioDraw流程图
API参考文档
OK工具箱
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:ixiaoyang8@qq.com
QQ群:637538335
关注微信