3 Prepare Data for Modeling
所有的数据都是脏的,不管是从网上下载的数据集,或其他来源。直到你测试和证明你的数据处于干净状态才能用来建模。因此,为了建模需要清理数据集,还需要检查数据集的特征分布,并确认它们符合预定义的标准。
3.1 检查重复项、缺失值和异常值
* 重复项
生成一个简单的dataframe如下:
>>> df = spark.createDataFrame([ ... (1,144.5,5.9,33,'M'), ... (2,167.2,5.4,45,
'M'), ... (3,124.1,5.2,23,'F'), ... (4,144.5,5.9,33,'M'), ... (5,133.2,5.7,54,
'F'), ... (3,124.1,5.2,23,'F'), ... (5,129.2,5.3,42,'M'), ... ],['id','weight',
'height','age','gender'])
显然,这个数据只有几行,一眼就可以发现是否有重复值。但对于百万级别的数据呢?
第一件事,就是用.distinct()方法检查。
>>> print('count of rows: {0}'.format(df.count())) count of rows: 7 >>> print('
count of distinct rows: {0}'.format(df.distinct().count())) count of distinct
rows:6
然后,用 .dropDuplicates(…)去掉重复项。
>>> df = df.dropDuplicates() >>> df.show() +---+------+------+---+------+ |
id|weight|height|age|gender| +---+------+------+---+------+ | 5| 133.2| 5.7|
54| F| | 5| 129.2| 5.3| 42| M| | 1| 144.5| 5.9| 33| M| | 4| 144.5| 5.9| 33| M|
| 2| 167.2| 5.4| 45| M|| 3| 124.1| 5.2| 23| F| +---+------+------+---+------+
上面的代码根据ID列去除了完全重复的一行,我们可以使用ID列以外的列再次去重。
>>> print ('count of ids: {0}'.format(df.count())) count of ids: 6 >>> print('
count of distinct ids: {0}'.format(df.select([c for c in df.columns if c!='id
']).distinct().count())) count of distinct ids: 5 >>> df =
df.dropDuplicates(subset = [c for c in df.columns if c!='id']) >>> df.show()
+---+------+------+---+------+ | id|weight|height|age|gender|
+---+------+------+---+------+ | 5| 133.2| 5.7| 54| F| | 1| 144.5| 5.9| 33| M|
| 2| 167.2| 5.4| 45| M| | 3| 124.1| 5.2| 23| F|| 5| 129.2| 5.3| 42| M|
+---+------+------+---+------+
现在我们知道没有完整的行重复,或任何相同的行只有ID不同,我们来检查是否有重复的ID。
要在一步中计算ID的总数和不同数量,我们可以使用.agg(…)方法
>>> import pyspark.sql.functions as fn >>> df.agg(fn.count('id').alias('count'
),... fn.countDistinct('id').alias('distinct')).show() +-----+--------+
|count|distinct| +-----+--------+ | 5| 4| +-----+--------+
使用 fn.monotonically_increasing_id() 方法重新分配ID。
>>> df.withColumn('new_id',fn.monotonically_increasing_id()).show() +---+------
+------+---+------+-------------+ | id|weight|height|age|gender| new_id|
+---+------+------+---+------+-------------+ | 5| 133.2| 5.7| 54| F|
25769803776| | 1| 144.5| 5.9| 33| M| 171798691840| | 2| 167.2| 5.4| 45| M|
592705486848| | 3| 124.1| 5.2| 23| F|1236950581248|| 5| 129.2| 5.3| 42|
M|1365799600128| +---+------+------+---+------+-------------+
* 缺失值
如果你的数据是离散的布尔值,可以通过添加第三个类别(缺失)将其转变为分类变量;
如果你处理的是连续数值,则可以用平均值,中位数或其他预定义值(例如,取决于数据的分布形状的第一或第三四分位数)替代缺失值。
>>> df_miss = spark.createDataFrame([ ... (1,143.5,5.6,28,'M',100000), ... (2,
167.2,5.4,45,'M',None), ... (3,None,5.2,None,None,None), ... (4,144.5,5.9,33,'M'
,None),... (5,133.2,5.7,54,'F',None), ... (6,124.1,5.2,None,'F',None)], ... [
'id','weight','height','age','gender','income']) >>>
df_miss_no_income=df_miss.select([cfor c in df_miss.columns if c!='income'])
>>> df_miss_no_income.dropna(thresh=3).show() +---+------+------+----+------+ |
id|weight|height| age|gender| +---+------+------+----+------+ |1| 143.5| 5.6| 28
| M| |2| 167.2| 5.4| 45| M| | 4| 144.5| 5.9| 33| M| | 5| 133.2| 5.7| 54| F| | 6|
124.1| 5.2|null| F| +---+------+------+----+------+
可以用.dropna(…)方法删除缺失值,用.fillna(…) 方法替换缺失值。
>>> means = df_miss_no_income.agg(*[fn.mean(c).alias(c) for c in df_miss_no_
income.columns if c !='gender']).toPandas().to_dict('records')[0] >>>
means['gender']='missing' >>> df_miss_no_income.fillna(means).show() +---+------
+------+---+-------+ | id|weight|height|age| gender|
+---+------+------+---+-------+ | 1| 143.5| 5.6| 28| M| | 2| 167.2| 5.4| 45| M|
| 3| 142.5| 5.2| 40|missing| | 4| 144.5| 5.9| 33| M| | 5| 133.2| 5.7| 54| F||
6| 124.1| 5.2| 40| F| +---+------+------+---+-------+
* 异常值
异常值是那些明显偏离样本其余部分分布的观测值。
一般定义为,如果所有的值大致在Q1-1.5IQR和Q3 + 1.5IQR范围内,那么可以认为没有异常值,其中IQR是四分位间距;
IQR被定义为第三四分位数Q3与第一四分位数Q1的差距。
备注:
* 第一四分位数 (Q1),又称“较小四分位数”,等于该样本中所有数值由小到大排列后第25%的数字。
* 第二四分位数 (Q2),又称“中位数”,等于该样本中所有数值由小到大排列后第50%的数字。
* 第三四分位数 (Q3),又称“较大四分位数”,等于该样本中所有数值由小到大排列后第75%的数字。 >>> df_outliers =
spark.createDataFrame([... (1, 143.5, 5.3, 28), ... (2, 154.2, 5.5, 45), ... (3,
342.3, 5.1, 99), ... (4, 144.5, 5.5, 33), ... (5, 133.2, 5.4, 54), ... (6, 124.1
,5.1, 21), ... (7, 129.2, 5.3, 42), ... ], ['id', 'weight', 'height', 'age'])
用 .approxQuantile(…)
方法计算四分位数,指定的第一个参数是列的名称,第二个参数可以是介于0或1之间的数字(其中0.5意味着计算中位数)或列表(在我们的例子中),第三个参数指定可接受的度量的误差(如果设置为0,它将计算度量的精确值,但这样做可能会非常耗资源)。
>>> cols = ['weight','height','age'] >>> bounds={} >>> for col in cols: ...
quantiles = df_outliers.approxQuantile(col,[0.25,0.75],0.05) ... IQR =
quantiles[1]-quantiles[0] ... bounds[col] = [quantiles[0]-1.5*IQR,quantiles[1]+
1.5*IQR] ...
筛选出异常值:
outliers = df_outliers.select(*['id'] + [ ( (df_outliers[c] < bounds[c][0]) | (
df_outliers[c] > bounds[c][1]) ).alias(c + '_o') for c in cols ])
outliers.show() df_outliers = df_outliers.join(outliers, on='id') df_outliers.
filter('weight_o').select('id', 'weight').show() df_outliers.filter('age_o'
).select('id', 'age').show()
3.2 描述性统计
按逗号切割,并将每个元素转换为一个整数:
>>> sc = spark.sparkContext >>> fraud = sc.textFile('ccFraud.csv.gz') >>>
header = fraud.first()>>> fraud = fraud.filter(lambda row: row != header).map(
lambda row: [int(elem) for elem in row.split(',')])
创建dataframe的schema:
>>> fields = [typ.StructField(h[1:-1],typ.IntegerType(),True) for h in
header.split(',')] >>> schema = typ.StructType(fields)
创建dataframe:
>>> fraud_df = spark.createDataFrame(fraud,schema)
查看schema:
>>> fraud_df.printSchema() root |-- custID: integer (nullable = true) |--
gender:integer (nullable = true) |-- state: integer (nullable = true) |--
cardholder:integer (nullable = true) |-- balance: integer (nullable = true) |--
numTrans:integer (nullable = true) |-- numIntlTrans: integer (nullable = true)
|-- creditLine:integer (nullable = true) |-- fraudRisk: integer (nullable = true
)
用.groupby(…)方法分组统计:
fraud_df.groupby('gender').count().show() +------+------+ |gender|count |
+------+------+ | 1 |6178231| | 2 |3821769| +------+------+
用 .describe()方法对数值进行描述性统计:
numerical = ['balance', 'numTrans', 'numIntlTrans'] desc =
fraud_df.describe(numerical)desc.show()
从上面的描述性统计可以看出两点:
1)所有的特征都是正倾斜的,最大值是平均数的几倍。
2)离散系数(coefficient of variation,或变异系数)非常高,接近甚至超过1,说明数据的离散程度很大,波动范围很大。
备注:
* 正倾斜(positively skewed): 平均数 >
中位数,由于数据中有些很大很大的极端值,使得整体平均数被极少数的极端大值拉大了,俗称“被平均”,而中位数受极端值的影响其实很小,因而此时用中位数作为中心趋势的估计比较稳健。
* 负倾斜:同理。
* 离散系数 = 标准差 / 平均值
检查某个特征的偏度:
fraud_df.agg({'balance': 'skewness'}).show()
常用其他函数包括:avg() , count() , countDistinct() , first() , kurtosis() , max() ,
mean() , min() , skewness() , stddev() , stddev_pop() , stddev_samp() , sum() ,
sumDistinct() , var_pop() , var_samp() and variance() 等。
特征间相互关系的另一个非常有用的度量是相关性(correlation)。
你的模型通常应该只包括那些与你的目标高度相关的特征。然而,检查这些特征之间的相关性几乎是同等重要的,最好选择的是特征之间几乎不相关,而同时特征与目标高度相关。
只要数据是DataFrame格式,在PySpark中计算相关性非常容易。唯一的困难是.corr(…)
方法现在支持Pearson相关系数,而它只能计算成对的相关性,如下:
fraud_df.corr('balance', 'numTrans')
创建一个相关矩阵:
n_numerical = len(numerical) corr = [] for i in range(0, n_numerical): temp =
[None] * ifor j in range(i, n_numerical): temp.append
(fraud_df.corr(numerical[i], numerical[j])) corr.append(temp)
可以看见特征之间几乎不存在相关性,因此,所有的特征都能用到我们的模型中。
3.3 可视化
准备工作:
%matplotlib inline import matplotlib.pyplot as plt
直方图(Histograms) 是评估特征分布最简单的方法。
用pyspark有三种方法生成直方图:
* 汇总workers中的数据,并返回一个汇总的bin列表,并在直方图的每个bin中计数给driver。
* 将所有数据返回给driver,并允许绘图库的方法为你完成这项工作。
* 对数据进行采样,然后将它们返回给driver进行绘图。
如果数据是几百万行,第二种方法显然不可取。因此需要先聚合数据。
hists = fraud_df.select('balance').rdd.flatMap( lambda row: row ).histogram(20)
绘图:
data = { 'bins': hists[0][:-1], 'freq': hists[1] } plt.bar(data['bins'],
data['freq'], width=2000) plt.title('Histogram of \'balance\'')
热门工具 换一换