发布时间:2023-09-28 12:30
掌握Spark机器学习管道中常用Estimator的使用。
1、使用IDF estimator,计算每个单词的重要性。
2、使用StringIndexer estimator来对电影类型进行编码。
3、使用OneHotEncoderEstimator estimator将分类值的索引编码为二元向量。
4、使用MinMaxScaler estimator对数值数据进行规范化。
5、使用MinMaxScaler estimator对数值数据进行标准化。
一个Estimator代表了一种机器学习算法,用来在训练数据集上训练或拟合机器学习模型。它实现了一个名为fit的方法,该方法接受一个DataFrame作为参数并返回一个机器学习模型。
Estimator所代表的算法可分为两类,一类是用于机器学习的算法,另一类是用于进行数据转换的算法。
从技术的角度来看,一个estimator有一个名为fit的函数,它在输入列上应用一个算法,结果被封装在一个叫做Model的对象类型中,它是一个Transformer类型。输入列和输出列名称可以在estimator的构造过程中指定。
下图描述了一个estimator及其输入和输出。
硬件:x86_64 ubuntu 16.04服务器
软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3,zeppelin-0.8.1
在终端窗口下,输入以下命令,分别启动Spark集群和Zeppelin服务器:
1. $ cd /opt/spark
2. $ ./sbin/start-all.sh
3. $ zeppelin-daemon.sh start
然后使用jps命令查看启动的进程,确保Spark集群和Zeppelin服务器已经正确启动。
2、创建notebook。启动浏览器,访问”http://localhost:9090“, 打开zeppelin notebook首页,点击”Create new note”链接,创建一个新的笔记本。如下图所示:
IDF estimator是用于处理文本的常用的estimators之一。它的名字是inverse document frequency(反转文档频率)的缩写。这个estimator经常在文本被分词和术语频率被计算之后立即使用。这个estimator背后的思想是通过计算它出现的文档数量来计算每个单词的重要性或权重。
在zeppelin中输入以下代码:
1. // 使用IDF estimator来计算每个单词的权重
2. import org.apache.spark.ml.feature.Tokenizer
3. import org.apache.spark.ml.feature.HashingTF
4. import org.apache.spark.ml.feature.IDF
5.
6. // 构造一个DataFrame,代表一个文档
7. val text_data = spark.createDataFrame(Seq(
8. (1, "Spark is a unified data analytics engine"),
9. (2, "Spark is cool and it is fun to work with Spark"),
10. (3, "There is a lot of exciting sessions at upcoming Spark summit"),
11. (4, "mllib transformer estimator evaluator and pipelines") )
12. ).toDF("id", "line")
13.
14. // 分析转换器
15. val tokenizer = new Tokenizer().setInputCol("line").setOutputCol("words")
16. val tkResult = tokenizer.transform(text_data)
17.
18. // HashingTF转换器
19. val tf = new HashingTF().setInputCol("words").setOutputCol("wordFreqVect").setNumFeatures(4096)
20. val tfResult = tf.transform(tkResult ) // Tokenizer transformer的输出列是HashingTF的输入
21.
22. // IDF estimator
23. // HashingTF转换器的输出是IDF estimator的输入
24. val idf = new IDF().setInputCol("wordFreqVect").setOutputCol("features")
25.
26. // 因为IDF是一个estimator,所以调用fit函数, 得到一个学习过的模型
27. val idfModel = idf.fit(tfResult)
28.
29. // 返回对象是一个模型(Model), 它是Transformer类型
30. val weightedWords = idfModel.transform(tfResult)
31. // weightedWords.select("label", "features").show(false)
32. weightedWords.select("features").show(false)
同时按下”【Shift + Enter】”键,执行以上代码。输出结果如下:
StringIndexer estimator是一个知道什么时候处理包含分类值的文本数据的estimator。它将一个分类值编码成一个基于其频率的索引,这样最频繁的分类值就会得到0的索引值,以此类推。
在zeppelin中输入以下代码:
1. // 使用StringIndexer estimator来对电影类型进行编码
2. import org.apache.spark.ml.feature.StringIndexer
3.
4. // 构造一个DataFrame
5. val movie_data = spark.createDataFrame(
6. Seq((1, "Comedy"),
7. (2, "Action"),
8. (3, "Comedy"),
9. (4, "Horror"),
10. (5, "Action"),
11. (6, "Comedy"))
12. ).toDF("id", "genre")
13.
14. // StringIndexer estimator
15. val movieIndexer = new StringIndexer().setInputCol("genre").setOutputCol("genreIdx")
16.
17. // 首先拟合数据
18. val movieIndexModel = movieIndexer.fit(movie_data)
19.
20. // 使用返回的transformer来转换该数据
21. val indexedMovie = movieIndexModel.transform(movie_data)
22.
23. // 查看结果
24. indexedMovie.orderBy("genreIdx").show()
同时按下”【Shift + Enter】”键,执行以上代码。输出结果如下:
OneHotEncoderEstimator estimator是另一种有用的分类值的estimator,它将分类值的索引编码为二元向量。这个estimator经常与StringIndexer estimator一起使用,其中StringIndexer的输出成为这个estimator的输入。
在zeppelin中输入以下代码:
1. // OneHotEncoderEstimator estimator消费StringIndexer estimator的输出
2. import org.apache.spark.ml.feature.OneHotEncoderEstimator
3.
4. // 输入列genreIdx是之前示例中StringIndex的输出列
5. val oneHotEncoderEst = new OneHotEncoderEstimator().setInputCols(Array("genreIdx"))
6. .setOutputCols(Array("genreIdxVector"))
7.
8. // 指使indexedMovie数据(在上一个示例中产生的)
9. val oneHotEncoderModel = oneHotEncoderEst.fit(indexedMovie)
10. val oneHotEncoderVect = oneHotEncoderModel.transform(indexedMovie)
11.
12. // 显示
13. oneHotEncoderVect .orderBy("genre").show()
同时按下”【Shift + Enter】”键,执行以上代码。输出结果如下:
规范化数值数据是将其原始范围映射到从0到1的范围的过程。当observations有多个不同范围的属性时,这一点特别有用。
在zeppelin中输入以下代码:
1. // 使用MinMaxScaler estimator来重新调节特征
2. import org.apache.spark.ml.feature.MinMaxScaler
3. import org.apache.spark.ml.linalg.Vectors
4.
5. // 构造DataFrame
6. val employee_data = spark.createDataFrame(
7. Seq((1, Vectors.dense(125400, 5.3)),
8. (2, Vectors.dense(179100, 6.9)),
9. (3, Vectors.dense(154770, 5.2)),
10. (4, Vectors.dense(199650, 4.11)))
11. ).toDF("empId", "features")
12.
13. // MinMaxScaler estimator
14. val minMaxScaler = new MinMaxScaler().setMin(0.0)
15. .setMax(5.0)
16. .setInputCol("features")
17. .setOutputCol("scaledFeatures")
18.
19. // 拟合数据,建立模型
20. val scalerModel = minMaxScaler.fit(employee_data)
21.
22. // 使用学习到的模型对数据集进行转换
23. val scaledData = scalerModel.transform(employee_data)
24.
25. // 输出特征缩放到的范围
26. println(s"特征缩放到的范围: [${minMaxScaler.getMin},${minMaxScaler.getMax}]")
27.
28. // 显示结果
29. scaledData.select("features", "scaledFeatures").show(false)
同时按下”【Shift + Enter】”键,执行以上代码。输出结果如下:
除了数值数据规范化之外,另一个经常用于处理数值数据的操作称为标准化(standardization.)。当数值数据有一个接近与钟形曲线关闭的分布,这个操作尤其适用。标准化操作可以帮助将数据转换为标准化形式,其中数据的范围为-1和1,平均值为0。
在zeppelin中输入以下代码:
1. // 使用StandardScaler estimator标准化围绕均值0的特征
2. import org.apache.spark.ml.feature.StandardScaler
3. import org.apache.spark.ml.linalg.Vectors
4.
5. // 构造DataFrame
6. val employee_data = spark.createDataFrame(Seq(
7. (1, Vectors.dense(125400, 5.3)),
8. (2, Vectors.dense(179100, 6.9)),
9. (3, Vectors.dense(154770, 5.2)),
10. (4, Vectors.dense(199650, 4.11)))
11. ).toDF("empId", "features")
12.
13. // 将单位标准偏差设置为true并围绕平均值
14. val standardScaler = new StandardScaler().setWithStd(true)
15. .setWithMean(true)
16. .setInputCol("features")
17. .setOutputCol("scaledFeatures")
18.
19. // 拟合数据,建立模型
20. val standardMode = standardScaler.fit(employee_data)
21.
22. // 使用学习到的模型对数据集进行转换
23. val standardData = standardMode.transform(employee_data)
24.
25. // 显示结果
26. standardData.show(false)
同时按下”【Shift + Enter】”键,执行以上代码。输出结果如下:
一个Estimator代表了一种机器学习算法,用来在训练数据集上训练或拟合机器学习模型。它实现了一个名为fit的方法,该方法接受一个DataFrame作为参数并返回一个机器学习模型。
Estimator所代表的算法可分为两类,一类是用于机器学习的算法,另一类是用于进行数据转换的算法。例如,称为LinearRegression的ML算法就属于第一种类型,它的fit方法返回一个LinearRegressionModel类的实例。它用于诸如预测房价等回归任务。而StringIndexer就属于第二种类型,它用来将一列的分类值编码成索引,这样每个分类值的索引值都是基于它出现在DataFrame的整个输入列中的频率。
从技术的角度来看,一个estimator有一个名为fit的函数,它在输入列上应用一个算法,结果被封装在一个叫做Model的对象类型中,它是一个Transformer类型。输入列和输出列名称可以在estimator的构造过程中指定。