4 Introducing MLlib
MLlib NamelyMachine Learning Library.
4.1 Load and transform data
Dataset Download:births_train.csv.gz.
Create theschema:
import pyspark.sql.types as typ labels = [ ('INFANT_ALIVE_AT_REPORT',
typ.StringType()), ('BIRTH_YEAR', typ.IntegerType()), ('BIRTH_MONTH',
typ.IntegerType()), ('BIRTH_PLACE', typ.StringType()), ('MOTHER_AGE_YEARS',
typ.IntegerType()), ('MOTHER_RACE_6CODE', typ.StringType()), ('MOTHER_EDUCATION'
, typ.StringType()), ('FATHER_COMBINED_AGE', typ.IntegerType()), (
typ.StringType()), ('CIG_BEFORE', typ.IntegerType()), ('CIG_1_TRI',
typ.IntegerType()), ('CIG_2_TRI', typ.IntegerType()), ('CIG_3_TRI',
typ.IntegerType()), ('MOTHER_HEIGHT_IN', typ.IntegerType()), (
typ.IntegerType()), ('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()), (
'MOTHER_WEIGHT_GAIN', typ.IntegerType()), ('DIABETES_PRE', typ.StringType()), (
'DIABETES_GEST', typ.StringType()), ('HYP_TENS_PRE', typ.StringType()), (
'HYP_TENS_GEST', typ.StringType()), ('PREV_BIRTH_PRETERM', typ.StringType()), (
'NO_RISK', typ.StringType()), ('NO_INFECTIONS_REPORTED', typ.StringType()), (
'LABOR_IND', typ.StringType()), ('LABOR_AUGM', typ.StringType()), ('STEROIDS',
typ.StringType()), ('ANTIBIOTICS', typ.StringType()), ('ANESTHESIA',
typ.StringType()), ('DELIV_METHOD_RECODE_COMB', typ.StringType()), (
'ATTENDANT_BIRTH', typ.StringType()), ('APGAR_5', typ.IntegerType()), (
'APGAR_5_RECODE', typ.StringType()), ('APGAR_10', typ.IntegerType()), (
'APGAR_10_RECODE', typ.StringType()), ('INFANT_SEX', typ.StringType()), (
typ.IntegerType()), ('INFANT_ASSIST_VENTI', typ.StringType()), (
typ.StringType()), ('INFANT_SURFACANT', typ.StringType()), ('INFANT_ANTIBIOTICS'
, typ.StringType()), ('INFANT_SEIZURES', typ.StringType()), (
typ.StringType()), ('INFANT_MENINGOMYELOCELE', typ.StringType()), (
typ.StringType()), ('INFANT_SUSPECTED_CHROMOSOMAL_DISORDER', typ.StringType()),
'INFANT_BREASTFED', typ.StringType()) ] schema = typ.StructType([
typ.StructField(e[0], e[1], False) for e in labels ])
.read.csv(…) Method can read uncompressed or compressed comma separated values:
births = spark.read.csv('births_train.csv.gz', header=True, schema=schema)
header Parameter specified asTrue Indicates that the first row contains the header, We useschema Identify the right type of data.

A lot of features in our dataset are strings. These are mostly categorical variables, We need to convert to numerical form in some way.

Dictionary mapping:
recode_dictionary = { 'YNU': { 'Y': 1, 'N': 0, 'U': 0 } }
Our goal is to predict ‘INFANT_ALIVE_AT_REPORT’ yes 1 or 0.. therefore, We need to remove other baby related features:
selected_features = [ 'INFANT_ALIVE_AT_REPORT', 'BIRTH_PLACE',
'HYP_TENS_PRE', 'HYP_TENS_GEST', 'PREV_BIRTH_PRETERM' ] births_trimmed = births.
Feature dictionary mapping:
# 0 Means that the mother does not smoke before or during pregnancy;1-97 Indicates the actual number of smokers,98 Express98 Or more; and99 Unknown, We will assume that the unknown is0 And recode accordingly. import
pyspark.sql.functionsas func def recode(col, key): return
recode_dictionary[key][col]def correct_cig(feat): return func \
.when(func.col(feat) !=99, func.col(feat))\ .otherwise(0) rec_integer =
func.udf(recode, typ.IntegerType())
recode() Method fromrecode_dictionary Mid returnkey Corresponding value,correct_cig() Method
correct_cig Method inspection featuresfeat When is the value of not equal to99, If not equal to99, Then return the value of the feature; If this value is equal to99, Then return0.

