Spark中如何使用矩阵运算间接实现i2i

发布于 2021-05-13 21:39 ,所属分类:数据库和大数据技术学习资料

本文主要包含以下几部分:

  • 1.背景

  • 2.Spark支持的数据类型

    • 2.1 Local Vector(本地向量)

    • 2.2 Labeled point(带标签的点)

    • 2.3 Local Matrix(本地矩阵)

    • 2.4 Distributed Matrix(分布式矩阵)

    • 2.4.4 BlockMatrix

  • 3.相似度计算原理探索

    • 3.1 相似度计算

    • 3.2 公式拆解

    • 3.3 矩阵并行

    • 3.4 阅读源码

  • 4.Spark实现Item相似度计算

1.背景

之前小编在计算两两用户的item重合度,根据item重合度去评估两个用户之间的相似度,根据条件进行过滤之后大概有3000个用户,但每个用户对应的item量参差不齐,有上百万的,有几千的,这样在去构建笛卡尔积的时候,进行item数据关联,得到的用户集就会特别大,spark运行的时候就会很慢,而且会出现很严重的数据倾斜。这个时候了解到了spark支持的数据类型,看到了CoordinateMatrix,然后深究其原理,便看到了这篇文章,经过整理形成了此文。

本文出自「xingoo」在原文的基础上加以小编自己的理解形成的学习笔记,希望对读者有帮助。原文链接:Spark MLlib 之 大规模数据集的相似度计算原理探索

2.Spark支持的数据类型

官方文档地址:https://spark.apache.org/docs/latest/mllib-data-types.html

2.1 Local Vector(本地向量)

本地向量是从0开始的下标和double类型的数据组成,存储在本地机器上,所以称为Local Vector。它支持两种形式:

  • Dense (密集的向量)
  • Sparse (稀疏的向量)

比如一个向量[1.0,0.0,3.0],用Dense表示为:[1.0,0.0,3.0],用Sparse表示为:(3,[0,2],[1.0,3.0]),其中3为向量的长度,[0,2]表示元素[1.0,3.0]的位置,可见sparse形式下0.0是不存储的。

importorg.apache.spark.mllib.linalg.Vectors

valdenseVector=Vectors.dense(1.0,0.0,3.0)
valsparseVector1=Vectors.sparse(3,Array(0,2),Array(1.0,3.0))
valsparseVector2=Vectors.sparse(3,Seq((0,1.0),(2,3.0)))

println(s"DenseVectoris:$denseVector")
println(s"DenseVectortoSparseis:${denseVector.toSparse}")

println(s"sparseVector1is:$sparseVector1")
println(s"sparseVector1toDenseis:${sparseVector1.toDense}")

println(s"sparseVector2is:$sparseVector2")
println(s"sparseVector2toDenseis:${sparseVector2.toDense}")

输出为:

DenseVectoris:[1.0,0.0,3.0]
DenseVectortoSparseis:(3,[0,2],[1.0,3.0])

sparseVector1is:(3,[0,2],[1.0,3.0])
sparseVector1toDenseis:[1.0,0.0,3.0]

sparseVector2is:(3,[0,2],[1.0,3.0])
sparseVector2toDenseis:[1.0,0.0,3.0]

2.2 Labeled point(带标签的点)

labeled point由本地向量组成,既可以是dense向量,也可以是sparse向量。在mllib中常用于监督类算法,使用double类型来保存该类型的数据,因为也可以用于回归和分类算法。例如二分类,label可以是0(负例)或1(正例),对于多分类,label可以是0,1,2...

importorg.apache.spark.mllib.linalg.Vectors
importorg.apache.spark.mllib.regression.LabeledPoint

valpos=LabeledPoint(1.0,Vectors.dense(1.0,0.0,3.0))
valneg=LabeledPoint(0.0,Vectors.sparse(3,Array(0,2),Array(1.0,3.0)))

sparse data

