3 Prepare Data for Modeling

All data is dirty, whether it is a dataset downloaded from the Internet or obtained from some other source. You can't model until you have tested and confirmed that your data is clean. Therefore, datasets need to be cleaned before modeling; you also need to check the feature distributions of the dataset and confirm that they meet predefined criteria.

3.1 Check for duplicates, missing values, and outliers

* Duplicates
Generate a simple DataFrame as follows:
>>> df = spark.createDataFrame([
...     (1, 144.5, 5.9, 33, 'M'),
...     (2, 167.2, 5.4, 45, 'M'),
...     (3, 124.1, 5.2, 23, 'F'),
...     (4, 144.5, 5.9, 33, 'M'),
...     (5, 133.2, 5.7, 54, 'F'),
...     (3, 124.1, 5.2, 23, 'F'),
...     (5, 129.2, 5.3, 42, 'M'),
... ], ['id', 'weight', 'height', 'age', 'gender'])
Obviously, with only a few rows like this, you can spot duplicate values at a glance. But what about millions of rows?
The first thing to do is check with the .distinct() method.
>>> print('count of rows: {0}'.format(df.count()))
count of rows: 7
>>> print('count of distinct rows: {0}'.format(df.distinct().count()))
count of distinct rows: 6
Then use the .dropDuplicates(...) method to remove the duplicates.
>>> df = df.dropDuplicates()
>>> df.show()
+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  5| 133.2|   5.7| 54|     F|
|  5| 129.2|   5.3| 42|     M|
|  1| 144.5|   5.9| 33|     M|
|  4| 144.5|   5.9| 33|     M|
|  2| 167.2|   5.4| 45|     M|
|  3| 124.1|   5.2| 23|     F|
+---+------+------+---+------+
The code above removed the row that was a complete duplicate. We can deduplicate again, this time using every column except the ID column.
>>> print('count of ids: {0}'.format(df.count()))
count of ids: 6
>>> print('count of distinct ids: {0}'.format(
...     df.select([c for c in df.columns if c != 'id']).distinct().count()))
count of distinct ids: 5
>>> df = df.dropDuplicates(subset=[c for c in df.columns if c != 'id'])
>>> df.show()
+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  5| 133.2|   5.7| 54|     F|
|  1| 144.5|   5.9| 33|     M|
|  2| 167.2|   5.4| 45|     M|
|  3| 124.1|   5.2| 23|     F|
|  5| 129.2|   5.3| 42|     M|
+---+------+------+---+------+
Now we know there are no rows that are exact duplicates, and no rows that are identical except for their ID. Let's check whether any ID is repeated.
To calculate the total and distinct number of IDs in one step, we can use the .agg(...) method.
>>> import pyspark.sql.functions as fn
>>> df.agg(fn.count('id').alias('count'),
...        fn.countDistinct('id').alias('distinct')).show()
+-----+--------+
|count|distinct|
+-----+--------+
|    5|       4|
+-----+--------+
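The total and distinct counts differ, so at least one ID is repeated. To see which IDs occur more than once, a grouped count works; this is a small sketch beyond the original walkthrough:

# group by id and keep only ids that appear more than once
df.groupBy('id').count().filter(fn.col('count') > 1).show()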
Use the fn.monotonically_increasing_id() method to assign each row a new, unique ID.
>>> df.withColumn('new_id', fn.monotonically_increasing_id()).show()
+---+------+------+---+------+-------------+
| id|weight|height|age|gender|       new_id|
+---+------+------+---+------+-------------+
|  5| 133.2|   5.7| 54|     F|  25769803776|
|  1| 144.5|   5.9| 33|     M| 171798691840|
|  2| 167.2|   5.4| 45|     M| 592705486848|
|  3| 124.1|   5.2| 23|     F|1236950581248|
|  5| 129.2|   5.3| 42|     M|1365799600128|
+---+------+------+---+------+-------------+
* Missing values
If your data is discrete or Boolean, you can turn it into a categorical variable by adding a third category (Missing);

If you're dealing with continuous numbers, missing values can be replaced with the mean, the median, or another predefined value (for example, the first or third quartile, depending on the distribution shape of the data).
>>> df_miss = spark.createDataFrame([
...     (1, 143.5, 5.6, 28, 'M', 100000),
...     (2, 167.2, 5.4, 45, 'M', None),
...     (3, None, 5.2, None, None, None),
...     (4, 144.5, 5.9, 33, 'M', None),
...     (5, 133.2, 5.7, 54, 'F', None),
...     (6, 124.1, 5.2, None, 'F', None)],
...     ['id', 'weight', 'height', 'age', 'gender', 'income'])
>>> df_miss_no_income = df_miss.select([c for c in df_miss.columns if c != 'income'])
>>> df_miss_no_income.dropna(thresh=3).show()
+---+------+------+----+------+
| id|weight|height| age|gender|
+---+------+------+----+------+
|  1| 143.5|   5.6|  28|     M|
|  2| 167.2|   5.4|  45|     M|
|  4| 144.5|   5.9|  33|     M|
|  5| 133.2|   5.7|  54|     F|
|  6| 124.1|   5.2|null|     F|
+---+------+------+----+------+
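Before deciding between dropping rows and imputing values, it can help to quantify how much data is actually missing in each column. A minimal sketch, reusing the fn alias for pyspark.sql.functions imported earlier:

# share of missing observations per column: 1 - non-null count / total count
df_miss.agg(*[
    (1 - (fn.count(c) / fn.count('*'))).alias(c + '_missing')
    for c in df_miss.columns
]).show()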
You can use the .dropna(...) method to delete rows with missing values, and the .fillna(...) method to replace missing values.
>>> means = df_miss_no_income.agg(
...     *[fn.mean(c).alias(c) for c in df_miss_no_income.columns if c != 'gender']
... ).toPandas().to_dict('records')[0]
>>> means['gender'] = 'missing'
>>> df_miss_no_income.fillna(means).show()
+---+------+------+---+-------+
| id|weight|height|age| gender|
+---+------+------+---+-------+
|  1| 143.5|   5.6| 28|      M|
|  2| 167.2|   5.4| 45|      M|
|  3| 142.5|   5.2| 40|missing|
|  4| 144.5|   5.9| 33|      M|
|  5| 133.2|   5.7| 54|      F|
|  6| 124.1|   5.2| 40|      F|
+---+------+------+---+-------+
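As noted above, for skewed distributions the median may be a better fill value than the mean. A sketch using .approxQuantile(...) (introduced more fully in the outliers part below); the 0.01 relative error and the column list are illustrative choices:

# approximate medians for the numeric columns, then use them as fill values
medians = {
    c: df_miss_no_income.approxQuantile(c, [0.5], 0.01)[0]
    for c in ['weight', 'height', 'age']
}
df_miss_no_income.fillna(medians).show()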
* Outliers
Outliers are those observations that are significantly different from the rest of the sample.

A common definition is: if all values lie roughly within the range [Q1 - 1.5*IQR, Q3 + 1.5*IQR], the data can be considered to contain no outliers, where IQR is the interquartile range;
IQR is defined as the difference between the third quartile Q3 and the first quartile Q1.

Remarks:

* First quartile (Q1), also called the "lower quartile", is the value below which 25% of the sample falls when all values are sorted from smallest to largest.
* Second quartile (Q2), also called the "median", is the value below which 50% of the sample falls.
* Third quartile (Q3), also called the "upper quartile", is the value below which 75% of the sample falls.

>>> df_outliers = spark.createDataFrame([
...     (1, 143.5, 5.3, 28),
...     (2, 154.2, 5.5, 45),
...     (3, 342.3, 5.1, 99),
...     (4, 144.5, 5.5, 33),
...     (5, 133.2, 5.4, 54),
...     (6, 124.1, 5.1, 21),
...     (7, 129.2, 5.3, 42),
... ], ['id', 'weight', 'height', 'age'])
Use the .approxQuantile(...) method to calculate the quartiles. The first parameter is the name of the column; the second parameter can be a single number between 0 and 1 (where 0.5 means calculating the median) or a list (as in our case); the third parameter specifies the acceptable relative error of the metric (if set to 0, the exact value is calculated, which can be very resource-intensive).
>>> cols = ['weight', 'height', 'age']
>>> bounds = {}
>>> for col in cols:
...     quantiles = df_outliers.approxQuantile(col, [0.25, 0.75], 0.05)
...     IQR = quantiles[1] - quantiles[0]
...     bounds[col] = [quantiles[0] - 1.5 * IQR, quantiles[1] + 1.5 * IQR]
...
Filter out outliers:
outliers = df_outliers.select(*['id'] + [
    (
        (df_outliers[c] < bounds[c][0]) |
        (df_outliers[c] > bounds[c][1])
    ).alias(c + '_o') for c in cols
])
outliers.show()
df_outliers = df_outliers.join(outliers, on='id')
df_outliers.filter('weight_o').select('id', 'weight').show()
df_outliers.filter('age_o').select('id', 'age').show()
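Once the boolean flags are joined back, rows flagged in any column can also be dropped, leaving a DataFrame without the detected outliers; a minimal sketch under the same column names:

# keep only rows that are not flagged as outliers in any column
df_no_outliers = df_outliers.filter('NOT weight_o AND NOT height_o AND NOT age_o')
df_no_outliers.select('id', 'weight', 'height', 'age').show()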
3.2 Descriptive statistics

Load the ccFraud.csv.gz file, drop the header row, split each line by comma, and convert each element to an integer:
>>> sc = spark.sparkContext
>>> fraud = sc.textFile('ccFraud.csv.gz')
>>> header = fraud.first()
>>> fraud = fraud.filter(lambda row: row != header) \
...     .map(lambda row: [int(elem) for elem in row.split(',')])
Create the schema for the DataFrame:
>>> import pyspark.sql.types as typ
>>> fields = [typ.StructField(h[1:-1], typ.IntegerType(), True)
...           for h in header.split(',')]
>>> schema = typ.StructType(fields)
Create the DataFrame:
>>> fraud_df = spark.createDataFrame(fraud, schema)
Check the schema:
>>> fraud_df.printSchema()
root
 |-- custID: integer (nullable = true)
 |-- gender: integer (nullable = true)
 |-- state: integer (nullable = true)
 |-- cardholder: integer (nullable = true)
 |-- balance: integer (nullable = true)
 |-- numTrans: integer (nullable = true)
 |-- numIntlTrans: integer (nullable = true)
 |-- creditLine: integer (nullable = true)
 |-- fraudRisk: integer (nullable = true)
Use the .groupby(...) method for grouped counts:
fraud_df.groupby('gender').count().show()
+------+-------+
|gender|  count|
+------+-------+
|     1|6178231|
|     2|3821769|
+------+-------+
Use the .describe() method to get descriptive statistics of the data:
numerical = ['balance', 'numTrans', 'numIntlTrans']
desc = fraud_df.describe(numerical)
desc.show()

From the descriptive statistics above, we can see two things:

1) All the features are positively skewed; the maximum values are several times larger than the averages.
2) The coefficient of variation is very high, approaching or even exceeding 1, which indicates that the data is very dispersed and fluctuates over a wide range.

Remarks:

* Positively skewed: the mean is greater than the median, because a small number of extremely large values pull the overall mean upwards (the familiar "being averaged up"), while the median is barely affected by extreme values; an estimate of central tendency based on the median is therefore relatively robust.
* Negatively skewed: the same reasoning, in the opposite direction.
* Coefficient of variation = standard deviation / mean (a quick way to compute this for all numeric columns is sketched just below).
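Assuming pyspark.sql.functions is still imported as fn and numerical is the column list defined earlier, the coefficient of variation can be checked directly with a sketch like this:

# coefficient of variation = standard deviation / mean, per numeric column
fraud_df.agg(*[
    (fn.stddev(c) / fn.mean(c)).alias(c + '_cv')
    for c in numerical
]).show()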
Check the skewness of a feature:
fraud_df.agg({'balance': 'skewness'}).show()


Other commonly used aggregation functions include: avg(), count(), countDistinct(), first(), kurtosis(), max(), mean(), min(), skewness(), stddev(), stddev_pop(), stddev_samp(), sum(), sumDistinct(), var_pop(), var_samp(), and variance().
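Several of these can be combined in a single .agg(...) call; for instance, a sketch for the balance column (the aliases are just illustrative):

# a few of the aggregations above applied to one column at once
fraud_df.agg(
    fn.min('balance').alias('balance_min'),
    fn.max('balance').alias('balance_max'),
    fn.skewness('balance').alias('balance_skew'),
    fn.kurtosis('balance').alias('balance_kurt')
).show()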

Another very useful measure of the relationship between features is correlation.

Your model should usually only include features that are highly correlated with your target. However, it is almost as important to examine the correlations between the features themselves; the ideal situation is features that are nearly uncorrelated with each other while each being highly correlated with the target.

As long as the data is in DataFrame form, calculating correlations in PySpark is very easy. The only limitations are that the .corr(...) method currently supports only the Pearson correlation coefficient, and that it can only compute pairwise correlations, as follows:
fraud_df.corr('balance', 'numTrans')
Create a correlation matrix:
n_numerical = len(numerical)
corr = []
for i in range(0, n_numerical):
    temp = [None] * i
    for j in range(i, n_numerical):
        temp.append(fraud_df.corr(numerical[i], numerical[j]))
    corr.append(temp)
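If you prefer to get the whole matrix in one pass, pyspark.ml.stat.Correlation (available from Spark 2.2 onwards) is an alternative to the pairwise loop above; a sketch under that assumption:

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation

# assemble the numeric columns into a single vector column
assembler = VectorAssembler(inputCols=numerical, outputCol='features')
vec_df = assembler.transform(fraud_df).select('features')

# Pearson correlation matrix, returned as a DenseMatrix
corr_matrix = Correlation.corr(vec_df, 'features').head()[0]
print(corr_matrix.toArray())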

It can be seen that there is almost no correlation between the features, so all of them can be used in our model.

3.3 Visualization

Preparation:
%matplotlib inline
import matplotlib.pyplot as plt
Histograms are the easiest way to assess the distribution of a feature. There are three ways to generate a histogram with PySpark:

* Aggregate the data on the workers and return an aggregated list of bins, together with the count in each bin, to the driver.
* Return all of the data to the driver and let a plotting library do the work for you.
* Sample the data and return the sample to the driver for plotting.
If the data runs to millions of rows, the second method is clearly not advisable, so the data needs to be aggregated first.
hists = fraud_df.select('balance').rdd.flatMap(lambda row: row).histogram(20)
Plotting:
data = {
    'bins': hists[0][:-1],
    'freq': hists[1]
}
plt.bar(data['bins'], data['freq'], width=2000)
plt.title('Histogram of \'balance\'')
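For the third approach listed above (sampling), a small fraction of the data can be brought back to the driver and plotted directly; a minimal sketch with an arbitrary 2% sampling fraction:

# sample without replacement, collect to pandas, and plot on the driver
sample = fraud_df.select('balance').sample(False, 0.02).toPandas()
plt.hist(sample['balance'], bins=20)
plt.title('Histogram of \'balance\' (2% sample)')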