3 Prepare Data for Modeling

3.1 检查重复项、缺失值和异常值

* 重复项

>>> df = spark.createDataFrame([ ... (1,144.5,5.9,33,'M'), ... (2,167.2,5.4,45,
'M'), ... (3,124.1,5.2,23,'F'), ... (4,144.5,5.9,33,'M'), ... (5,133.2,5.7,54,
'F'), ... (3,124.1,5.2,23,'F'), ... (5,129.2,5.3,42,'M'), ... ],['id','weight',
'height','age','gender'])

>>> print('count of rows: {0}'.format(df.count()))
count of rows: 7
>>> print('count of distinct rows: {0}'.format(df.distinct().count()))
count of distinct rows: 6

>>> df = df.dropDuplicates() >>> df.show() +---+------+------+---+------+ |
id|weight|height|age|gender| +---+------+------+---+------+ | 5| 133.2| 5.7|
54| F| | 5| 129.2| 5.3| 42| M| | 1| 144.5| 5.9| 33| M| | 4| 144.5| 5.9| 33| M|
| 2| 167.2| 5.4| 45| M|| 3| 124.1| 5.2| 23| F| +---+------+------+---+------+

>>> print ('count of ids: {0}'.format(df.count())) count of ids: 6 >>> print('
count of distinct ids: {0}'.format(df.select([c for c in df.columns if c!='id
']).distinct().count())) count of distinct ids: 5 >>> df =
df.dropDuplicates(subset = [c for c in df.columns if c!='id']) >>> df.show()
+---+------+------+---+------+ | id|weight|height|age|gender|
+---+------+------+---+------+ | 5| 133.2| 5.7| 54| F| | 1| 144.5| 5.9| 33| M|
| 2| 167.2| 5.4| 45| M| | 3| 124.1| 5.2| 23| F|| 5| 129.2| 5.3| 42| M|
+---+------+------+---+------+

>>> import pyspark.sql.functions as fn >>> df.agg(fn.count('id').alias('count'
),... fn.countDistinct('id').alias('distinct')).show() +-----+--------+
|count|distinct| +-----+--------+ | 5| 4| +-----+--------+

>>> df.withColumn('new_id',fn.monotonically_increasing_id()).show() +---+------
+------+---+------+-------------+ | id|weight|height|age|gender| new_id|
+---+------+------+---+------+-------------+ | 5| 133.2| 5.7| 54| F|
25769803776| | 1| 144.5| 5.9| 33| M| 171798691840| | 2| 167.2| 5.4| 45| M|
592705486848| | 3| 124.1| 5.2| 23| F|1236950581248|| 5| 129.2| 5.3| 42|
M|1365799600128| +---+------+------+---+------+-------------+
* 缺失值

>>> df_miss = spark.createDataFrame([ ... (1,143.5,5.6,28,'M',100000), ... (2,
167.2,5.4,45,'M',None), ... (3,None,5.2,None,None,None), ... (4,144.5,5.9,33,'M'
,None),... (5,133.2,5.7,54,'F',None), ... (6,124.1,5.2,None,'F',None)], ... [
'id','weight','height','age','gender','income'])
>>> df_miss_no_income = df_miss.select([c for c in df_miss.columns if c != 'income'])
>>> df_miss_no_income.dropna(thresh=3).show() +---+------+------+----+------+ |
id|weight|height| age|gender| +---+------+------+----+------+ |1| 143.5| 5.6| 28
| M| |2| 167.2| 5.4| 45| M| | 4| 144.5| 5.9| 33| M| | 5| 133.2| 5.7| 54| F| | 6|
124.1| 5.2|null| F| +---+------+------+----+------+

>>> means = df_miss_no_income.agg(*[fn.mean(c).alias(c) for c in df_miss_no_
income.columns if c !='gender']).toPandas().to_dict('records')[0] >>>
means['gender']='missing' >>> df_miss_no_income.fillna(means).show() +---+------
+------+---+-------+ | id|weight|height|age| gender|
+---+------+------+---+-------+ | 1| 143.5| 5.6| 28| M| | 2| 167.2| 5.4| 45| M|
| 3| 142.5| 5.2| 40|missing| | 4| 144.5| 5.9| 33| M| | 5| 133.2| 5.7| 54| F||
6| 124.1| 5.2| 40| F| +---+------+------+---+-------+
* 异常值

IQR被定义为第三四分位数Q3与第一四分位数Q1的差距。

* 第一四分位数 (Q1)，又称“较小四分位数”，等于该样本中所有数值由小到大排列后第25%的数字。
* 第二四分位数 (Q2)，又称“中位数”，等于该样本中所有数值由小到大排列后第50%的数字。
* 第三四分位数 (Q3)，又称“较大四分位数”，等于该样本中所有数值由小到大排列后第75%的数字。

>>> df_outliers = spark.createDataFrame([
...     (1, 143.5, 5.3, 28),
...     (2, 154.2, 5.5, 45),
...     (3, 342.3, 5.1, 99),
...     (4, 144.5, 5.5, 33),
...     (5, 133.2, 5.4, 54),
...     (6, 124.1, 5.1, 21),
...     (7, 129.2, 5.3, 42),
... ], ['id', 'weight', 'height', 'age'])

>>> cols = ['weight','height','age'] >>> bounds={} >>> for col in cols: ...
quantiles = df_outliers.approxQuantile(col,[0.25,0.75],0.05) ... IQR =
quantiles[1]-quantiles[0] ... bounds[col] = [quantiles[0]-1.5*IQR,quantiles[1]+
1.5*IQR] ...

outliers = df_outliers.select(*['id'] + [ ( (df_outliers[c] < bounds[c][0]) | (
df_outliers[c] > bounds[c][1]) ).alias(c + '_o') for c in cols ])
outliers.show() df_outliers = df_outliers.join(outliers, on='id') df_outliers.
filter('weight_o').select('id', 'weight').show() df_outliers.filter('age_o'
).select('id', 'age').show()
3.2 描述性统计

>>> sc = spark.sparkContext
>>> fraud = sc.textFile('ccFraud.csv.gz')
>>> header = fraud.first()
>>> fraud = fraud \
...     .filter(lambda row: row != header) \
...     .map(lambda row: [int(elem) for elem in row.split(',')])

>>> import pyspark.sql.types as typ
>>> fields = [typ.StructField(h[1:-1], typ.IntegerType(), True)
...           for h in header.split(',')]
>>> schema = typ.StructType(fields)

>>> fraud_df = spark.createDataFrame(fraud,schema)

>>> fraud_df.printSchema() root |-- custID: integer (nullable = true) |--
gender:integer (nullable = true) |-- state: integer (nullable = true) |--
cardholder:integer (nullable = true) |-- balance: integer (nullable = true) |--
numTrans:integer (nullable = true) |-- numIntlTrans: integer (nullable = true)
|-- creditLine:integer (nullable = true) |-- fraudRisk: integer (nullable = true
)

fraud_df.groupby('gender').count().show() +------+------+ |gender|count |
+------+------+ | 1 |6178231| | 2 |3821769| +------+------+

numerical = ['balance', 'numTrans', 'numIntlTrans']
desc = fraud_df.describe(numerical)
desc.show()

1）所有的特征都是正倾斜的，最大值是平均数的几倍。
2）离散系数（coefficient of variation，或变异系数）非常高，接近甚至超过1，说明数据的离散程度很大，波动范围很大。

* 正倾斜（positively skewed）：平均数 > 中位数（少数极大值把均值拉向右侧长尾）。

* 负倾斜：同理。
* 离散系数 = 标准差 / 平均值

fraud_df.agg({'balance': 'skewness'}).show()

mean() , min() , skewness() , stddev() , stddev_pop() , stddev_samp() , sum() ,
sumDistinct() , var_pop() , var_samp() and variance() 等。

fraud_df.corr('balance', 'numTrans')

n_numerical = len(numerical)
corr = []
for i in range(0, n_numerical):
    temp = [None] * i
    for j in range(i, n_numerical):
        temp.append(fraud_df.corr(numerical[i], numerical[j]))
    corr.append(temp)

3.3 可视化

%matplotlib inline import matplotlib.pyplot as plt

* 汇总workers中的数据，并返回一个汇总的bin列表，并在直方图的每个bin中计数给driver。
* 将所有数据返回给driver，并允许绘图库的方法为你完成这项工作。
* 对数据进行采样，然后将它们返回给driver进行绘图。

hists = fraud_df.select('balance').rdd.flatMap( lambda row: row ).histogram(20)

data = { 'bins': hists[0][:-1], 'freq': hists[1] } plt.bar(data['bins'],
data['freq'], width=2000) plt.title('Histogram of \'balance\'')