稀疏数据存储是非常普遍的现象,mllib支持读取libsvm格式的数据,其数据格式如下:

labelindex1:value1,index2:value2...

其读取方式包括:

importorg.apache.spark.mllib.util.MLUtils

//method1
spark.read.format("libsvm").load("libsvmdatapath")

//method2
MLUtils.loadLibSVMFile(spark.sparkContext,"libsvmdatapath")

2.3 Local Matrix(本地矩阵)

local matrix由行下标,列索引和double类型的值组成,存储在本地机器上,mllib支持密集矩阵和稀疏矩阵,其存储是按照列进行存储的。

例如下面的为密集矩阵:

上面两个向量(x1,y1)和(x2,y2)计算夹角的余弦值就是两个向量方向的相似度,其公式为:

其中,表示的模,即每一项的平方和再开方。

3.2 公式拆解

那么如果向量不只是两维,而是n维呢?比如有两个向量:

第一个向量:

第二个向量:

他们的相似度计算方法套用上面的公式为:

通过上面的公式就可以发现,夹角余弦可以拆解成每一项与另一项对应位置的乘积x1∗y1,再除以每个向量自己的

注意,矩阵里面都是一列代表一个向量....上面是创建矩阵时的三元组,如果在spark中想要创建matrix,可以这样:

valdf=spark.createDataFrame(Seq(
(0,0,1.0),
(1,0,1.0),
(2,0,1.0),
(3,0,1.0),
(0,1,2.0),
(1,1,2.0),
(2,1,1.0),
(3,1,1.0),
(0,2,3.0),
(1,2,3.0),
(2,2,3.0),
(0,3,1.0),
(1,3,1.0),
(3,3,4.0)
))

valmatrix=newCoordinateMatrix(df.map(row=>MatrixEntry(row.getAs[Integer](0).toLong,row.getAs[Integer](1).toLong,row.getAs[Double](2))).toJavaRDD)

然后计算每一个向量的normL2,即平方和开根号。

以第一个和第二个向量计算为例,第一个向量为(1,1,1,1),第二个向量为(2,2,1,1),每一项除以对应的normL2,得到后面的两个向量:

两个向量最终的相似度为0.94。

那么在Spark如何快速并行处理呢?通过上面的例子,可以看到两个向量的相似度,需要把每一维度乘积后相加,但是一个向量一般都是跨RDD保存的,所以可以先计算所有向量的第一维,得出结果

最后对做一次reduceByKey累加结果即可.....

3.4 阅读源码

首先创建dataframe形成matrix:

