发布时间:2022-08-19 13:21
spark是基于内存的分布式计算框架。
hadoop是一个分布式计算开源框架,包含分布式文件系统HDFS、 MapReduce分布式计算的软件架构和Yarn资源管理调度系统。
1.MR是基于磁盘迭代处理数据,Spark是基于内存处理数据
2.Spark有DAG有向无环图做优化
3.MR是细粒度资源申请,application执行慢,spark是粗粒度资源申请,application执行快
4.MR没有SQL,流处理功能,Spark有SQL功能以及流处理功能
5.MR没有高级算子的封装,Spark提供了各种高级算子(函数)
1)Local:运行在一台机器上。 测试用。
2)Standalone:是Spark自身的一个调度系统。 对集群性能要求非常高时用。国内很少使用。
3)Yarn:采用Hadoop的资源调度器。 国内大量使用。
4)Mesos:国内很少使用。
NameNode WebUI 2.x 50070 3.x 9870
访问MR执行情况端口(yarn) 8088
历史服务器 19888
客户端访问集群端口(内部通信) 2.x 9000 3.x 8020
spark-shell任务端口 4040
内部通讯端口 7077
standalone,master web 8080
历史服务器 18080
zookeeper 2181
redis 6379
mysql 3306
kafka 9092
hbase master WebUI 60010
RDD是由一系列Partition组成
算子是作用在partition上的
RDD之间有依赖关系
分区器是作用在KV格式的RDD上的
partition对外提供了最佳的计算位置(移动数据不如移动计算)
map
mapPartitions
mapPartitionsWithIndex
flatMap
groupBy
filter
sample(true,0.3) 放回抽样,抽样比例0.3
distinct去重,原理reduceByKey 有shuffle
coalesce 减少分区用coalesce(num,false) 不shuffle
repartition 增加分区用repartition(num) =coalesce(num,true) 有shuffle
sortBy
intersection 它是去二个rdd的交集 有shuffle
union 取两个RDD的并集,不去重,会增加partition的数量,同时并行度也会增加
reduceByKey
groupByKey
sortByKey
aggregateByKey
join
collect
count
first take(1)
take
foreach
foreachPartitons
top
takeOrdered(n) 取出前n个元素,按升序列
./spark-submit --master yarn --deploy-mode cluster --class …jar …
1)在提交任务时的几个重要参数
executor-cores 4
num-executors 10
executor-memory 8g
driver-cores 2
driver-memory 8g
2)边给一个提交任务的样式
spark-submit \
–master yarn \
–deploy-mode cluster \
–driver-cores 2 \
–driver-memory 8g \
–executor-cores 4\
–num-executors 10 \
–executor-memory 8g \
–class PackageName.ClassName XXXX.jar \
–name “Spark Job Name” \
InputPath \
OutputPath
窄依赖:父rdd与子rdd是一对一或多对一的关系
宽依赖:父rdd与子rdd是一对多或多对多的关系 有shuffle shuffle会落地到磁盘
比如,coalesce(num,false) 减少分区时,就不会产生shuffle
Application执行之前首先将所有的资源申请完毕,Application中的每个job执行时不需要自己申请资源,自己释放资源,当application中最后一个job执行完成之后,所有的资源才会被释放,job执行快,application执行快,集群的资源不能充分利用。
Application启动之后,每个job自己申请资源自己释放资源,每个job执行完成之后,资源立即释放,集群资源可以充分利用, application执行相对慢。
(1)Application:初始化一个SparkContext即生成一个Application;
(2)Job:一个Action算子就会生成一个Job;
(3)Stage:Stage等于宽依赖的个数加1;
(4)Task:一个Stage阶段中,最后一个RDD的分区个数就是Task的个数。
DataFrame的cache默认采用 MEMORY_AND_DISK
RDD 的cache默认方式采用MEMORY_ONLY
其他缓存级别
DISK_ONLY
MEMORY_AND_DISK_SER
MEMORY_AND_DISK_SER_2 数据存两份
MEMORY_AND_DISK_2
cache() = persist(StorageLevel.MEMORY_ONLY)
1)Checkpoint 需要手动指定磁盘目录,当Spark的application执行完成之后数据不会被清空,由于这个特点,checkpoint在Spark中常用于状态管理
2)Cache缓存只是将数据保存起来,不切断血缘依赖,当Spark的application执行完成之后数据会被清空。Checkpoint检查点切断血缘依赖。
3)建议对checkpoint()的RDD使用Cache缓存,这样checkpoint的job只需从Cache缓存中读取数据即可,否则需要再从头计算一次RDD。
将数据封装到一个对象中,使用SortByKey对对象排序,对象需要实现可排序 Comparable 接口
1.GroupByKey后分组的数据放到集合中进行排序。取出前n个 数据量大会OOM
2.GroupByKey后分组的数据进行迭代,使用定长数组的方式拿到前n个,迭代器本身不储存数据。
1)当Executor端使用到Driver端的变量时,如果不使用广播变量,每个Executor有多少task就有多少变量的副本。使用广播变量后,每个Executor只有一份Driver端变量的副本。
2)广播变量不能对RDD进行广播,可以将RDD回收到Driver端进行广播
3)广播变量只能在Driver端定义,在Executor端不能修改广播变量的值
4)广播变量的声明和使用
val broadcast :Broadcast[String] = sc.broadcast(str)
rdd.filter{
log=>log.contains(broadcast.value)
}
…
累加器只能在Driver端定义初始化,在Executor累加
val sum:LongAccumulator = sc.longAccumulator(“sum”);
rdd.foreach{
sum.add(1)
}
reduceBykey:map端预聚合
groupByKey:没有map端预聚合
repartition: 重分区,一般用于增加分区
aggregateByKey
SortByKey:
hive on spark : hive映射HDFS上的表,解析优化SQL,底层转化为Spark执行
spark on hive : hive映射HDFS上的表,Spark解析优化SQL,Spark执行,写的是Spark代码
1)RDD 是Spark Core底层操作的对象
2)Dataset是SparkSQL底层操作的对象,Dataset底层hash,filter,sort时不需要反序列化数据成对象,性能比RDD高
3)DataFrame = Dataset[Row] ,DataFrame中有数据和Schema信息,在Java api 没有DataFrame对象,就是Dataset
SparkConf : Spark 配置
SparkContext : Spark上下文,SparkCore编程需要创建SparkContext
SQLContext : 在SparkSQL编程中需要创建的对象,创建SQLContext,需要创建SparkConf,SparkContext()
HiveContext : SparkSQL读取Hive中的数据需要创建的对象,HiveContext就是SQLContext的子类
SparkSession :Spark2.0+ 版本之后推出对象,兼容了SparkConf,SparkContext,SQLContext,HiveContext对象,方便编程。
1)通过反射方式(推荐)
(1)把数据切分组装到bean类中,用map算子返回
(2)导入隐式转换函数 import session.implicits._
(3)rdd.toDF
2)通过动态创建Schema
(1)把数据切分组装到Row中,用map算子返回
(2)构建StructType
val scheme: StructType = StructType(List[StructField](
StructField("id", DataTypes.IntegerType, true),
StructField("name", DataTypes.StringType, true),
StructField("age", DataTypes.IntegerType, true),
StructField("score", DataTypes.IntegerType, true)
))
(3) session.createDataFrame(rdd,scheme)
一对一
(1)导入隐式转换函数 import session.implicits._
(2) 向sparkSessionj注册udf函数 val addName: UserDefinedFunction = session.udf.register(“addName”, (x: String) => “Name=” + x)
(3) 在session.sql(“”)使用
MR只能处理批数据,MR只有map和reduce,有原生api,没有高级算子的封装
Storm 处理实时流数据的框架,处理数据是一条条的,有原生api,不支持sql api
Spark 可以处理批数据,也可以处理流数据 可以使用Sql,有高级算子封装
SparkStreaming 是微批处理,吞吐量高,数据延迟较高,可以保证数据的一致性
val ssc = new StreamingContext(SparkConf,Durations.second(xx))
…
ssc.start()之后业务逻辑不能改变
1.SparkStreaming读取socket数据首先 会启动一个job,接收数据
2.将batchInterval 接收过来的数据封装到一个batch中,batch又被封装到RDD中,RDD又被封装到DStream中
3.DStream有Transformation类算子可以转换数据,懒执行的,需要DStream的outputOperator类算子触发执行
每隔一段时间处理最近一段时间内数据 可以使用窗口操作
窗口长度:window length - wl ,必须是batchInterval整数倍
滑动间隔:sliding interval -si ,必须是batchInterval整数倍
reduceByKeyAndWindow(fun,窗口长度,滑动间隔) : 普通机制,每次计算都是将当前窗口内的批次全部重新计算
reduceByKeyAndWindow(fun1,fun2,窗口长度,滑动间隔) :优化机制,需要设置checkpoint 保存状态,每次计算根据上次结果统计得到