协作过滤

协同过滤通常用于推荐系统。这些技术旨在填补用户项目关联矩阵的缺失条目。
spark.ml目前支持基于模型的协作过滤,其中用户和产品由一组可用于预测缺失条目的潜在因素来描述。 spark.ml使用交替最小二乘(ALS)
算法来学习这些潜在因素。实现中spark.ml有以下参数:

numBlocks是为了并行化计算而将用户和项目划分到的块的数量(默认为10)。
rank是模型中潜在因素的数量(默认为10)。
maxIter是要运行的最大迭代次数(默认为10)。
regParam指定ALS中的正则化参数(默认为1.0)。
implicitPrefs指定是使用显式反馈 ALS变体还是使用 隐式反馈数据(默认为false使用显式反馈的手段)。
alpha是一个适用于ALS的隐式反馈变量的参数,该变量管理偏好观察值的 基线置信度(默认值为1.0)。
nonnegative指定是否对最小二乘使用非负约束(默认为false)。
注意: ALS的基于DataFrame的API目前仅支持用户和项目ID的整数。用户和项目ID列支持其他数字类型,但ID必须在整数值范围内。

显式与隐式反馈

基于矩阵分解的协同过滤的标准方法将用户项矩阵中的条目视为由用户给予该项的明确偏好,例如,给予电影评级的用户。


在许多真实世界的用例中,通常只能访问隐式反馈(例如查看,点击,购买,喜欢,共享等)。用于spark.ml处理这些数据的方法取自隐式反馈数据集的协作过滤。本质上,这种方法不是直接对收视率矩阵进行建模,而是将数据视为代表实力的数字观察用户操作(例如点击次数或某人观看电影的累计持续时间)。然后,这些数字与观察到的用户偏好的信心水平相关,而不是给予项目的明确评分。该模型然后试图找出可用于预测用户对物品的预期偏好的潜在因素。
from __future__ import print_function import sys if sys.version >= '3': long =
intfrom pyspark.sql import SparkSession # $example on$ from
pyspark.ml.evaluationimport RegressionEvaluator from pyspark.ml.recommendation
import ALS from pyspark.sql import Row from pyspark import SparkContext,
SparkConf# $example off$ if __name__ == "__main__": spark = SparkSession\
.builder\ .appName("ALSExample")\ .getOrCreate() inputFile =
"hdfs://node1:8020/mv_training/training_set_stamp_little.txt" outputFile =
"hdfs://node1:8020/data/test" #lines = SparkContext(inputFile) #$example on$
#lines = spark.read.text("data/mllib/als/sample_movielens_ratings.txt").rdd
lines = spark.read.text(inputFile).rdd parts = lines.map(lambda row:
row.value.split(",")) ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]),
movieId=int(p[1]), rating=float(p[2]), timestamp=long(p[3]))) ratings =
spark.createDataFrame(ratingsRDD) (training, test) = ratings.randomSplit([0.8,
0.2]) # Build the recommendation model using ALS on the training data # Note we
set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId",
ratingCol="rating", coldStartStrategy="drop") model = als.fit(training) #
Evaluate the model by computing the RMSE on the test data predictions =
model.transform(test) evaluator = RegressionEvaluator(metricName="rmse",
labelCol="rating", predictionCol="prediction") rmse =
evaluator.evaluate(predictions) print("Root-mean-square error = " + str(rmse))
# Generate top 10 movie recommendations for each user userRecs =
model.recommendForAllUsers(2) # Generate top 10 user recommendations for each
movie movieRecs = model.recommendForAllItems(2) # $example off$ # print
(userRecs) # print (movieRecs)
#userRecs.write.format("text").save("hdfs://node1/output/result_little.txt")
userRecs.write.format("json").save("file:///root/output/result_little.json")
#userRecs.saveAsTextFile("file:///root/output/result_little") userRecs.show()
movieRecs.show()
完整的示例代码:Spark repo中 “examples/src/main/python/ml/als_example.py”
如果评级矩阵是从另一个信息源(即它是从其他信号推断)得出,可以设置implicitPrefs为True获得更好的效果:
als = ALS(maxIter=5, regParam=0.01, implicitPrefs=True, userCol="userId",
itemCol="movieId", ratingCol="rating")
官网链接:Collaborative Filtering
<http://spark.apache.org/docs/latest/ml-collaborative-filtering.html#collaborative-filtering>

得到的结果为DataFrame类型,结果输出到文件即使用DataFrame提供的接口,下例输出为json文件,txt文件即为format(“text”),csv文件即为format(“csv”),:
userRecs.write.format("json").save("file:///root/output/result_little.json")