流式DataFrame的操作

发布时间:2023-10-20 09:30

流式DataFrame的操作

  • 一、实验目的
  • 二、实验内容
  • 三、实验原理
  • 四、实验环境
  • 五、实验步骤
    • 5.1 启动Spark集群
    • 5.2 数据源说明
    • 5.3 结构化流DataFrame基本操作:投影和过滤
    • 5.4 结构化流DataFrame基本操作:数据聚合操作
    • 5.3 连接两个流DataFrame

申明: 未经许可,禁止以任何形式转载,若要引用,请标注链接地址
全文共计3681字,阅读大概需要3分钟

一、实验目的

掌握流式DataFrame的选择、投影和聚合操作。
  掌握流式DataFrame的JOIN连接操作。

二、实验内容

1、读取json格式的文件数据源,并执行转换操作。
  2、连接两个DataFrame。

三、实验原理

使用流数据格式的DataFrame,可以将任何select和filter转换应用到它,以及应用任何在个别列上操作的Spark SQL函数。此外,基本聚合和高级分析函数也可用于流DataFrame。
  在流DataFrame中,不支持以下DataFrame转换,因为它们太过复杂,无法维护状态,或者由于流数据的无界性。
• 在流DataFrame上的多个聚合或聚合链。
• limit和take N行。
• distinct转换。但是,有一种方法可以使用唯一标识符来删除重复数据。
• 在没有任何聚合的情况下对流DataFrame进行排序。然而,在某种形式的聚合之后排序是得到支持的。

四、实验环境

硬件:x86_64 ubuntu 16.04服务器
  软件:JDK 1.8.162,Spark-2.3.2,Hadoop-2.7.3

五、实验步骤

5.1 启动Spark集群

1、在终端窗口下,输入以下命令,分别启动Spark集群:

1.	$ start-dfs.sh
2.	$ cd /opt/spark
3.	$ ./sbin/start-all.sh

2、在HDFS上面创建文件

1.	hdfs dfs -mkdir -p /data/dataset/streaming

3、启动spark-shell。在终端窗口下,执行如下命令:(注意,请将以下命令中的localhost替换为虚拟机实际的机器名)

1.	$ spark-shell --master spark://localhost:7077

5.2 数据源说明

创建一个数据源文件,文件存放位置位于”/data/dataset/streaming/“目录下。在终端窗口下,执行以下命令:

1.	$  cd  /data/dataset/streaming/
2.	$  vi language.json
   文件内容如下:
1.	{"name":"python","age":12}
2.	{"name":"scala","age":23}
3.	{"name":"python","age":15}

5.3 结构化流DataFrame基本操作:投影和过滤

1、首先创建一个Schema。在spark-shell窗口下,分别执行如下代码:

1.	import org.apache.spark.sql.types._
2.	     
3.	val personSchema = new StructType().add("name", StringType,false).add("age", IntegerType,false)

2、读取文件数据源。在spark-shell窗口下,分别执行如下代码:

1.	val data = spark.readStream.schema(personSchema).json("hdfs://localhost:9000/data/dataset/streaming/")

3、过滤和投影。选择查看数据中的name 和 age 字段,并且过滤 age 大于 12的人。在spark-shell窗口下,执行如下代码:

1.	val result1 = data.select($"name",$"age").where($"age" > 12)

4、将流查询结果输出到控制台,并启动流程序。在spark-shell窗口下,执行如下代码:

1.	val query1 = result1.writeStream.format("console").outputMode("append").start()

5、将本地的/data/dataset/language.json文件上传到HDFS受流程序监视的目录中。另打开一个终端,在该终端窗口下,执行如下命令:

1.	$ hdfs dfs -put /data/dataset/streaming/language.json /data/dataset/streaming/

6. 切换回spark-shell窗口,可以看到如下的输出结果:

7、结束流程序运行。在spark-shell窗口下,执行如下代码:

1.	query1.stop

5.4 结构化流DataFrame基本操作:数据聚合操作

1、删除HDFS目录下的文件

1.	$ hdfs dfs -rm /data/dataset/streaming/language.json

2、按name字段对数据进行分组,并分组对age字段进行求和。在spark-shell窗口下,执行如下代码:

1.	import org.apache.spark.sql.types._
2.	     
3.	val personSchema = new StructType()add("name", StringType,false).add("age", IntegerType,false)
4.	     
5.	val data = spark.readStream.schema(personSchema).json("hdfs://localhost:9000/data/dataset/streaming")
6.	     
7.	val result2 = words.groupBy($"name").sum("age")
8.	     
9.	val query2 = result2.writeStream.format("console").outputMode("complete").start()

3、将本地的/data/dataset/language.json文件上传到HDFS受流程序监视的目录中。另打开一个终端,在该终端窗口下,执行如下命令:

1.	$ hdfs dfs -put /data/dataset/streaming/language.json /data/dataset/streaming/

4. 切换回spark-shell窗口,可以看到如下的输出结果:

由上面结果可以看出对两条python的数据中的age值进行了求和。
  5、结束流程序运行。在spark-shell窗口下,执行如下代码:

1.	query2.stop

5.3 连接两个流DataFrame

可以用一个streaming DataFrame来做最酷的事情之一,就是join一个静态的DataFrame或者另一个streaming DataFrame。从Spark 2.3开始,结构化流支持join两个streaming DataFrames。
  1、删除HDFS目录下的文件

1.	$ hdfs dfs -rm /data/dataset/streaming/language.json

2、读取流数据。在spark-shell窗口下,执行如下代码:

1.	import org.apache.spark.sql.types._
2.	     
3.	val personSchema = new StructType().add("name", StringType,false).add("age", IntegerType,false)
4.	     
5.	val dataDF = spark.readStream.schema(personSchema).json("hdfs://localhost:9000/data/dataset/streaming")

3、通过join把两个DataFrame连接在一起(按name字段进行连接)。在spark-shell窗口下,执行如下代码:

1.	val df = dataDF.join(dataDF, "name")

4、输出连接后的DataFrame。在spark-shell窗口下,执行如下代码:

1.	val query = df.writeStream.format("console").outputMode("append").start()

5、将本地的/data/dataset/language.json文件上传到HDFS受流程序监视的目录中。另打开一个终端,在该终端窗口下,执行如下命令:

1.	$ hdfs dfs -put /data/dataset/streaming/language.json /data/dataset/streaming/

6. 切换回spark-shell窗口,可以看到如下的输出结果:

7、结束流程序运行。在spark-shell窗口下,执行如下代码:

1.	query.stop

— END —

ItVuer - 免责声明 - 关于我们 - 联系我们

本网站信息来源于互联网,如有侵权请联系:561261067@qq.com

桂ICP备16001015号