We can't justDataFrame Upper userecode function; It needs to be converted toSpark UnderstandingUDF.User Define Function,
User defined function, AbbreviationUDF, Users canSpark SQL To customize the actual needsUDF To process data.
rec_integer function: By passing in therecode Function and specify the return value data type, We can userec_integer Do dictionary mapping.

First, correct the characteristics related to smoking quantity:
births_transformed = births_trimmed \ .withColumn('CIG_BEFORE', correct_cig(
'CIG_BEFORE'))\ .withColumn('CIG_1_TRI', correct_cig('CIG_1_TRI'))\ .withColumn(
'CIG_2_TRI', correct_cig('CIG_2_TRI'))\ .withColumn('CIG_3_TRI', correct_cig(
.withColumn(…) The first parameter of the method is the new column name, The second parameter is to specify a column of the original data.

Find out which characteristics areYes/No/Unknown :
cols = [(col.name, col.dataType) for col in births_trimmed.schema] YNU_cols =
[] for i, sin enumerate(cols): if s[1] == typ.StringType(): dis = births.select
(s[0]) \ .distinct() \ .rdd \ .map(lambda row: row[0]) \ .collect() if 'Y' in
dis: YNU_cols.append(s[0])
DataFrames Features can be batch converted while features are selected:
births.select([ 'INFANT_NICU_ADMISSION', rec_integer(
'INFANT_NICU_ADMISSION', func.lit('YNU') ) \ .alias(
Convert all with one list YNU_cols :
exprs_YNU = [ rec_integer(x, func.lit('YNU')).alias(x) if x in YNU_cols else x
for x in births_transformed.columns ] births_transformed = births_transformed.
4.2 Familiarity with data
4.2.1 descriptive statistics
use.colStats(…) Methods statistics. One thing to note is that, This method is based on sample descriptive statistics, If your dataset is less than100 Observed values, You may get some strange results.
import pyspark.mllib.stat as st import numpy as np numeric_cols = [
'MOTHER_WEIGHT_GAIN' ] numeric_rdd = births_transformed\ .select(numeric_cols)\
.rdd \ .map(lambda row: [e for e in row]) mllib_stats =
st.Statistics.colStats(numeric_rdd)for col, m, v in zip(numeric_cols,
mllib_stats.mean(), mllib_stats.variance()): print('{0}: \t{1:.2f} \t {2:.2f}'
.format(col, m, np.sqrt(v)))

.colStats(…) Input:RDD data
Functions of this method:count(),max(),min(),mean(), normL1(),normL2() ,numNonzeros() ,
variance() etc..

Calculate frequency of categorical variables:
categorical_cols = [e for e in births_transformed.columns if e not in
numeric_cols] categorical_rdd = births_transformed\ .select(categorical_cols)\
.rdd \ .map(lambda row: [e for e in row]) for i, col in
enumerate(categorical_cols): agg = categorical_rdd \ .groupBy(lambda row:
row[i]) \ .map(lambda row: (row[0], len(row[1]))) print(col,
sorted(agg.collect(), key=lambda el: el[1], reverse=True))

4.2.2 Relevance
Calculating the correlation between features:
corrs = st.Statistics.corr(numeric_rdd) for i, el in enumerate(corrs > 0.5):
correlated = [ (numeric_cols[j], corrs[i][j])for j, e in enumerate(el) if e ==
1.0 and j != i] if len(correlated) > 0: for e in correlated: print('{0}-to-{1}:
{2:.2f}' \ .format(numeric_cols[i], e[0], e[1]))

Discard highly relevant features, Keep some features:
features_to_keep = [ 'INFANT_ALIVE_AT_REPORT', 'BIRTH_PLACE',
'HYP_TENS_GEST', 'PREV_BIRTH_PRETERM' ] births_transformed = births_transformed.
select([e for e in features_to_keep])
4.2.3 statistical test
We can't calculate the relevance of classification features. However, We can do chi square test, To determine if there are significant differences.
import pyspark.mllib.linalg as ln for cat in categorical_cols[1:]: agg =
births_transformed \ .groupby('INFANT_ALIVE_AT_REPORT') \ .pivot(cat) \
.count() agg_rdd = agg \ .rdd\ .map(lambda row: (row[1:])) \ .flatMap(lambda
row: [0 if e == None else e for e in row]) \ .collect() row_length =
len(agg.collect()[0]) - 1 agg = ln.Matrices.dense(row_length, 2, agg_rdd) test
= st.Statistics.chiSqTest(agg) print(cat, round(test.pValue,4))

First, we traverse all the classification variables, And pass’INFANT_ALIVE_AT_REPORT“ Feature acquisition count. Next, Convert them into aRDD, Then usepyspark.mllib.linalg Modules convert them into matrices.
The first parameter of the method specifies the number of rows in the matrix, In our case, It is the length of different values of classification features. The second parameter specifies the number of columns:“INFANT_ALIVE_AT_REPORT” The target variable has only two values. The last parameter is the list of values to convert to a matrix.
A small example:
>>> print(ln.Matrices.dense(3,2, [1,2,3,4,5,6])) DenseMatrix([[ 1. 4.], [ 2.
5.], [ 3. 6.]])
Test shows, All characteristics should be significantly different, It should help us to predict the survival opportunities of infants.

4.3 Create the final dataset

Now create the final dataset we will use to build the model, takeDataFrame convert toLabeledPoints OfRDD.LabeledPoint It's used to train machine learningMLlib structure. It consists of two attributes: Labels and features. Tag is our target variable, Features can beNumPy array, list,pyspark.mllib.linalg.SparseVector,pyspark.mllib.linalg.DenseVector orscipy.sparse Column matrix.
4.3.1 Create aLabeledPoints OfRDD

Before we build the final dataset, We need to deal with one last question first: Ours“BIRTH_PLACE” Feature is still a string. Any other classification variable can be used as is( Because they are now virtual variables), We will use hash encoding“BIRTH_PLACE” Features:
import pyspark.mllib.feature as ft import pyspark.mllib.regression as reg
hashing = ft.HashingTF(7) births_hashed = births_transformed \ .rdd \ .map(
lambda row: [ list(hashing.transform(row[1]).toArray()) if col == 'BIRTH_PLACE'
else row[i] for i, col in enumerate(features_to_keep)]) \ .map(lambda row: [[e]
if type(e) == int else e for e in row]) \ .map(lambda row: [item for sublist in
rowfor item in sublist]) \ .map(lambda row: reg.LabeledPoint( row[0],
ln.Vectors.dense(row[1:])) )

First, We create a hash model. Our characteristics are7 Grade, So hash value selection7. Next, If your dataset has many columns, But there are only a few nonzero values in a row,SparseVector Data structure is preferred. So using the model“BIRTH_PLACE” Feature toSparseVector, Then? Combine all features to create aLabeledPoint.
4.3.2 Cutting training set and test set

RDD There's a simple way,.randomSplit(…). This method uses a scale list to segment data set randomly:
births_train, births_test = births_hashed.randomSplit([0.6, 0.4])
4.4 Forecast
Ad locum, We will construct two models: Linear classifier - Logical regression and nonlinear classifier - Random forest. For the former, We will use all the features, And for the latter, We will use
ChiSqSelector(…) Method select the first four features.
4.4.1 MLlib Logical regression of
Spark 2.0 Use inLogisticRegressionWithLBFGS Model, The model uses quasi Newton(BFGS) optimization algorithm.
Training model:
from pyspark.mllib.classification \ import LogisticRegressionWithLBFGS
LR_Model = LogisticRegressionWithLBFGS \.train(births_train, iterations=10)
Required parameters yesLabeledPoints OfRDD.
Prediction classification:
LR_results = ( births_test.map(lambda row: row.label) \ .zip(LR_Model \
.predict(births_test\ .map(lambda row: row.features))) ).map(lambda row: (row[0
], row[1] * 1.0))
Evaluation model:
import pyspark.mllib.evaluation as ev LR_evaluation =
ev.BinaryClassificationMetrics(LR_results) print('Area under PR: {0:.2f}' \
.format(LR_evaluation.areaUnderPR)) print('Area under ROC: {0:.2f}' \
.format(LR_evaluation.areaUnderROC)) LR_evaluation.unpersist()

The model performs quite well,Precision-Recall Under the curve85% Area representation is very suitable.(ROC) The lower region can be understood as compared to a negative example of random selection, The probability of model level is higher than the positive case of random selection.
63% The value of can be considered acceptable.

MLlib Allow us to use chi square selector(Chi-Square) Choose the best features,ChiSqSelector(…)
Method can only be used for numerical characteristics; Classification variables need to be hashed or virtual encoded before they can be used.
selector = ft.ChiSqSelector(4).fit(births_train) topFeatures_train = (
births_train.map(lambda row: row.label) \ .zip(selector \ .transform
(births_train \.map(lambda row: row.features))) ).map(lambda row: reg
.LabeledPoint(row[0], row[1])) topFeatures_test = ( births_test.map(lambda row:
row.label) \ .zip(selector \ .transform(births_test \ .map(lambda row: row
.features))) ).map(lambda row: reg.LabeledPoint(row[0], row[1]))
4.4.2 MLlib Random forest
Training model:
from pyspark.mllib.tree import RandomForest RF_model = RandomForest \
.trainClassifier(data=topFeatures_train, numClasses=2,
categoricalFeaturesInfo={}, numTrees=6, featureSubsetStrategy='all', seed=666)

.trainClassifier(…) The first parameter of the method specifies the training data set.numClasses Indicates how many classes our target variable has. As for the third parameter, You can pass on a dictionary, Among themkey yesRDD Index of classification features in,key The value of represents the number of levels the classification feature has.
numTrees Specify the number of trees in the forest. The next parameter tells the model to use all the features in our dataset, Rather than just retaining the most descriptive features, The last parameter specifies the seed of the random part of the model.
Evaluation model:
RF_results = ( topFeatures_test.map(lambda row: row.label) \ .zip(RF_model \
.predict(topFeatures_test \ .map(lambda row: row.features))) ) RF_evaluation =
ev.BinaryClassificationMetrics(RF_results)print('Area under PR: {0:.2f}' \ .
format(RF_evaluation.areaUnderPR)) print('Area under ROC: {0:.2f}' \ .format
(RF_evaluation.areaUnderROC)) RF_evaluation.unpersist()

Can see, Random forest is better than logistic regression model with fewer features. Let's look at the performance of logistic regression after feature reduction:
LR_Model_2 = LogisticRegressionWithLBFGS \ .train(topFeatures_train,
iterations=10) LR_results_2 = ( topFeatures_test.map(lambda row: row.label) \
.zip(LR_Model_2 \ .predict(topFeatures_test \ .map(lambda row: row.features)))
).map(lambda row: (row[0], row[1] * 1.0)) LR_evaluation_2 =
ev.BinaryClassificationMetrics(LR_results_2)print('Area under PR: {0:.2f}' \ .
format(LR_evaluation_2.areaUnderPR)) print('Area under ROC: {0:.2f}' \ .format
(LR_evaluation_2.areaUnderROC)) LR_evaluation_2.unpersist()