importorg.apache.spark.mllib.linalg.distributed.{CoordinateMatrix,MatrixEntry}importorg.apache.spark.sql.SparkSessionobjectMatrixSimTest{defmain(args:Array[String]):Unit={//创建dataframe,转换成matrixvalspark=SparkSession.builder().master("local[*]").appName("sim").getOrCreate()spark.sparkContext.setLogLevel("WARN")importspark.implicits._valdf=spark.createDataFrame(Seq((0,0,1.0),(1,0,1.0),(2,0,1.0),(3,0,1.0),(0,1,2.0),(1,1,2.0),(2,1,1.0),(3,1,1.0),(0,2,3.0),(1,2,3.0),(2,2,3.0),(0,3,1.0),(1,3,1.0),(3,3,4.0)))valmatrix=newCoordinateMatrix(df.map(row=>MatrixEntry(row.getAs[Integer](0).toLong,row.getAs[Integer](1).toLong,row.getAs[Double](2))).toJavaRDD)//调用sim方法valx=matrix.toRowMatrix().columnSimilarities()//得到相似度结果x.entries.collect().foreach(println)}}

得到的结果为:

MatrixEntry(0,3,0.7071067811865476)
MatrixEntry(0,2,0.8660254037844386)
MatrixEntry(2,3,0.2721655269759087)
MatrixEntry(0,1,0.9486832980505139)
MatrixEntry(1,2,0.9128709291752768)
MatrixEntry(1,3,0.596284793999944)

直接进入columnSimilarities方法看看是怎么个流程吧!

defcolumnSimilarities():CoordinateMatrix={
columnSimilarities(0.0)
}

内部调用了带阈值的相似度方法,这里的阈值是指相似度小于该值时,输出结果时,会自动过滤掉。

defcolumnSimilarities(threshold:Double):CoordinateMatrix={
//检查参数...

valgamma=if(threshold<1e-6){
Double.PositiveInfinity
}else{
10*math.log(numCols())/threshold
}

columnSimilaritiesDIMSUM(computeColumnSummaryStatistics().normL2.toArray,gamma)
}

这里的gamma用于采样,具体的做法咱们来继续看源码。然后看一下computeColumnSummaryStatistics().normL2.toArray这个方法:

defcomputeColumnSummaryStatistics():MultivariateStatisticalSummary={
valsummary=rows.treeAggregate(newMultivariateOnlineSummarizer)(
(aggregator,data)=>aggregator.add(data),
(aggregator1,aggregator2)=>aggregator1.merge(aggregator2))
updateNumRows(summary.count)
summary
}

之前有介绍这个treeAggregate是一种带“预reduce”的map-reduce,返回的summary,里面帮我们统计了每一个向量的很多指标,比如

currMean为每一个向量的平均值
currM2为每个向量的每一维的平方和
currL1为每个向量的绝对值的和
currMax为每个向量的最大值
currMin为每个向量的最小值
nnz为每个向量的非0个数

这里我们只需要currM2,它是每个向量的平方和。summary调用的normL2方法:

overridedefnormL2:Vector={
require(totalWeightSum>0,s"Nothinghasbeenaddedtothissummarizer.")

valrealMagnitude=Array.ofDim[Double](n)

vari=0
vallen=currM2.length
while(i<len){
realMagnitude(i)=math.sqrt(currM2(i))
i+=1
}
Vectors.dense(realMagnitude)
}

上面这步就是对平方和开个根号,这样就求出来了每个向量的分母部分。下面就是最关键的地方了:

private[mllib]defcolumnSimilaritiesDIMSUM(
colMags:Array[Double],
gamma:Double):CoordinateMatrix={
//一些参数校验

//对gamma进行开方
valsg=math.sqrt(gamma)//sqrt(gamma)usedmanytimes

//这里把前面算的平方根的值设置一个默认值,因为如果为0,除0会报异常,所以设置为1
valcolMagsCorrected=colMags.map(x=>if(x==0)1.0elsex)

//把抽样概率数组和平方根数组进行广播
valsc=rows.context
valpBV=sc.broadcast(colMagsCorrected.map(c=>sg/c))
valqBV=sc.broadcast(colMagsCorrected.map(c=>math.min(sg,c)))

//遍历每一行,计算每个向量该维的乘积,形成三元组
valsims=rows.mapPartitionsWithIndex{(indx,iter)=>
valp=pBV.value
valq=qBV.value
//获得随机值
valrand=newXORShiftRandom(indx)
valscaled=newArray[Double](p.size)
iter.flatMap{row=>
rowmatch{
caseSparseVector(size,indices,values)=>
//如果是稀疏向量,遍历向量的每一维,除以平方根
valnnz=indices.size
vark=0
while(k<nnz){
scaled(k)=values(k)/q(indices(k))
k+=1
}

//遍历向量数组,计算每一个数值与其他数值的乘机。
//比如向量(1,2,0,1)
//得到的结果为(0,1,value)(0,3,value)(2,3,value)
Iterator.tabulate(nnz){k=>
valbuf=newListBuffer[((Int,Int),Double)]()
vali=indices(k)
valiVal=scaled(k)
//判断当前列是否符合采样范围,如果小于采样值,就忽略
if(iVal!=0&&rand.nextDouble()<p(i)){
varl=k+1
while(l<nnz){
valj=indices(l)
valjVal=scaled(l)
if(jVal!=0&&rand.nextDouble()<p(j)){
//计算每一维与其他维的值
buf+=(((i,j),iVal*jVal))
}
l+=1
}
}
buf
}.flatten
caseDenseVector(values)=>
//跟稀疏同理
valn=values.size
vari=0
while(i<n){
scaled(i)=values(i)/q(i)
i+=1
}
Iterator.tabulate(n){i=>
valbuf=newListBuffer[((Int,Int),Double)]()
valiVal=scaled(i)
if(iVal!=0&&rand.nextDouble()<p(i)){
varj=i+1
while(j<n){
valjVal=scaled(j)
if(jVal!=0&&rand.nextDouble()<p(j)){
buf+=(((i,j),iVal*jVal))
}
j+=1
}
}
buf
}.flatten
}
}
//最后再执行一个reduceBykey,累加所有的值,就是i和j的相似度
}.reduceByKey(_+_).map{case((i,j),sim)=>
MatrixEntry(i.toLong,j.toLong,sim)
}
newCoordinateMatrix(sims,numCols(),numCols())
}

这样把所有向量的平方和广播后,每一行都可以在不同的节点并行处理了。

总结来说,Spark提供的这个计算相似度的方法有两点优势:

  • 通过拆解公式,使得每一行独立计算,加快速度
  • 提供采样方案,以采样方式抽样固定的特征维度计算相似度

不过杰卡德目前并不能使用这种方法来计算,因为杰卡德中间有一项需要对向量求dot,这种方式就不适合了;如果杰卡德想要快速计算,可以去参考LSH局部敏感哈希算法,这里就不详细说明了。

4.Spark实现Item相似度计算

这里使用的数据集是MovieLens,计算Item的相似度,为用户推荐部分没有实现,不过也比较简单,感兴趣的用户可以自己试着实现一下看看。

//加载数据(userid,itemid,score)=>(string,long,double)
valdataPath="data/ml-100k/ua.base"
valdataTemp:RDD[(String,(Long,Double))]=spark.sparkContext.textFile(dataPath).map(_.split("\t")).map(l=>(l(0),(l(1).toLong,l(2).toDouble)))

//理论为上userid 可能为设备id等字符串,所以进行编码
valuserIndex:RDD[(String,Long)]=dataTemp.map(_._1).distinct().zipWithIndex()

//(useridindex,itemid,score)
valdata:RDD[(Long,Long,Double)]=dataTemp.leftOuterJoin(userIndex).filter(_._2._2.nonEmpty)
.map(l=>(l._2._2.get,l._2._1._1,l._2._1._2))
.persist(StorageLevel.MEMORY_AND_DISK)
println(s"使用的数据条数为:${data.count()}")
data.take(3).foreach(l=>println(l))

valmatrix=data.map(_match{case(uuid,spuid,rate)=>MatrixEntry(uuid,spuid,rate)})

//newCoordinateMatrix(matrix)除了传入一个rdd之外
//还有另外两个参数,rows和cols,如果不传的话默认是i,j中的最大值
valtopicSims:CoordinateMatrix=newCoordinateMatrix(matrix)

//toRowMatrix()调用的是toIndexedRowMatrix().toRowMatrix()
valitemSim:CoordinateMatrix=topicSims.toRowMatrix().columnSimilarities()

valitemSimRDD=itemSim.entries.union(itemSim.entries.map(m=>MatrixEntry(m.j,m.i,m.value)))

println("生成计算结果...")
itemSimRDD.map(f=>(f.i.toLong,f.j.toLong,f.value)).take(10).foreach(l=>println(l))

Over!


我们不错过每一篇精彩
「搜索与推荐Wiki」猜你喜欢
1、推荐系统中稀疏特征Embedding的优化表示方法
2迁移学习与跨域推荐,以及解决跨域推荐的方法
3、最全面的推荐系统评估方法介绍
4、聊一聊海量gongzhong号下我是如何进行筛选和内容消费的



相关资源