5 Introducing the ML Package
Previously, we used Spark's RDD-based MLlib package. Here, we will use the DataFrame-based ML package.
In addition, according to the Spark documentation, the primary machine learning API for Spark is now the DataFrame-based set of models in the spark.ml package.
5.1 Introduction to the ML Package
At the top level, the ML package contains three main abstract classes: Transformers, Estimators, and Pipelines.

5.1.1 Transformers
Every new Transformer derived from the abstract Transformer class needs to implement a .transform(...) method, which converts one DataFrame into another DataFrame.
The spark.ml.feature module provides many Transformers:

* Binarizer: Given a threshold, converts a continuous variable into a binary one.
* Bucketizer: Bins (segments) continuous values into discrete categories. For example, age is a continuous number, but it may need to be converted into discrete categories (juvenile, youth, middle-aged, elderly); Bucketizer handles this (see the sketch after this list).
* ChiSqSelector: For a categorical target variable (i.e., a classification model), this method lets you select a predefined number of features (set via the numTopFeatures parameter) using, as the name suggests, the chi-square test. Two steps are required: first, you need to .fit(...) the data (so the method can compute the chi-square statistics); calling .fit(...) (with your DataFrame as the parameter) returns a ChiSqSelectorModel object that can then be used to convert the data with .transform(...).
* CountVectorizer: Converts text documents to vectors of token counts. When no prior dictionary is available, CountVectorizer acts as an Estimator that extracts the vocabulary during training and produces a CountVectorizerModel holding the corresponding word vector space. The model produces sparse representations of the words in the documents, which can then be passed to other algorithms.
* HashingTF: Generates term-frequency vectors. It takes a set of terms and converts them into fixed-length feature vectors; in text processing, the "set of terms" may be a bag of words. HashingTF uses the hashing trick: raw features are mapped to indices by applying a hash function, and term frequencies are then computed over the mapped indices.
* IDF: Computes the inverse document frequency. Note that the text must first be represented as vectors, for example with HashingTF or CountVectorizer.
* MinMaxScaler: Min-max normalization; linearly rescales each feature to lie between a user-specified minimum and maximum. Note that each feature is still scaled separately. The maximum and minimum are usually set to 1 and 0, so the data is normalized to [0, 1]. In Spark, min and max can be configured and default to [0, 1]. (The center point is not shifted.)
* MaxAbsScaler: Also operates on each feature separately, dividing every value by the feature's maximum absolute value, thereby scaling it to [-1, 1]. It does not shift the center point and does not densify sparse matrices.
* NGram: Returns n-grams: pairs, triples, or longer runs of consecutive words. For example, given ['good', 'morning', 'Robin', 'Williams'], it returns the bigrams ['good morning', 'morning Robin', 'Robin Williams'].
* Normalizer: Computes the p-norm of each feature vector (each sample's row vector) and divides every element by that norm, scaling the vector to unit p-norm. Normalizing the features this way can help machine learning algorithms perform better. (The default is the L2 norm.)
* PCA: Dimensionality reduction using principal component analysis.
* RegexTokenizer: Tokenizes strings using a regular expression.
* StandardScaler: Standardizes a feature (the vector formed by one feature across all samples) so that it has zero mean and unit variance. Spark lets you choose whether to center with the mean and/or scale with the standard deviation (the withMean and withStd parameters).
* StopWordsRemover: Removes stop words.
* Tokenizer: The default tokenizer; it converts the string to lowercase and then splits it on whitespace.
* VectorAssembler: A very useful Transformer that combines multiple numeric (including vector) columns into a single vector column.
* VectorSlicer: Works on feature vectors, dense or sparse: given a list of indices, it extracts the corresponding values from the feature vector.
* Word2Vec: Takes a sentence (string) as input and converts it into a {string, vector} map, a format that is very useful in natural language processing.
Transformers for converting between strings and indices:

* VectorIndexer: Improves the classification performance of methods such as decision trees and random forests. VectorIndexer indexes the categorical (discrete-valued) features within a dataset's feature vectors. It automatically determines which features are categorical and re-encodes them: given a maxCategories setting, any feature with at most maxCategories distinct values is re-encoded as 0 to K (K <= maxCategories - 1); a feature with more distinct values than maxCategories is treated as continuous and left unchanged.
* StringIndexer: Encodes labels as indices 0 to numOfLabels-1 (the number of classes) by frequency of occurrence, with the most frequent label mapped to 0 (see the sketch after this list).
* IndexToString: The counterpart of StringIndexer. After StringIndexer has re-encoded the labels and a model has been trained on the encoded data, predictions on new data also come back as encoded labels, so IndexToString is needed to convert them back to the original strings.
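To illustrate the .transform(...) contract described above, here is a minimal sketch of Bucketizer and the StringIndexer / IndexToString pair; the toy DataFrame, column names, and split points are hypothetical, and a running SparkSession named spark is assumed:

```python
from pyspark.ml.feature import Bucketizer, StringIndexer, IndexToString

# Hypothetical toy DataFrame: an age column and a string label column.
df = spark.createDataFrame(
    [(12.0, 'yes'), (25.0, 'no'), (47.0, 'yes'), (71.0, 'yes')],
    ['age', 'label'])

# Bucketizer is a pure Transformer: .transform() bins 'age' into 4 categories.
bucketizer = Bucketizer(
    splits=[0.0, 18.0, 40.0, 60.0, float('inf')],
    inputCol='age', outputCol='age_bucket')
bucketed = bucketizer.transform(df)

# StringIndexer is fitted first (it is an Estimator), then transforms.
indexer = StringIndexer(inputCol='label', outputCol='label_idx')
indexer_model = indexer.fit(bucketed)
indexed = indexer_model.transform(bucketed)

# IndexToString maps the numeric indices back to the original strings.
converter = IndexToString(
    inputCol='label_idx', outputCol='label_orig',
    labels=indexer_model.labels)
converter.transform(indexed).show()
```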
5.1.2 Estimators

Estimators can be thought of as statistical models that need to be estimated in order to make predictions or classify observations.
A new model derived from the abstract Estimator class must implement a .fit(...) method, which fits the model to the data in a DataFrame given some default or user-specified parameters.
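As a minimal sketch of this .fit(...) pattern, the ChiSqSelector described earlier can be fitted and then used to transform the data; the toy DataFrame and column names below are hypothetical:

```python
from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.linalg import Vectors

# Hypothetical DataFrame with a feature vector and a categorical label.
df = spark.createDataFrame(
    [(Vectors.dense([0.0, 1.0, 3.0, 7.0]), 1.0),
     (Vectors.dense([2.0, 0.0, 4.0, 5.0]), 0.0),
     (Vectors.dense([1.0, 1.0, 2.0, 6.0]), 1.0),
     (Vectors.dense([3.0, 0.0, 5.0, 4.0]), 0.0)],
    ['features', 'label'])

# .fit(...) computes the chi-square statistics and returns a ChiSqSelectorModel ...
selector = ChiSqSelector(numTopFeatures=2,
                         featuresCol='features', labelCol='label',
                         outputCol='selected')
selector_model = selector.fit(df)

# ... which is itself a Transformer and can .transform(...) the data.
selector_model.transform(df).show()
```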

1. Classification
The ML package provides seven classification models; four common ones are described below.

* LogisticRegression: The basic classification model. Logistic regression uses the logit function to compute the probability that an observation belongs to a particular class.
* DecisionTreeClassifier: A classifier that builds a decision tree to predict the class of an observation. The maxDepth parameter limits how deep the tree can grow, minInstancesPerNode determines the minimum number of observations required in a tree node for it to be split further, maxBins specifies the maximum number of bins into which continuous variables are split, and impurity specifies the metric used to measure the information gain of a split (see the sketch after this list).
* RandomForestClassifier: Builds multiple decision trees (hence the "forest") and uses the mode of their outputs as the classification result. RandomForestClassifier supports binary and multiclass labels.
* NaiveBayes: Based on Bayes' theorem, this model uses conditional probabilities to classify observations. The NaiveBayes model in PySpark ML supports binary and multiclass labels.
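A minimal sketch of fitting a DecisionTreeClassifier with the parameters discussed above; the train_df and test_df DataFrames (with 'features' and 'label' columns) are hypothetical:

```python
from pyspark.ml.classification import DecisionTreeClassifier

# Hypothetical: train_df / test_df have a 'features' vector column and a 'label' column.
dt = DecisionTreeClassifier(
    featuresCol='features', labelCol='label',
    maxDepth=5,              # limit how deep the tree may grow
    minInstancesPerNode=10,  # minimum observations needed to split a node further
    maxBins=32,              # maximum number of bins for continuous features
    impurity='gini')         # metric used to compute the information gain

dt_model = dt.fit(train_df)                # Estimator: returns a fitted classification model
predictions = dt_model.transform(test_df)  # the fitted model is a Transformer
```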
2. Regression

The PySpark ML package contains seven models that can be used for regression tasks. Only two are introduced here; consult the official documentation if you need the others later.

* LinearRegression: The simplest regression model. It assumes a linear relationship between the features and the continuous label, and normally distributed error terms.
* DecisionTreeRegressor: Similar to the classification model, except that the label is continuous rather than binary or multiclass (both models are sketched below).
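A minimal sketch of the two regression models above; train_df is a hypothetical DataFrame with a 'features' vector column and a continuous 'label' column:

```python
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor

# Ordinary linear regression with a little regularization (hypothetical settings).
lr = LinearRegression(featuresCol='features', labelCol='label',
                      maxIter=10, regParam=0.1)
lr_model = lr.fit(train_df)
print(lr_model.coefficients, lr_model.intercept)

# Decision-tree regression: same API, continuous label.
dtr = DecisionTreeRegressor(featuresCol='features', labelCol='label', maxDepth=5)
dtr_model = dtr.fit(train_df)
```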
3. Clustering

Clustering is an unsupervised task. The PySpark ML package provides four clustering models.

* BisectingKMeans: A combination of k-means clustering and hierarchical clustering. The algorithm starts with all observations in a single cluster and iteratively splits the data into k clusters.
* KMeans: Divides the data into k clusters: k initial points are generated at random as centroids, each data point is assigned to the cluster with the nearest centroid, the points in each cluster are averaged to form new centroids, and the process is repeated until the clusters no longer change (see the sketch after this list).
* GaussianMixture: Models the data set with k Gaussian distributions with unknown parameters. The Gaussian parameters are found with the expectation-maximization algorithm by maximizing the log-likelihood function.
* LDA: This model is used for topic modeling in natural language processing applications.
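A minimal sketch of KMeans as described above; df is a hypothetical DataFrame with a 'features' vector column, and k=3 is an arbitrary choice:

```python
from pyspark.ml.clustering import KMeans

# Fit k-means with k=3 clusters; the seed is fixed only for reproducibility.
kmeans = KMeans(featuresCol='features', k=3, seed=1)
kmeans_model = kmeans.fit(df)

print(kmeans_model.clusterCenters())    # the learned centroids
clustered = kmeans_model.transform(df)  # adds a 'prediction' column with cluster ids
```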
5.1.3 Pipelines (Workflows)

A Pipeline chains multiple Transformers and Estimators together to define a machine learning workflow.
Below we use a Pipeline to predict infant survival, as in the previous article.

* Load the data:

```python
import pyspark.sql.types as typ

labels = [
    ('INFANT_ALIVE_AT_REPORT', typ.IntegerType()), ('BIRTH_PLACE', typ.StringType()),
    ('MOTHER_AGE_YEARS', typ.IntegerType()), ('FATHER_COMBINED_AGE', typ.IntegerType()),
    ('CIG_BEFORE', typ.IntegerType()), ('CIG_1_TRI', typ.IntegerType()),
    ('CIG_2_TRI', typ.IntegerType()), ('CIG_3_TRI', typ.IntegerType()),
    ('MOTHER_HEIGHT_IN', typ.IntegerType()), ('MOTHER_PRE_WEIGHT', typ.IntegerType()),
    ('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()), ('MOTHER_WEIGHT_GAIN', typ.IntegerType()),
    ('DIABETES_PRE', typ.IntegerType()), ('DIABETES_GEST', typ.IntegerType()),
    ('HYP_TENS_PRE', typ.IntegerType()), ('HYP_TENS_GEST', typ.IntegerType()),
    ('PREV_BIRTH_PRETERM', typ.IntegerType()),
]

# Specifying the schema of the DataFrame
schema = typ.StructType([typ.StructField(e[0], e[1], False) for e in labels])

births = spark.read.csv('births_transformed.csv.gz', header=True, schema=schema)
```
* Create the Transformers:

Because statistical models can only operate on numeric data, we have to encode the BIRTH_PLACE variable.

```python
import pyspark.ml.feature as ft

# Casting the column to an IntegerType
births = births \
    .withColumn('BIRTH_PLACE_INT', births['BIRTH_PLACE'] \
    .cast(typ.IntegerType()))

# Using the OneHotEncoder to encode the column
encoder = ft.OneHotEncoder(
    inputCol='BIRTH_PLACE_INT',
    outputCol='BIRTH_PLACE_VEC')

# Using the VectorAssembler to create a single column with all the
# features collated together.
featuresCreator = ft.VectorAssembler(
    inputCols=[col[0] for col in labels[2:]] + [encoder.getOutputCol()],
    outputCol='features')
```
* Create the Estimator:

```python
import pyspark.ml.classification as cl

logistic = cl.LogisticRegression(
    maxIter=10,
    regParam=0.01,
    labelCol='INFANT_ALIVE_AT_REPORT')
```
* Create the Pipeline (workflow):

```python
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[
    encoder,
    featuresCreator,
    logistic])
```
* Train the model:

```python
births_train, births_test = births \
    .randomSplit([0.7, 0.3], seed=666)

model = pipeline.fit(births_train)
test_model = model.transform(births_test)
```
* Evaluate the model:

```python
import pyspark.ml.evaluation as ev

evaluator = ev.BinaryClassificationEvaluator(
    rawPredictionCol='probability',
    labelCol='INFANT_ALIVE_AT_REPORT')

print(evaluator.evaluate(test_model,
                         {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(test_model,
                         {evaluator.metricName: 'areaUnderPR'}))
```


* Save the Pipeline:

```python
pipelinePath = './infant_oneHotEncoder_Logistic_Pipeline'
pipeline.write().overwrite().save(pipelinePath)

# You can load it up later and use it straight away to .fit(...) and predict:
loadedPipeline = Pipeline.load(pipelinePath)
loadedPipeline \
    .fit(births_train) \
    .transform(births_test)
```
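The fitted PipelineModel can be persisted in the same way, so it can be reloaded later and used to score new data without refitting; a minimal sketch (the path name is hypothetical):

```python
from pyspark.ml import PipelineModel

# Persist the fitted model (not just the Pipeline definition).
modelPath = './infant_oneHotEncoder_Logistic_PipelineModel'  # hypothetical path
model.write().overwrite().save(modelPath)

# Reload it later and transform new data directly.
loadedModel = PipelineModel.load(modelPath)
loadedModel.transform(births_test)
```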