发布时间:2024-04-03 08:01
当数据量少时,并不会严格根据抽取分数抽取
rdd = sc.parallelize(range(10))
print(rdd.sample(1,0.5).collect())
[0, 1, 2, 4, 6, 6, 7, 7, 7]
1或者True
rdd = sc.parallelize(range(10))
print(rdd.sample(0,0.5).collect())
[1, 3, 4, 6, 7, 8]
0或者False
pcq9 = sc.textFile(r\"file:///C:\\Users\\86178\\Desktop\\SPARK\\作业\\第一次作业\\student.txt\")
pcq9.collect()
pcq9.foreach(print)
# 找到13班的同学
>>> rdd = pcq9.filter(lambda x:eval(x.split()[0])==13)
>>> rdd.collect()
[\'13 张大三 24 男 chinese 60\', \'13 张大三 24 男 math 60\', \'13 张大三 24 男 english 70\', \'13 李大四 20 男 chinese 50\', \'13 李大四 20 男 math 60\', \'13 李大四 20 男 english 50\', \'13 王小芳 17 女 chinese 70\', \'13 王小芳 17 女 math 80\', \'13 王小芳 17 女 english 70\']
>>>
>>> rdd = pcq9.map(lambda x:x.split())
>>> rdd.filter(lambda x:eval(x[0])==13).collect()
[[\'13\', \'张大三\', \'24\', \'男\', \'chinese\', \'60\'], [\'13\', \'张大三\', \'24\', \'男\', \'math\', \'60\'], [\'13\', \'张大三\', \'24\', \'男\', \'english\', \'70\'], [\'13\', \'李大四\', \'20\', \'男\', \'chinese\', \'50\'], [\'13\', \'李大四\', \'20\', \'男\', \'math\', \'60\'], [\'13\', \'李大四\', \'20\', \'男\', \'english\', \'50\'], [\'13\', \'王小芳\', \'17\', \'女\', \'chinese\', \'70\'], [\'13\', \'王小芳\', \'17\', \'女\', \'math\', \'80\'], [\'13\', \'王小芳\', \'17\', \'女\', \'english\', \'70\']]
>>>
pcq9.sortBy(lambda x:x).collect() # 升序
去掉表头
filter直接 过滤掉表头
rdd_pcq1 = rdd_pcq.filter(lambda x:x!=\'班级 姓名 年龄 性别 课程 成绩\')
>>> rdd_pcq = sc.textFile(r\"file:///C:\\Users\\86178\\Desktop\\SPARK\\作业\\期中\\stu-score.txt\")
>>> rdd1 = rdd_pcq.first()
>>> print(rdd1)
班级 姓名 年龄 性别 课程 成绩
>>> rdd2 = rdd_pcq.filter(lambda x:x!=rdd1)
>>>
显示前5行
>>> rdd2.take(5)
[\'1011 张三 22 F chinese 78\', \'1011 张三 22 F math 80\', \'1011 张三 22 F english 70\', \'1011 李四 19 F chinese 82\', \'1011 李四 19 F math 73\']
>>>
>>> rdd2.collect()[:5]
[\'1011 张三 22 F chinese 78\', \'1011 张三 22 F math 80\', \'1011 张三 22 F english 70\', \'1011 李四 19 F chinese 82\', \'1011 李四 19 F math 73\']
>>>
persist(MEMORY_ONLY) 等于 cache()
unpersist() 销毁持久化
rdd = sc.parallelize([\'a\',\'b\',\'d\',\'c\'])
rdd.sortBy(lambda x:x,False).collect() # 降序
[\'d\', \'c\', \'b\', \'a\']
rdd.sortBy(lambda x:x).collect() # 默认为升序,也可以使用True
[\'a\', \'b\', \'c\', \'d\']
rdd = sc.parallelize(range(10))
rdd.sum()
45
rdd.reduce(lambda a,b:a+b)
45
rdd.mean() # 求平均值
4.5
>>> a = sc.parallelize([[45,89,56],[11,56]])
>>> rdd = a.map(lambda x:sum(list(x)))
>>> rdd.collect()
[190, 67]
>>>
直接创建
pairRDD = sc.parallelize([(2,5),(8,9),(4,5)])
pairRDD.collect()
[(2, 5), (8, 9), (4, 5)]
从文件中加载
rdd = sc.textFile(r\"file:///C:\\Users\\86178\\Desktop\\SPARK\\word.txt\")
pairRDD = rdd.flatMap(lambda x:x.split())
pairRDD.collect()
[\'pan\', \'hello\', \'hadoop\', \'fan\', \'hello\', \'python\', \'panda\', \'good\']
pairRDD = pairRDD.map(lambda x:(x,1))
pairRDD.collect()
[(\'pan\', 1), (\'hello\', 1), (\'hadoop\', 1), (\'fan\', 1), (\'hello\', 1), (\'python\', 1), (\'panda\', 1), (\'good\', 1)]
创建列表操作
rdd = sc.parallelize([\'pan\', \'hello\', \'hadoop\', \'fan\', \'hello\', \'python\', \'panda\', \'good\'])
pairRDD = rdd.map(lambda x:(x,1))
pairRDD.collect()
[(\'pan\', 1), (\'hello\', 1), (\'hadoop\', 1), (\'fan\', 1), (\'hello\', 1), (\'python\', 1), (\'panda\', 1), (\'good\', 1)]
>>> words = sc.parallelize([(\'pan\', 1), (\'hello\', 1), (\'hadoop\', 1), (\'fan\', 1), (\'hello\', 1), (\'python\', 1), (\'panda\', 1), (\'good\', 1)])
>>> word1 = words.groupByKey()
>>> word1.foreach(print)
(\'hadoop\', <pyspark.resultiterable.ResultIterable object at 0x00000174798B18D0>)
(\'python\', <pyspark.resultiterable.ResultIterable object at 0x00000174798B17B8>)
(\'panda\', <pyspark.resultiterable.ResultIterable object at 0x00000174798B1898>)
(\'good\', <pyspark.resultiterable.ResultIterable object at 0x00000174798B17B8>)
(\'hello\', <pyspark.resultiterable.ResultIterable object at 0x0000023F5F6418D0>)
(\'fan\', <pyspark.resultiterable.ResultIterable object at 0x0000023F5F641898>)
(\'pan\', <pyspark.resultiterable.ResultIterable object at 0x00000228D8D418D0>)
>>> word1.mapValues(list).foreach(print)
(\'hadoop\', [1])
(\'python\', [1])
(\'panda\', [1])
(\'good\', [1])
(\'hello\', [1, 1])
(\'fan\', [1])
(\'pan\', [1])
>>>
对相同的键的值分组
返回的是一个字典,值是一个可迭代的列表(需要转换)
对groupByKey的值操作
reduceByKey(func)返回一个新的kv的值
>>> words = sc.parallelize([(\'pan\', 1), (\'hello\', 1), (\'hadoop\', 1), (\'pan\', 2), (\'hello\', 1), (\'python\',5), (\'good\', 1)])
>>> words1 = words.reduceByKey(lambda a,b:a+b)
>>> words1.foreach(print)
(\'hadoop\', 1)
(\'python\', 5)
(\'good\', 1)
(\'hello\', 2)
(\'pan\', 3)
reduceByKey和groupByKey区别
>>> rdd = sc.parallelize([\'pan\', \'pan\',\'fan\', \'good\',\'fan\',\'pan\'])
>>> pairRDD = rdd.map(lambda x:(x,1))
>>> wordgroup = pairRDD.groupByKey().map(lambda x:(x[0],sum(x[1])))
>>> wordgroup.foreach(print)
(\'fan\', 2)
(\'good\', 1)
(\'pan\', 3)
>>> rdd10 = rdd9.map(lambda x:(x[0],sum(x[1])/len(x[1])))
# 因为groupByKey运行完后是一个列表,所以能用len来处理,但是RDDb
>>> wordreduce = pairRDD.reduceByKey(lambda a,b:a+b)
>>> wordreduce.foreach(print)
(\'fan\', 2)
(\'pan\', 3)
(\'good\', 1)
总结:
求和时,reduceByKey和groupByKey的效果差不多。
groupbykey用来求均值较为方便。
>>> words = sc.parallelize([(\'pan\', 1), (\'hello\', 1), (\'hadoop\', 1), (\'pan\', 2), (\'hello\', 1), (\'python\',5), (\'good\', 1)])
>>> words.keys().foreach(print)
pan
python
pan
hello
good
hadoop
hello
>>> word = words.keys()
>>> word.distinct().collect()
[\'hadoop\', \'python\', \'good\', \'hello\', \'pan\']
>>> word.distinct().count()
>>> words.values().foreach(print)
5
1
1
1
1
2
1
对键排序。参数默认为True,升序。False降序。
>>> words = sc.parallelize([(\'pan\', 1), (\'hello\', 1), (\'hadoop\', 1), (\'pan\', 2), (\'hello\', 1), (\'python\',5), (\'good\', 1)])
>>> words.sortByKey(False).foreach(print)
(\'hello\', 1)
(\'hello\', 1)
(\'hadoop\', 1)
(\'pan\', 1)
(\'pan\', 2)
(\'python\', 5)
(\'good\', 1)
当需要对值排序时,使用sortBy;但是有时候排序会不正确(会按照分区进行排序,对每一个分区进行排序),所以当需要对rdd所有的排序就需要将分区数设置为1。
>>> words.sortBy(lambda x:x[1]).foreach(print)
(\'pan\', 1)
(\'hello\', 1)
(\'hadoop\', 1)
(\'hello\', 1)
(\'good\', 1)
(\'pan\', 2)
(\'python\', 5)
>>> words = sc.parallelize([(\'pan\', 1), (\'hello\', 1), (\'hadoop\', 1), (\'pan\', 2), (\'hello\', 1), (\'python\',5), (\'good\', 1)],1)
或者words.repartition(1)
>>> words.glom().collect()
[[(\'pan\', 1), (\'hello\', 1), (\'hadoop\', 1), (\'pan\', 2), (\'hello\', 1), (\'python\', 5), (\'good\', 1)]]
>>> words.sortBy(lambda x:x[1]).foreach(print)
(\'pan\', 1)
(\'hello\', 1)
(\'hadoop\', 1)
(\'hello\', 1)
(\'good\', 1)
(\'pan\', 2)
(\'python\', 5)
>>> words.sortBy(lambda x:x[1],False).foreach(print)
(\'python\', 5)
(\'pan\', 2)
(\'pan\', 1)
(\'hello\', 1)
(\'hadoop\', 1)
(\'hello\', 1)
(\'good\', 1)
>>>
对每一个values处理,不处理key
>>> words = sc.parallelize([(\'pan\', 1), (\'hello\', 1), (\'hadoop\', 1), (\'pan\', 2), (\'hello\', 1), (\'python\',5), (\'good\', 1)])
>>> words.mapValues(lambda x:x+10).collect()
[(\'pan\', 11), (\'hello\', 11), (\'hadoop\', 11), (\'pan\', 12), (\'hello\', 11), (\'python\', 15), (\'good\', 11)]
先执行mapValues(func),然后再压平
list=[\'Hadoop\',\'Spark\',\'Hive\',\'spoon\']
rdd = sc.parallelize(list,2) # 默认为cpu个数
rdd.glom().collect() # 查看分区
len(rdd.glom().collect()) # 分区数量
rdd1 = rdd.repartition(3) # 重新分区
join共同拥有的
>>> words = sc.parallelize([(\'pan\', 1), (\'hello\', 1),(\'panda\',2)])
>>> word1 = sc.parallelize([(\'panda\',\'np\')])
>>> word2 = words.join(word1)
>>> word2.collect()
[(\'panda\', (2, \'np\'))]
>>>
>>> word1.leftOuterJoin(words).collect()
[(\'panda\', (\'np\', 2))]
>>> words.leftOuterJoin(word1).collect()
[(\'panda\', (2, \'np\')), (\'pan\', (1, None)), (\'hello\', (1, None))]
>>> word1.rightOuterJoin(words).collect()
[(\'panda\', (\'np\', 2)), (\'pan\', (None, 1)), (\'hello\', (None, 1))]
>>> words.rightOuterJoin(word1).collect()
[(\'panda\', (2, \'np\'))]
>>> words = sc.parallelize([(\'pan\', 1), (\'hello\', 1), (\'hadoop\', 1), (\'pan\', 2), (\'hello\', 1), (\'python\',5), (\'good\', 1)])
>>> words.countByKey()
defaultdict(<class \'int\'>, {\'pan\': 2, \'hello\': 2, \'hadoop\': 1, \'python\': 1, \'good\': 1})
>>> words.countByKey().items()
dict_items([(\'pan\', 2), (\'hello\', 2), (\'hadoop\', 1), (\'python\', 1), (\'good\', 1)])
>>> rdd = sc.textFile(r\"file:///C:\\Users\\86178\\Desktop\\SPARK\\word.txt\")
>>> rdd1 = rdd.flatMap(lambda x:x.split())
>>> rdd2 = rdd1.map(lambda x:(x,1))
>>> rdd2.reduceByKey(lambda a,b:a+b).collect()
[(\'python\', 2), (\'panda\', 1), (\'fan\', 2), (\'hello\', 2), (\'spark\', 2)]
>>>
# 方法一(不推荐)
>>> word = sc.parallelize([(\'spark\',2),(\'hadoop\',4),(\'spark\',6),(\'hadoop\',6)])
>>> word1 = word.reduceByKey(lambda a,b:a+b)
>>> word2 = word1.map(lambda x:(x[0],x[1]/2))
>>> word2.collect()
[(\'hadoop\', 5.0), (\'spark\', 4.0)]
# 方法二(不推荐)
>>> wordgroup = word.groupByKey().map(lambda x:(x[0],sum(x[1])))
>>> wordgroup.collect()
[(\'hadoop\', 10), (\'spark\', 8)]
>>> wordgroup = word.groupByKey().map(lambda x:(x[0],len(x[1])))
>>> wordgroup.collect()
[(\'hadoop\', 2), (\'spark\', 2)]
# 方法三(推荐)
>>> wordgroup = word.groupByKey().map(lambda x:(x[0],sum(x[1])/len(x[1])))
>>> wordgroup.collect()
[(\'hadoop\', 5.0), (\'spark\', 4.0)]
# 方法四(和方法s原理一样)
wordgroup = word.groupByKey().map(lambda x:(x[0],sum(x[1]),len(x[1])))
>>> wordgroup.collect()
[(\'hadoop\', 10, 2), (\'spark\', 8, 2)]
>>> wordgroup.map(lambda x:(x[0],x[1]/x[2])).collect()
[(\'hadoop\', 5.0), (\'spark\', 4.0)]
第一步可以将两个文件合并到一起组成一个新的RDD
>>> rdd = sc.textFile(r\"file:///C:\\Users\\86178\\Desktop\\SPARK\\数据集\\file*.txt\")
>>> rdd.collect()
[\'15,594,564,126\', \'789,157,259,115\', \'894,115,157,267\', \'5456,5,6,2\', \'494,199,1,2597\', \'4969,45,69,25\', \'\', \'\', \'12,56\', \'4564,461,2369,16\', \'49,6,56,65\', \'659,652,166,64\', \'6559,65,6,4\', \'599,56\', \'6561,127,489,145\', \'\', \'14\']
import findspark
findspark.init()
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName(\'spark\').setMaster(\'local[1]\')
sc = SparkContext(conf=conf)
rdd_pcq = sc.textFile(r\"file:///C:\\Users\\86178\\Desktop\\SPARK\\数据集\\求TOP值\\file*.txt\")
rdd1 = rdd.filter(lambda x:len(x.split(\',\'))==4)
rdd2 = rdd1.map(lambda x:eval(x.split(\',\')[2]))
rdd3 = rdd2.repartition(1)
rdd4 = rdd3.sortBy(lambda x:x,False)
rdd4.foreach(print)
import findspark
findspark.init()
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName(\'spark\').setMaster(\'local[1]\')
sc = SparkContext(conf=conf)
# 以上是独立运用程序必备的
rdd = sc.textFile(r\"file:///C:\\Users\\86178\\Desktop\\SPARK\\数据集\\二次排序\\file*.txt\")
rdd1 = rdd.map(lambda x:(x.split()[0],x.split()[1]))
rdd2 = rdd1.sortBy(lambda x:x,False)
rdd2.foreach(print)
运行得到结果,如下
import findspark
findspark.init()
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName(\'spark\').setMaster(\'local[1]\')
sc = SparkContext(conf=conf)
index = 0
def getindex():
global index
index+=1
return index
rdd = sc.textFile(r\"file:///C:\\Users\\86178\\Desktop\\SPARK\\数据集\\文件排序\\file*.txt\")
rdd1 = rdd.filter(lambda x:len(x)>0)
rdd2 = rdd1.map(lambda x:int(x.strip()))
rdd3 = rdd2.repartition(1)
rdd4 = rdd3.sortBy(lambda x:x)
rdd5 = rdd4.map(lambda x:(getindex(),x))
rdd5.foreach(print)
rdd5.saveAsTextFile(r\"file:///C:\\Users\\86178\\Desktop\\SPARK\\数据集\\文件排序\\export\")
运行结果如下,
结构化数据
df.createGlobalTempView(‘view’)
createGlobalTempView 创建一个全局的临时表 , 这个表的生命周期是整个Spark应用程序 ,
只要Spark 应用程序不关闭 , 那么这个临时表依然是可以使用的 ,并且这个表对其他的SparkSession共享
自定义程序,需要创建一个SparkSession对象。
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
createDataFrame(data,schema=None,samplingRatio=None)
data:可以是列表,RDD,还可以是pandas中的DataFrame
schema:列名,一个列表
samplingRatio:推测
1. 列表创建
这里有个警告,需要创建一个全局变量用来封装df1。
>>> data = [(\'pan\',13),(\'fan\',14)]
>>> df1 = spark.createDataFrame(data,[\'name\',\'age\'])
2021-11-05 11:45:41 WARN ObjectStore:6666 - Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
2021-11-05 11:45:41 WARN ObjectStore:568 - Failed to get database default, returning NoSuchObjectException
2021-11-05 11:45:43 WARN ObjectStore:568 - Failed to get database global_temp, returning NoSuchObjectException
>>> df1.createGlobalTempView(\'view\')
>>> df1 = spark.createDataFrame(data,[\'name\',\'age\'])
>>> print(df1)
DataFrame[name: string, age: bigint]
>>> df1.show()
+----+---+
|name|age|
+----+---+
| pan| 13|
| fan| 14|
+----+---+
>>>
>>> import pandas as pd
>>> test_dict = {\'id\':[1,2,3,4,5,6],
... \'name\':[\'Alice\',\'Bob\',\'Cindy\',\'Eric\',\'Halen\',\'Grace\'],
... \'math\':[90,99,78,98,97,81],
... \'english\':[100,100,95,78,45,75]}
>>> data = pd.DataFrame(test_dict,index=list(\'abcdef\'))
>>> print(type(data))
<class \'pandas.core.frame.DataFrame\'>
>>> spark_df = spark.createDataFrame(data)
>>> print(spark_df)
DataFrame[id: bigint, name: string, math: bigint, english: bigint]
>>> spark_df.show()
+---+-----+----+-------+
| id| name|math|english|
+---+-----+----+-------+
| 1|Alice| 90| 100|
| 2| Bob| 99| 100|
| 3|Cindy| 78| 95|
| 4| Eric| 98| 78|
| 5|Halen| 97| 45|
| 6|Grace| 81| 75|
+---+-----+----+-------+
>>>
也可以通过字典直接创建
>>> rdd = sc.parallelize([(\'pan\',13),(\'fan\',14)])
>>> spark_df = spark.createDataFrame(rdd)
>>> spark_df.show()
+---+---+
| _1| _2|
+---+---+
|pan| 13|
|fan| 14|
+---+---+
>>>
>>> rdd = sc.parallelize([(\'pan\',13),(\'fan\',14)])
>>> spark_df = spark.createDataFrame(rdd,schema=[\'name\',\'age\'])
>>> spark_df.show()
+----+---+
|name|age|
+----+---+
| pan| 13|
| fan| 14|
+----+---+
>>>
>>> rdd = sc.parallelize([(\'pan\',13),(\'fan\',14)])
>>> spark_df = spark.createDataFrame(rdd,[\'name\',\'age\'])
>>> spark_df.show()
+----+---+
|name|age|
+----+---+
| pan| 13|
| fan| 14|
+----+---+
通过加载各种文件数据创建DataFrame
text文件的特殊性
>>> df = spark.read.text(r\"C:\\Users\\86178\\Desktop\\SPARK\\数据集\\创建DataFrame\\panda.txt\")
2021-11-09 10:34:56 WARN ObjectStore:568 - Failed to get database global_temp, returning NoSuchObjectException
>>> df.show()
+------+
| value|
+------+
|pan 13|
|fan 14|
| df 30|
+------+
>>>
当然还有更加好用的csv读取。
csv(path, schema=None, sep=None, header=None,dateFormat=None, timestampFormat=None, multiLine=None)
name age
pan 13
fan 14
df 30
读取这个文件
指定分隔符为空格,以及,指定有头部
>>> df = spark.read.csv(r\'C:\\Users\\86178\\Desktop\\SPARK\\数据集\\创建DataFrame\\panda.txt\',sep=\' \',header=True)
>>> df.show()
+----+---+
|name|age|
+----+---+
| pan| 13|
| fan| 14|
| df| 30|
+----+---+
>>>
josn文件读取
json(path, schema=None, mode=None, dateFormat=None, timestampFormat=None, multiLine=None)
通过json方法就可以读取json文件的数据了,通常情况下一般认为一行数据就是一条记录。
path,可以是一个json文件的路径,也可以是一个路径列表,表示读取多个文件、还可以是一个RDD这个rdd存储的是json数据。
schema,指定列名
mode 表示解析的时候如果遇到被损坏的行数据怎么处理,PERMISSIVE表示把能解析的解析出来,其他的字段设置为NULL;DROPMALFORMED直接忽略掉这个记录;FAILFAST直接报错
dateFormat和timestampFormat都是时间格式,如果设置了,满足这种格式的将被解析为日期
multiline 有的时候一个完整的json记录会跨多行,那么要把这个设置为True
>>> df = spark.read.json(r\'C:\\Users\\86178\\Desktop\\SPARK\\数据集\\sql\\employees.json\')
>>> df.show()
+-------+------+
| name|salary|
+-------+------+
|Michael| 3000|
| Andy| 4500|
| Justin| 3500|
| Berta| 4000|
+-------+------+
>>>
>>> from pyspark.sql import Row
>>> Row1=Row(name=\'pan\',age=20)
>>> Row2=Row(name=\'fan\',age=19)
>>> df = spark.createDataFrame([Row1,Row2])
>>> df.show()
+---+----+
|age|name|
+---+----+
| 20| pan|
| 19| fan|
+---+----+
>>>
studentRDD = sc.parallelize([\"7 Rongcheng M 26\",\"8 Guanhua M 27\"]) .map(lambda x:x.split(\" \"))
#下面创建Row对象,每个Row对象都是rowRDD中的一行
rowRDD = studentRDD.map(lambda p:Row(int(p[0].strip()), p[1].strip(), p[2].strip(), int(p[3].strip())))
#建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来
studentDF = spark.createDataFrame(rowRDD, schema)
import json
people_rdd = sc.textFile(\'/home/hadoop/data/sql/people-all.json\')
people_rdd = people_rdd.map(lambda line: json.loads(line)) #str 转 json字典
print(people_rdd.collect())
people_df = people_rdd.toDF()
people_df.printSchema()
注意:
创建df时要注意一列的类型一样
参数数量也必须一致,如下就会报错:
>>> rdd = sc.parallelize([(\'pan\',13),(\'fan\',14,15)])
>>> spark_df = spark.createDataFrame(rdd,[\'name\',\'age\'])
注意在读取没有列名的列时,不能直接使用schema指定列名,需要先对schema定义。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-qvbFNrfK-1639657894967)(spark笔记.assets/image-20211123115237105.png)]
>>> from pyspark.sql.types import *
>>> schema = StructType([StructField(\'dept_id\',FloatType()),StructField(\'dept_name\',StringType())])
>>> df_de = spark.read.csv(r\'C:\\Users\\86178\\Desktop\\SPARK\\作业\\第二次作业\\dept.csv\',schema=schema) # 这里讲FloatType可以指定为StringType(不影响计算)
>>> df_de.show()
+-------+---------+
|dept_id|dept_name|
+-------+---------+
| 10.0| 财务部|
| 20.0| 研发部|
| 30.0| 销售部|
| 40.0| 运营部|
+-------+---------+
>>>
>>> from pyspark.sql import Row
>>> rdd = sc.textFile(r\'C:\\Users\\86178\\Desktop\\SPARK\\作业\\第二次作业\\dept.csv\').map(lambda x:x.split(\',\'))
>>> rdd.collect()
[[\'10\', \'财务部\'], [\'20\', \'研发部\'], [\'30\', \'销售部\'], [\'40\', \'运营部\']]
>>> rdd_row = rdd.map(lambda x:Row(dept_id=x[0],dept_name=x[1]))
>>> df_dept = spark.createDataFrame(rdd_row)
>>> df_dept.show()
+-------+---------+
|dept_id|dept_name|
+-------+---------+
| 10| 财务部|
| 20| 研发部|
| 30| 销售部|
| 40| 运营部|
+-------+---------+
>>>
还有很多方法可以操作这个,但是推荐使用这两种方法,相对而言方便些。
读写text文件时,不会将文件分割,并且,写入时必须是一列,不然会报错。
>>> rdd = rdd.repartition(1)
>>> spark_df = spark.createDataFrame(rdd,[\'name\',\'age\'])
>>> spark_df.write.json(r\'C:\\Users\\86178\\Desktop\\SPARK\\数据集\\创建DataFrame\\a.json\')
>>>
注意这里也是需要重新分区的,不然会将文件写到多个js文件下
df.write.text(path)或者
df.write.format(‘text’).save(path)
>>> df1 = df1.repartition(1)
>>> df1.write.format(\"json\").save(r\"C:\\Users\\86178\\Desktop\\SPARK\\数据集\\sql\\输出.json\")
>>>
spark_df.printSchema() 返回列名
>>> rdd = sc.parallelize([(\'pan\',13),(\'fan\',14)])
>>> spark_df = spark.createDataFrame(rdd,[\'name\',\'age\'])
>>> spark_df.printSchema()
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
>>> print(spark_df.count())
2
>>> print(spark_df.dtypes)
[(\'name\', \'string\'), (\'age\', \'bigint\')]
>>>
limit和take —— 限制返回的行数
>>> df = spark.read.csv(r\'C:\\Users\\86178\\Desktop\\SPARK\\数据集\\创建DataFrame\\panda.txt\',sep=\' \',header=True)
>>> df.show()
+----+---+
|name|age|
+----+---+
| pan| 13|
| fan| 14|
| df| 30|
+----+---+
>>> df.collect()
[Row(name=\'pan\', age=\'13\'), Row(name=\'fan\', age=\'14\'), Row(name=\'df\', age=\'30\')]
# limit
>>> df.limit(2).collect()
[Row(name=\'pan\', age=\'13\'), Row(name=\'fan\', age=\'14\')]
>>> df.limit(2).show()
+----+---+
|name|age|
+----+---+
| pan| 13|
| fan| 14|
+----+---+
# take
>>> df.take(2)
[Row(name=\'pan\', age=\'13\'), Row(name=\'fan\', age=\'14\')]
>>>
# 注意take不能和show一起使用,limit可以
>>> df.first()
Row(name=\'pan\', age=\'13\')
>>> df.head()
Row(name=\'pan\', age=\'13\')
>>> df.head(2)
[Row(name=\'pan\', age=\'13\'), Row(name=\'fan\', age=\'14\')]
>>>
select 和 selectExpr
>>> newdf = spark.createDataFrame([(\"王伟\",8),(\"卢露\",6),(\"卢露\",8),(\"图编\",7),(\"图编\",7)],[\"姓名\",\"年龄\"])
>>> newdf.show()
+---+---+
| 姓名| 年龄|
+---+---+
| 王伟| 8|
| 卢露| 6|
| 卢露| 8|
| 图编| 7|
| 图编| 7|
+---+---+
>>> newdf.select(newdf.年龄,newdf.姓名)
DataFrame[年龄: bigint, 姓名: string]
>>> newdf.select(newdf[\'年龄\'],newdf[\'姓名\'])
DataFrame[年龄: bigint, 姓名: string]
>>>
主要掌握select就行,方法有:newdf.年龄 或者 newdf[‘年龄’],当然也还有直接写字段名的方法,但是不推荐使用。
因为一下就会报错,以防万一就用以上两种方法写。
DataFrame[(年龄 + 1): bigint, 姓名: string]
>>> newdf.select(\'年龄\'+1,newdf[\'姓名\'])
Traceback (most recent call last):
File \"\" , line 1, in <module>
TypeError: can only concatenate str (not \"int\") to str
>>>
**首先需要导入库 **
import pyspark.sql.functions as func
df = spark.read.csv(r\'C:\\Users\\86178\\Desktop\\SPARK\\数据集\\sql\\people.txt\',sep=\',\',header=True)
df.select(df.age,func.sum(df.score)).show()
函数 | 描述 |
---|---|
count、countDistinct | 返回个数 |
min、max | 最小值、最大值 |
first、last | 返回指定列的第一个、最后一个值 |
sum、sumDistinct | 求和 |
avg 、mean | 求平均值 |
>>> newdf = spark.createDataFrame([(\"王伟\",8),(\"卢露\",6),(\"卢露\",8),(\"图编\",7),(\"图编\",7)],[\"姓名\",\"年龄\"])
>>> newdf.select(func.sum(newdf.年龄)).show()
+-------+
|sum(年龄)|
+-------+
| 36|
+-------+
>>> newdf.select(func.mean(newdf.年龄)).show()
+-------+
|avg(年龄)|
+-------+
| 7.2|
+-------+
>>>
这里补充一点求和
>>> newdf.groupBy().sum().show()
+-------+
|sum(年龄)|
+-------+
| 36|
+-------+
函数名 | 描述 |
---|---|
length | 计算字符串长度 |
instr | 返回子字符串第一次出现在给定字符串(列)的位置 |
split | 根据给定的字符分割某个字符串,返回一个数组 |
>>> authors = [[\'Thomas\',\'Hardy\',\'June 2,1840\'],
... [\'Thomas\',\'Hardy\',\'June 2,1840\'],
... [\'Thomas\',\'H\',None],
... [\'Jane\',\'Austen\',\'16 December 1775\'],
... [\'Emily\',None,None]]
>>> df1 = spark.createDataFrame(authors,schema=[\"FirstName\",\"LastName\",\"Dob\"])
>>> df1.select(func.length(df1.FirstName)).show()
+-----------------+
|length(FirstName)|
+-----------------+
| 6|
| 6|
| 6|
| 4|
| 5|
+-----------------+
>>> df1.select(func.instr(df1.FirstName,\'ho\')).show()
+--------------------+
|instr(FirstName, ho)|
+--------------------+
| 2|
| 2|
| 2|
| 0|
| 0|
+--------------------+
>>> df1.select(func.split(df1.FirstName,\'ho\')).show()
+--------------------+
|split(FirstName, ho)|
+--------------------+
| [T, mas]|
| [T, mas]|
| [T, mas]|
| [Jane]|
| [Emily]|
+--------------------+
filter 和 where
filter 方法可以在原有的dataframe对象上指定过滤条件,并返回所有满足这个条件的行数据构成的dataframe。
filter方法的传参,可以用字符串字段名,也可以用 对象名.字段名来写过滤条件。
如果是用字符串形式传参,一个filter函数可以写多个过滤条件。
where 方法与filter方法用法一致,where函数是filter函数的别名。
>>> df.show()
+-------+---+-----+
| name|age|score|
+-------+---+-----+
|Michael| 29| 98|
| Andy| 30| 78|
| Justin| 19| 45|
| panda| 21| 99|
+-------+---+-----+
>>> df.filter((df.age>20) & (df.score>90)).show()
+-------+---+-----+
| name|age|score|
+-------+---+-----+
|Michael| 29| 98|
| panda| 21| 99|
+-------+---+-----+
>>> df.filter(df.age>20).filter(df.score>80).show()
+-------+---+-----+
| name|age|score|
+-------+---+-----+
|Michael| 29| 98|
| panda| 21| 99|
+-------+---+-----+
>>>
sort(cols,ascending)
>>> df.sort(df.age,ascending=False).show()
+-------+---+-----+
| name|age|score|
+-------+---+-----+
| Andy| 30| 78|
|Michael| 29| 98|
| panda| 21| 99|
| Justin| 19| 45|
+-------+---+-----+
>>> df.sort(df.age,df.score,ascending=False,ascending=True).show()
File \"\" , line 1
SyntaxError: keyword argument repeated
当连续对两个列排序时,指定排序顺序就会报错
#Column类的desc/asc方法也表示对这个列降序/升序
>>> df.sort(df.age.desc(),df.score.asc()).show()
+-------+---+-----+
| name|age|score|
+-------+---+-----+
| Andy| 30| 78|
|Michael| 29| 98|
| panda| 21| 99|
| Justin| 19| 45|
+-------+---+-----+
总结:
withColumn(colName,col)
>>> newdf = spark.createDataFrame([(\"王伟\",8),(\"卢露\",6),(\"卢露\",8),(\"图编\",7),(\"图编\",7)],[\"姓名\",\"年龄\"])
>>> newdf.withColumn(\'score\',newdf.年龄+1)
DataFrame[姓名: string, 年龄: bigint, score: bigint]
>>> df = newdf.withColumn(\'score\',newdf.年龄+1).show()
+---+---+-----+
| 姓名| 年龄|score|
+---+---+-----+
| 王伟| 8| 9|
| 卢露| 6| 7|
| 卢露| 8| 9|
| 图编| 7| 8|
| 图编| 7| 8|
+---+---+-----+
方法一:withColumnRenamed
>>> newdf = spark.createDataFrame([(\"王伟\",8),(\"卢露\",6),(\"卢露\",8),(\"图编\",7),(\"图编\",7)],[\"姓名\",\"年龄\"])
>>> newdf = newdf.withColumnRenamed(\'年龄\',\'age\')
>>> newdf.show()
+---+---+
| 姓名|age|
+---+---+
| 王伟| 8|
| 卢露| 6|
| 卢露| 8|
| 图编| 7|
| 图编| 7|
+---+---+
方法二:alias
>>> newdf = newdf.select(newdf.姓名.alias(\'name\'),newdf.年龄.alias(\'age\'))
>>> newdf.show()
+----+---+
|name|age|
+----+---+
| 王伟| 8|
| 卢露| 6|
| 卢露| 8|
| 图编| 7|
| 图编| 7|
+----+---+
>>>
groupBy
可用来求和,
>>> newdf = spark.createDataFrame([(\"王 伟\",\"语文\",90,9),(\"卢露\",\"数学\",60,8),(\"卢露\",\"语文\",80,8),(\"图编\",\"数学\",70,8),(\"图编\",\"语文\",70,6)],[\"姓名\",\"学科\",\"成绩\",\"年龄\"])
>>> newdf.groupBy().sum().show()
+-------+-------+
|sum(成绩)|sum(年龄)|
+-------+-------+
| 370| 39|
+-------+-------+
join(other,on = ,how = ) 其中how为内连接、外连接、左连接等
On=[df.姓名==newdf.name],当两个df连接名不一样的,所以
参数说明:other指定要连接的另一个dataframe,on指定连接条件(就是根据什么字段关联,可以是多个连接条件),how指定连接方式。
可以是内连接inner,outer外连接,left左连接,right右连接,left_outer左外连接,right_outer右外连接等
>>> df = spark.createDataFrame([(\"王伟\",88),(\"卢露\",46),(\"panda\",59),(\"fan\",56),(\"pan\",98)],[\"姓名\",\"score\"])
>>> newdf = spark.createDataFrame([(\"王伟\",8),(\"卢露\",6),(\"panda\",8)],[\"姓名\",\"年龄\"])
# 这里on最好写成 On=[df.姓名==newdf.姓名]
>>> df1 = newdf.join(df,on=\'姓名\',how=\'left_outer\').show()
+-----+---+-----+
| 姓名| 年龄|score|
+-----+---+-----+
|panda| 8| 59|
| 卢露| 6| 46|
| 王伟| 8| 88|
+-----+---+-----+
>>> df1 = newdf.join(df,on=\'姓名\',how=\'left\').show()
+-----+---+-----+
| 姓名| 年龄|score|
+-----+---+-----+
|panda| 8| 59|
| 卢露| 6| 46|
| 王伟| 8| 88|
+-----+---+-----+
>>> df1 = newdf.join(df,on=\'姓名\',how=\'right\').show()
+-----+----+-----+
| 姓名| 年龄|score|
+-----+----+-----+
|panda| 8| 59|
| 卢露| 6| 46|
| pan|null| 98|
| fan|null| 56|
| 王伟| 8| 88|
+-----+----+-----+
>>> df1 = newdf.join(df,on=\'姓名\',how=\'inner\').show()
+-----+---+-----+
| 姓名| 年龄|score|
+-----+---+-----+
|panda| 8| 59|
| 卢露| 6| 46|
| 王伟| 8| 88|
+-----+---+-----+
注意:
df1 = newdf.join(df,on=\'姓名\',how=\'inner\').show()
这样写存在着弊端,就是你接下来使用show就会报错,因为df1现在是空的
>>> df1.show()
Traceback (most recent call last):
File \"\" , line 1, in <module>
AttributeError: \'NoneType\' object has no attribute \'show\'
>>> df1.printSchema()
Traceback (most recent call last):
File \"\" , line 1, in <module>
AttributeError: \'NoneType\' object has no attribute \'printSchema\'
>>>
所以一般我们不会在赋值后面show(等号后面不加show),这里为了方便展示这样写。
>>> df1.show()
+-----+----+-----+
| 姓名| 年龄|score|
+-----+----+-----+
|panda| 8| 59|
| 卢露| 6| 46|
| pan|null| 98|
| fan|null| 56|
| 王伟| 8| 88|
+-----+----+-----+
>>> df1.select(df1.年龄.isNull(),df1.score.isNull()).show()
+------------+---------------+
|(年龄 IS NULL)|(score IS NULL)|
+------------+---------------+
| false| false|
| false| false|
| true| false|
| true| false|
| false| false|
+------------+---------------+
# 返回包含空值的所有数据
>>> df1.select(\'*\').filter(df1.年龄.isNull()).show()
+---+----+-----+
| 姓名| 年龄|score|
+---+----+-----+
|pan|null| 98|
|fan|null| 56|
+---+----+-----+
# 将不为空的数据全部返回
>>> df1.select(\'*\').filter(df1.年龄.isNotNull()).show()
+-----+---+-----+
| 姓名| 年龄|score|
+-----+---+-----+
|panda| 8| 59|
| 卢露| 6| 46|
| 王伟| 8| 88|
+-----+---+-----+
>>> newdf.filter(newdf.年龄.isNotNull()).show()
+---+---+-----------+
| 姓名| 年龄| 手机|
+---+---+-----------+
| 王伟| 8|13567289098|
| 卢露| 6| 1364567653|
| 卢露| 8| 1325577653|
| 图编| 7| 1325577653|
| 图编| 7| 1325577653|
+---+---+-----------+
>>>
>>> df1.show()
+-----+----+-----+
| 姓名| 年龄|score|
+-----+----+-----+
|panda| 8| 59|
| 卢露| 6| 46|
| pan|null| 98|
| fan|null| 56|
| 王伟| 8| 88|
+-----+----+-----+
>>> df1.drop(\'年龄\').show()
+-----+-----+
| 姓名|score|
+-----+-----+
|panda| 59|
| 卢露| 46|
| pan| 98|
| fan| 56|
| 王伟| 88|
+-----+-----+
na.drop(how=‘any’, thresh=None, subset=None)
how 如果为any的话,表示只要有任意一列的值为空,那么就把这行数据删掉,all表示只有所有列都为空值才删除这一行数据
thresh 表示阈值,是一个整数,就是说这行数据的非空值个数大于等于thresh指定的个数就保留这行数据,会覆盖how的作用。当一行有5个数据,其中2个空值,
subset 默认情况下考虑所有列,但是如果指定了subset,那么就只考虑特定的列,subset是一个列表,列表里面的元素是列名
>>> df1.na.drop().show()
+-----+---+-----+
| 姓名| 年龄|score|
+-----+---+-----+
|panda| 8| 59|
| 卢露| 6| 46|
| 王伟| 8| 88|
+-----+---+-----+
# 空行中的非空值的个数为2,大于thresh,所以保留
>>> df1.na.drop(thresh=1).show()
+-----+----+-----+
| 姓名| 年龄|score|
+-----+----+-----+
|panda| 8| 59|
| 卢露| 6| 46|
| pan|null| 98|
| fan|null| 56|
| 王伟| 8| 88|
+-----+----+-----+
# 空行中的非空值的个数为2,等于thresh,所以保留
>>> df1.na.drop(thresh=2).show()
+-----+----+-----+
| 姓名| 年龄|score|
+-----+----+-----+
|panda| 8| 59|
| 卢露| 6| 46|
| pan|null| 98|
| fan|null| 56|
| 王伟| 8| 88|
+-----+----+-----+
# 空行中的非空值的个数为2,小于thresh,所以删除空行
>>> df1.na.drop(thresh=3).show()
+-----+---+-----+
| 姓名| 年龄|score|
+-----+---+-----+
|panda| 8| 59|
| 卢露| 6| 46|
| 王伟| 8| 88|
+-----+---+-----+
# na.fill({}) 字典指定特定列填充
>>> df1.na.fill({\'年龄\':15}).show()
+-----+---+-----+
| 姓名| 年龄|score|
+-----+---+-----+
|panda| 8| 59|
| 卢露| 6| 46|
| pan| 15| 98|
| fan| 15| 56|
| 王伟| 8| 88|
+-----+---+-----+
>>> df1.na.fill(15).show()
+-----+---+-----+
| 姓名| 年龄|score|
+-----+---+-----+
|panda| 8| 59|
| 卢露| 6| 46|
| pan| 15| 98|
| fan| 15| 56|
| 王伟| 8| 88|
+-----+---+-----+
substr(star,end) 传入int值,不是按照索引的,从1开始
>>> newdf = spark.createDataFrame([(\"王伟\",8,13567289098),(\"卢露\",6,1364567653),(\"卢露\",8,1325577653),(\"图编\",7,1325577653),(\"图编\",7,1325577653)],[\"姓名\",\"年龄\",\"手机\"])
>>> newdf.select(newdf.姓名,newdf.手机.substr(1,4)).show()
+---+-------------------+
| 姓名|substring(手机, 1, 4)|
+---+-------------------+
| 王伟| 1356|
| 卢露| 1364|
| 卢露| 1325|
| 图编| 1325|
| 图编| 1325|
+---+-------------------+
>>>
concat
concat_ws
>>> import pyspark.sql.functions as func
>>> df = df1.select(func.concat(\'姓名\',\'手机\'))
>>> df.show()
+--------------+
|concat(姓名, 手机)|
+--------------+
|pan17830389225|
+--------------+
>>> df = newdf.select(newdf.手机,func.concat_ws(\'---\',newdf.姓名,newdf.年龄))
>>> df.show()
+-----------+----------------------+
| 手机|concat_ws(---, 姓名, 年龄)|
+-----------+----------------------+
|13567289098| 王伟---8|
| 1364567653| 卢露---6|
| 1325577653| 卢露---8|
| 1325577653| 图编---7|
| 1325577653| 图编---7|
+-----------+----------------------+
>>>
like
% 匹配任何几个字符
_ 只匹配一个字符
>>> authors = [[\'Thomas\',\'Hardy\',\'June 2,1840\'],
... [\'Thomas\',\'Hardy\',\'June 2,1840\'],
... [\'Thomas\',\'Ha\',None],
... [\'Jane\',\'Austen\',\'16 December 1775\'],
... [\'Emily\',None,None]]
>>>
>>> df1 = spark.createDataFrame(authors,schema=[\"FirstName\",\"LastName\",\"Dob\"])
>>> df1.select(\'*\').filter(df1.FirstName.like(\'Th%\')).show()
+---------+--------+-----------+
|FirstName|LastName| Dob|
+---------+--------+-----------+
| Thomas| Hardy|June 2,1840|
| Thomas| Hardy|June 2,1840|
| Thomas| Ha| null|
+---------+--------+-----------+
>>>
找出员工姓名是三个字的(模糊查询),并且工资在2000~3000范围内的员工信息,字段包括:员工编号,姓名,性别,年龄,岗位,薪水,部门编号;
代码:
>>> df_pcq19 = spark.sql(\"select emp_id,emp_name,gender,emp_age,emp_job,salary,dept_id from df2 where emp_name like \'___\' and salary>2000 and salary<3000\")
>>> df_pcq19.show()
between(a,b) # [a,b]
一般结合filter来用,含头又含尾。要求传入两个参数,分别为要过滤的最小值和最大值。
>>> newdf.show()
+---+---+-----------+
| 姓名| 年龄| 手机|
+---+---+-----------+
| 王伟| 8|13567289098|
| 卢露| 6| 1364567653|
| 卢露| 8| 1325577653|
| 图编| 7| 1325577653|
| 图编| 7| 1325577653|
+---+---+-----------+
>>> newdf.select(newdf.年龄.between(6,7)).show()
+-------------------------+
|((年龄 >= 6) AND (年龄 <= 7))|
+-------------------------+
| false|
| true|
| false|
| true|
| true|
+-------------------------+
>>>
请注意:不建议在没有服务器身份验证的情况下建立SSL连接。
根据MySQL 5.5.45+、5.6.26+和5.7.6+的要求,如果不设置显式选项,则必须建立默认的SSL连接。
您需要通过设置useSSL=false显式地禁用SSL,或者设置useSSL=true并为服务器证书验证提供信任存储。
连接代码:
# 连接MySQL
>>> jdbcDF= spark.read.jdbc(\'jdbc:mysql://localhost:3306/test_sql?useUnicode=true&characterEncoding=utf-8&useSSL=false&user=root&password=panda&serverTimezone=Asia/Shanghai\', table=\'student\')
>>> jdbcDF.show()
+---+--------+------+---+
| ID| name|gender|age|
+---+--------+------+---+
| 1|zhangsan| M| 16|
| 2| lier| M| 17|
| 3| xiexun| M| 18|
| 4|zhaoling| F| 19|
| 5|zhaoming| M| 20|
| 6| wangwu| F| 18|
+---+--------+------+---+
# 创建一个DataFrame
>>> df = spark.createDataFrame([[7,\'panda\',\'M\',21],[8,\'fan\',\'F\',20]],schema=[\'id\',\'name\',\'gender\',\'age\'])
# 将DataFrame写进MySQL中,可直接复制
prop = {}
prop[\'user\'] = \'root\'
prop[\'password\'] = \'panda\'
prop[\'driver\'] = \"com.mysql.jdbc.Driver\"
df.write.jdbc(\'jdbc:mysql://localhost:3306/test_sql?useUnicode=true&characterEncoding=utf-8&useSSL=false\',\'student\',\'append\',prop)
>>> jdbcDF.show()
+---+--------+------+---+
| ID| name|gender|age|
+---+--------+------+---+
| 1|zhangsan| M| 16|
| 2| lier| M| 17|
| 3| xiexun| M| 18|
| 4|zhaoling| F| 19|
| 5|zhaoming| M| 20|
| 6| wangwu| F| 18|
| 7| panda| M| 21|
| 8| fan| F| 20|
+---+--------+------+---+
首先需要创建临时表或者临时视图
>>> df_dept.createTempView(\"dept_df\")
>>> spark.sql(\"select * from dept_df\").show()
+-------+---------+
|dept_id|dept_name|
+-------+---------+
| 10| 财务部|
| 20| 研发部|
| 30| 销售部|
| 40| 运营部|
+-------+---------+
>>>
Spark Streaming兼容hive,但是不受限于hive
流计算框架 – 处理流数据
静态数据和流数据 :
静态数据,数据不会变 (批量计算)
流数据会变(PM2.5检测、网站用户点击流等等) (实时计算)
特点:快速到达;
来源众多,格式复杂;
数据量大;
关注整体数据,不关心个别价值;
数据顺序颠倒,或不完整。
流计算:1. 实时获取来自不同的数据源的海量数据,经过实时分析处理,获得有价值的信息。
2. 数据的价值随着时间的流逝而降低,出现时就应该及时处理,而不是存储下来批量处理。
3. 为了及时处理就应该需要一个 低延迟、可扩展、高可靠 的处理引擎。
高性能
海量式(支持处理TB、甚至PB级数据)
实时性
分布式
易用性
可靠性。
流计算处理流程:数据实时采集、数据实时计算、实时查询服务
输入:Kafka、Flume、HDFS、TCP socket
输出:HDFS、Databases、显示在仪表盘里
数据抽象
spark core – RDD
spark sql – dataframe
Spark Streaming – DStream
通过创建输入DStream来定义输入源
通过对DStream应用转换操作和输出操作来定义流计算
用streamingContext.start()来开始接收数据和处理流程(启动)
通过streamingContext.awaitTermination()方法来等待处理结束(手动结束(ctrl+c)或因为错误而结束)
可以通过streamingContext.stop()来手动结束流计算进程
需要自己去创建(RDD和DataFrame都自带sc和spark,不需要创建,但是DStream没有)
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 1)
这里1是指分段时间的长短,接收数据1秒为时间段(每1秒创建RDD,作为一个输出字段)
当需要编写独立运用程序时,如下
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
conf = SparkConf()
conf.setAppName(\'TestDStream\')
conf.setMaster(\'local[2]\')
sc = SparkContext(conf = conf)
ssc = StreamingContext(sc, 10)
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 20)
lines = ssc.textFileStream(r\'C:\\Users\\86178\\Desktop\\SPARK\\DStreaming\')
lines.pprint()
ssc.start()
ssc.awaitTermination()
注意:
1). 只会读取新建的文件,而且是在这20秒内新建的文件才行
2). 在20秒内更改名称也行**(不能存在以前存在的名称)**
存在问题:
1). 文件被读取过,或者经过了一个周期时,文件不会被读出,当新建不会被使用。
2). 当文件修改多次后,在此修改不会被读取了(当创建后有两次修改机会)。
3). 文件名被使用了就不能再次使用。
Spark Streaming可以通过Socket端口监听并接收数据,然后进行相应处理
cd C:\\software\\netcat
nc -lp 9999
nc 172.24.108.252 9999
netcat作为服务器,作为数据源,连接两台电脑
spark-submit C:\\Users\\86178\\Desktop\\word.py 172.17.32.104 9999
RDD队列流
使用streamingContext.queueStream(queueOfRDD)创建基于RDD队列的DStream
spark-submit path
import time
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__ == \"__main__\":
sc = SparkContext(appName=\"PythonStreamingQueueStream\")
ssc = StreamingContext(sc, 2)
#创建一个队列,通过该队列可以把RDD推给一个RDD队列流
rddQueue = []
for i in range(5):
rddQueue += [ssc.sparkContext.parallelize([j for j in range(1, 1001)], 10)]
time.sleep(1)
#创建一个RDD队列流
inputStream = ssc.queueStream(rddQueue)
mappedStream = inputStream.map(lambda x: (x % 10, 1))
reducedStream = mappedStream.reduceByKey(lambda a, b: a + b)
reducedStream.pprint()
ssc.start()
ssc.stop(stopSparkContext=True, stopGraceFully=True) # stopGraceFully=True当运行完才会结束
每行的输入就是创建一个rdd
无状态就是对在一段时间内的数据统计,不考虑历史的输入。
有状态就是包括对历史的处理,对历史处理汇总。
import findspark
findspark.init()
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# pprint 方法用来打印这个DStream中生成的每个RDD的前个num元素(默认是前20个),无返回值
def run_pprint():
sc = SparkContext()
ssc = StreamingContext(sc,10)
ds = ssc.socketTextStream(\'localhost\',9999)
ds.pprint(num=5)
ssc.start()
ssc.awaitTermination()
run_pprint()
import findspark
findspark.init()
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# 元素为原DStream的每个RDD元素个数
def run_count ():
sc = SparkContext()
ssc = StreamingContext(sc,10)
ds = ssc.socketTextStream(\'localhost\',9999)
ds2 = ds.count()
ds2.pprint(num=5)
ssc.start()
ssc.awaitTermination()
run_count()
返回一个只包含满足指定条件的元素构成的DStream
import findspark
findspark.init()
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
def run_filter ():
sc = SparkContext()
ssc = StreamingContext(sc,10)
ds = ssc.socketTextStream(\'localhost\',9999)
ds2 = ds.filter(lambda x: \'p\' in x) # 输出包含a的RDD
ds2.pprint(num=5)
ssc.start()
ssc.awaitTermination()
run_filter()
源DStream的每个元素,采用func函数进行转换,得到一个新的Dstream
import findspark
findspark.init()
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
def run_map ():
sc = SparkContext()
ssc = StreamingContext(sc,10)
ds = ssc.socketTextStream(\'localhost\',9999)
ds2 = ds.map(lambda x:(x,1)) # 输出包含a的RDD
ds2.pprint(num=5)
ssc.start()
ssc.awaitTermination()
run_map()
结果为:
(‘pan fdj’, 1)
(\'fas \', 1)
(‘sdaf’, 1
import findspark
findspark.init()
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# map 源DStream的每个元素,采用func函数进行转换,得到一个新的Dstream
def run_flatmap ():
sc = SparkContext()
ssc = StreamingContext(sc,10)
ds = ssc.socketTextStream(\'localhost\',9999)
ds2 = ds.flatMap(lambda x:x.split()) # 将每每行按照空格拆分,然后将其输出
ds2.pprint(num=5)
ssc.start()
ssc.awaitTermination()
run_flatmap()
import findspark
findspark.init()
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
def run_reduce ():
sc = SparkContext()
ssc = StreamingContext(sc,10)
ds = ssc.socketTextStream(\'localhost\',9999)
ds2 = ds.flatMap(lambda x:x.split()).map(lambda x:eval(x))
ds2.pprint()
ds3 = ds2.reduce(lambda a,b:a+b)
ds3.pprint()
ssc.start()
ssc.awaitTermination()
run_reduce()
输入
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WP9ktynN-1639657894987)(spark笔记.assets/image-20211209205522173.png)]
输出
可以输入的个数数据统计
无状态操作一个批次
这个操作一个框内的所有批次,先将time1的结果计算出,然后计算time2,time2与time1汇总,最后计算time3,然后结果与time2与time1汇总。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QA41Cqpg-1639657894988)(spark笔记.assets/image-20211210105123868.png)]
1. window(windowLength, slideInterval) 基于源DStream产生的窗口化的批数据,计算得到一个新的Dstream
就相当于在滑动窗口设置没有时间段。
import findspark
findspark.init()
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# 读取文件流
def run_file():
sc = SparkContext()
ssc = StreamingContext(sc, 20)
lines = ssc.textFileStream(r\'C:\\Users\\86178\\Desktop\\SPARK\\作业\\第三次作业\\dstreaming\')
lines1 = lines.map(lambda x:x.split()).filter(lambda x:x[0]==\'13\')
lines1.saveAsTextFiles(r\'C:\\Users\\86178\\Desktop\\SPARK\\作业\\第三次作业\\输出\\file\')
lines1.pprint()
ssc.start()
ssc.awaitTermination()
# run_file()
# 无状态---对输入的数据进行词频统计
def run_func1():
sc = SparkContext()
ssc = StreamingContext(sc,10)
ssc.checkpoint(r\"C:\\Users\\86178\\Desktop\\SPARK\\test_dstreaming\\checkpoint\") # 对来不及处理的数据设置检查点
ds = ssc.socketTextStream(\'localhost\',9999)
ds1 = ds.flatMap(lambda x:x.split()).map(lambda x:(x,1)).reduceByKey(lambda a,b:a+b)
ds1.pprint()
ssc.start()
ssc.awaitTermination()
# run_func1()
# 有状态(滑动窗口) -- 对输入的数据进行词频统计
def run_func2():
sc = SparkContext()
ssc = StreamingContext(sc,5)
ssc.checkpoint(r\"C:\\Users\\86178\\Desktop\\SPARK\\test_dstreaming\\checkpoint\") # 对来不及处理的数据设置检查点
ds = ssc.socketTextStream(\'localhost\',9999)
ds1 = ds.flatMap(lambda x:x.split()).map(lambda x:(x,1)).reduceByKey(lambda a,b:a+b).\\
reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 15, 5)
# 窗口的长度为15,每5秒向后移动一次,移动时间段为5秒的距离
ds1.pprint()
ssc.start()
ssc.awaitTermination()
# run_func2()
# 有状态 -- 对输入的数据进行词频统计
def run_func3():
sc = SparkContext()
ssc = StreamingContext(sc, 3)
ssc.checkpoint(r\"C:\\Users\\86178\\Desktop\\SPARK\\test_dstreaming\\checkpoint\") # 对来不及处理的数据设置检查点
ds = ssc.socketTextStream(\'localhost\', 9999)
def updateFunc(new_values, last_sum):
return sum(new_values) + (last_sum or 0)
ds2 = ds.flatMap(lambda line: line.split())\\
.map(lambda word: (word, 1))\\
.updateStateByKey(updateFunc)
ds2.pprint()
ssc.start()
ssc.awaitTermination()
run_func3()