Spark-MLlib学习日记8:K-Means的扩展学习

前言

关于聚类的经典算法K-Means 算法在第一期就已经讲过了,需要回顾的同学点这里,现在顺着spark的api继续看下去,又看到聚类这一块,就着重讲一下基于流数据的K-Menas算法使用,因为最近比较忙的,所以感觉又一些东西都还没很完整的看完,比如回归的算法,还有聚类的一些统计学方法,这些都会在今后慢慢补上,给自己一个小目标,今年6月底把spark的api全部都过一遍,然后开始又针对地去做人脸识别相关的学习,还有神经网络和深度学习,在此之前希望我能打下较好的基础。

有点扯远了,看回今天的主题,Streaming k-means——基于流的k-means算法,其实就是在原有的k-means算法里面引入流的概念,并放在分布式的集群上去做,下面我们就来简单了解一下。

流式k-means算法

spark官网讲的挺清楚的,我就翻译下搬过来了,官网地址:点这里

当数据是以流的方式到达的时候,我们可能想动态地去估算(estimate)聚类的簇,通过新的到达的数据来更新它们。spark.mllib支持流式k-means聚类,并且可以通过参数控制估计衰减(decay)( 或“忽略”(forgetfulness) )。 这个算法使用一般地小批量更新规则来更新簇。

对每批新到的数据,我们首先将点分配给距离它们最近的簇,然后计算新的数据中心,最后更新每一个簇。使用的公式如下所示:
$$\begin{equation} c_{t+1} = \frac{c_tn_t\alpha + x_tm_t}{n_t\alpha+m_t} \end{equation}$$
$$\begin{equation} n_{t+1} = n_t + m_t \end{equation}$$
在上面的公式中,$c_{t}$表示前一个簇中心,$n_{t}$表示至今位置分配给这个簇的点的数量, $x_{t}$表示从当前批数据中计算出来的簇中心,$m_{t}$表示当前添加的这批数据的点数量。衰减因子 $\alpha$ 可以被用来忽略过去的聚类中心:当 $\alpha$ 等于1时,所有的批数据赋予相同的权重,当 $\alpha​$ 等于0时,数据中心点完全通过当前数据确定。

这里我想稍微讲一下这个衰减因子 $\alpha$ ,看他在公式中,当 $\alpha = 0$ 的时候,其实就相当于完全忽略掉前面历史累积下来的聚类中心的结果了,所以说他是“衰减因子”挺贴切的,这个系数可以在每一次迭代中消减历史数据带来的影响,这个系数越大,则代表着越看重最新的数据。要知道源源不断的流数据,其实还是挺看重实时数据的,比如说你冬天喜欢买羽绒,到了夏天不可能还这么喜欢买羽绒吧。。。

衰减因子$\alpha$ 也可以通过halfLife参数联合 时间单元(time unit)来确定,时间单元可以是一批数据也可以是一个数据点。假如数据从t时刻到来并定义了halfLifeh, 在t+h时刻,应用到t时刻的数据的折扣(discount)为0.5。

halfLife翻译过来就是半衰期,挺有意思的,这个参数可以让衰减因子随着时间来动态调整,越来越依赖新的数据,在过完一个时间单元(或者叫周期吧)重置。应该说这个衰减因子的变化在每个时间单元中独立生效。

算法步骤

流式k-means算法的步骤如下所示:

  • (1)分配新的数据点到离其最近的簇;
  • (2)根据时间单元(time unit)计算折扣(discount)值,并更新簇权重;
  • (3)应用更新规则;
  • (4)应用更新规则后,有些簇可能消失了,那么切分最大的簇为两个簇。

应用场景

流式K-means的应用场景还是挺广阔的,比如在实时推荐预测系统方面,比如广告推荐、商业预测之类的,由于推荐预测系统对数据时效性的敏感度较高,而且其数据处于连续实时且快速的变化,所以必须建立起流式的机器学习应用,从而对流式的数据进行实时的预测分析与处理,这对于商业分析与运营而言将十分关键。

另外还有一篇论文,将该算法应用在数据安全领域,用来实时验证分析大数据环境下的DDoS攻击检测,参考论文:《基于Spark Streaming的实时数据分析系统及其应用》

代码demo

这里训练数据我们还是用回之前k-means用过的数据集,值得一提的是,这里的测试集需要改成(1.0), [1.7, 0.4, 0.9]这种标签向量的格式。我们就把上一个k-means聚类出来的结果作为这里的测试集试试。

每个训练点应格式化为[x1, x2, x3],并且每个测试数据点应格式化为(y, [x1, x2, x3]),其中y是一些有用的标签或标识符(例如,真正的类别分配)

除此以外还要注意的是,spark streaming读取文件的时候,旧文件是不识别的,他只会读取新创建的文件,也就是说我们要单独写一个文件IO,把训练集和测试集以流形式给写进去。。。不过生产环境的时候,就可以上kafka来做信息读取了。

下面贴出训练的主函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package SparkMLlib.Clustering

import org.apache.spark.SparkConf
import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* @Author: voidChen
* @Date: 2019/3/19 17:24
* @Version 1.0
*/
object StreamingKMeansExample {
def main(args: Array[String]) {
val trainingPath = "D:\\data\\train\\" //训练数据集文件路径
// val testPath = userPath + "/src/main/resources/data/StreamingKmeans-test.txt" //测试数据集文件路径
val testPath = "D:\\data\\test\\" //测试数据集文件路径

val conf = new SparkConf().setAppName("StreamingKMeansExample")
.setMaster("local[*]") //TODO: 生成打包前,需注释掉此行

val ssc = new StreamingContext(conf, Seconds(10)) //数据量太少这里设置了1秒给它。。

val trainingData = ssc.textFileStream(trainingPath)
.filter(!isColumnNameLine(_))
.map(line => {//处理数据成标准向量
println(line)
Vectors.dense(line.split("\t").map(_.trim).filter(!"".equals(_)).map(_.toDouble))
})
val testData = ssc.textFileStream(testPath).map(LabeledPoint.parse)


val model = new StreamingKMeans()
.setK(8) //聚类中心
.setDecayFactor(1.0) //损失因子
.setRandomCenters(8, 0.0) //初始化随机中心,只需要维数。 第一个参数是维数,第二个参数是簇权重

model.trainOn(trainingData)
model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()

ssc.start()
ssc.awaitTermination()
}


/**
* 只是用来去掉表头
* @param line
* @return
*/
def isColumnNameLine(line:String):Boolean = {
if (line != null && line.contains("Channel")) true
else false
}
}

再给出一个文件读写的简易demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def FileIODemo(): Unit ={
val f= new File("D:\\data\\test.txt")
// val f= new File("D:\\data\\Wholesale customers data_training.txt")
val in = new InputStreamReader(new FileInputStream(f))
val out = new OutputStreamWriter(new FileOutputStream("D:\\data\\test\\test4.txt"))
// val out = new OutputStreamWriter(new FileOutputStream("D:\\data\\train\\train4.txt"))

val r = new BufferedReader(in)
val w = new BufferedWriter(out)
var lineTxt = Option(r.readLine)

while ( !lineTxt.isEmpty){
println(lineTxt.getOrElse(""))
w.write(lineTxt.getOrElse(""))
w.flush()
lineTxt = Option(r.readLine)
if(!lineTxt.isEmpty) w.newLine()
}

in.close
w.close()
}

完整代码请前往github查看,文件路径请灵性修改。。。另外数据集分别就是Wholesale customers data_training.txtStreamingKmeans-test.txt,我会放在github上:点我前往github

0%