1,2020-02-18 14:20:30,2020-02-18 14:46:30,20
1,2020-02-18 14:47:20,2020-02-18 15:20:30,30
1,2020-02-18 15:37:23,2020-02-18 16:05:26,40
1,2020-02-18 16:06:27,2020-02-18 17:20:49,50
1,2020-02-18 17:21:50,2020-02-18 18:03:27,60
2,2020-02-18 14:18:24,2020-02-18 15:01:40,20
2,2020-02-18 15:20:49,2020-02-18 15:30:24,30
2,2020-02-18 16:01:23,2020-02-18 16:40:32,40
2,2020-02-18 16:44:56,2020-02-18 17:40:52,50
3,2020-02-18 14:39:58,2020-02-18 15:35:53,20
3,2020-02-18 15:36:39,2020-02-18 15:24:54,30
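Each line above is one usage record: uid, start_time, end_time, flow. The rollup rule is: within one uid, if a record starts no more than 10 minutes after the previous record ended, it belongs to the same session and the flows are summed; a longer gap starts a new session. Working the sample through by hand (my own arithmetic, worth checking against the program output):
uid 1: 14:20:30 ~ 15:20:30 with flow 50, and 15:37:23 ~ 18:03:27 with flow 150
uid 2: 14:18:24 ~ 15:01:40 with flow 20, 15:20:49 ~ 15:30:24 with flow 30, and 16:01:23 ~ 17:40:52 with flow 90
uid 3: a single session 14:39:58 ~ 15:35:53 with flow 50 (the second record's end_time precedes its start_time in the sample, but max(end_time) still picks 15:35:53)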
<properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <scala.version>2.12.10</scala.version>
    <spark.version>3.0.1</spark.version>
    <hbase.version>2.2.5</hbase.version>
    <hadoop.version>3.2.1</hadoop.version>
    <encoding>UTF-8</encoding>
</properties>
<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.12</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.73</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.47</version>
    </dependency>
</dependencies>
<build>
    <pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
            </plugin>
        </plugins>
    </pluginManagement>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <executions>
                <execution>
                    <id>scala-compile-first</id>
                    <phase>process-resources</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>scala-test-compile</id>
                    <phase>process-test-resources</phase>
                    <goals>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <executions>
                <execution>
                    <phase>compile</phase>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
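With this build section, mvn package compiles the Scala sources via scala-maven-plugin and the shade plugin bundles the dependencies into a fat jar (the META-INF signature files are excluded so the merged jar does not fail signature checks when loaded). A rough way to package and submit the job; the jar name and the class name below are placeholders, adjust them to your own project coordinates and package:

mvn clean package
spark-submit --class DataFrameDemo_FlowRollUp_DSL --master local[*] target/flow-rollup-1.0-SNAPSHOT.jar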
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}

object DataFrameDemo_FlowRollUp_DSL {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("DataFrameDemo_FlowRollUp_DSL")
      .master("local")
      .getOrCreate()
    // Declare the schema explicitly instead of relying on inference
    val structType: StructType = StructType(List(
      StructField("uid", DataTypes.StringType, false),
      StructField("start_time", DataTypes.StringType, false),
      StructField("end_time", DataTypes.StringType, false),
      StructField("flow", DataTypes.DoubleType, false)
    ))
    // Sample line: 1,2020-02-18 14:47:20,2020-02-18 15:20:30,30
    // Source file: E:\DOITLearning\12.Spark\netflowRollupSourceData.txt
    val dataFrame: DataFrame = sparkSession.read
      .schema(structType)
      .csv("E:\\DOITLearning\\12.Spark\\netflowRollupSourceData.txt")
    // dataFrame.show()
    import org.apache.spark.sql.functions._
    import sparkSession.implicits._
    // lag($"end_time", 1, "start_time") cannot take another column as its default value,
    // so expr() is used to run the SQL fragment instead.
    // distinct() guards against duplicated input records.
    // A Date type only resolves to the day, not to the second, so the timestamps stay as
    // strings and are converted later with to_unix_timestamp(..., 'yyyy-MM-dd HH:mm:ss').
    val dataFrame1: DataFrame = dataFrame.select(
      $"uid",
      $"start_time",
      $"end_time",
      $"flow",
      expr("lag(end_time, 1, start_time)") over (Window.partitionBy("uid").orderBy("start_time")) as "last_end_time"
    ).distinct()
    // dataFrame1.show()
    // Use expr() with the SQL if() and to_unix_timestamp() functions to flag records whose
    // gap to the previous end_time exceeds 10 minutes.
    val dataFrame2: DataFrame = dataFrame1.select(
      $"uid",
      $"start_time",
      $"end_time",
      $"flow",
      // e.g. SELECT if(1 < 2, 'a', 'b'); to_unix_timestamp('2016-04-08', 'yyyy-MM-dd')
      expr("if( to_unix_timestamp(start_time, 'yyyy-MM-dd HH:mm:ss') - to_unix_timestamp(last_end_time, 'yyyy-MM-dd HH:mm:ss') > 60*10, 1, 0 )") as "flag"
    )
    // dataFrame2.show()
    // A running sum over the flags assigns a session id (sum_flag) to every record within a uid
    val dataFrame3: DataFrame = dataFrame2.select(
      $"uid",
      $"start_time",
      $"end_time",
      $"flow",
      sum($"flag") over (Window.partitionBy("uid").orderBy($"start_time".asc)) as "sum_flag"
    )
    // dataFrame3.show()
    // Group by uid and session id, then aggregate for the final rollup.
    // The groupBy columns are carried into the result automatically, so they are not repeated in agg().
    val dataFrame4: DataFrame = dataFrame3
      .groupBy("uid", "sum_flag")
      .agg(
        min("start_time") as "minDate",
        max("end_time") as "maxDate",
        sum("flow") as "total_flow",
        count("*") as "sumed_count"
      )
    dataFrame4.show()
    sparkSession.close()
  }
}
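For anyone without the E:\ sample file, here is a minimal self-contained sketch of the same DSL pipeline with a few of the sample records inlined through toDF. The object name FlowRollUpInlineSketch and the condensed withColumn style are my own; the session logic follows the code above.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object FlowRollUpInlineSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("FlowRollUpInlineSketch").master("local").getOrCreate()
    import spark.implicits._

    // A few of the sample records, inlined so no local file is needed
    val df = Seq(
      ("1", "2020-02-18 14:20:30", "2020-02-18 14:46:30", 20.0),
      ("1", "2020-02-18 14:47:20", "2020-02-18 15:20:30", 30.0),
      ("1", "2020-02-18 15:37:23", "2020-02-18 16:05:26", 40.0)
    ).toDF("uid", "start_time", "end_time", "flow")

    val w = Window.partitionBy("uid").orderBy("start_time")

    val result = df.distinct()
      // previous record's end_time; the row's own start_time for the first record
      .withColumn("last_end_time", expr("lag(end_time, 1, start_time)").over(w))
      // 1 when the gap to the previous record exceeds 10 minutes, else 0
      .withColumn("flag", expr(
        "if(to_unix_timestamp(start_time, 'yyyy-MM-dd HH:mm:ss') - " +
          "to_unix_timestamp(last_end_time, 'yyyy-MM-dd HH:mm:ss') > 60*10, 1, 0)"))
      // running sum of the flags = session id within each uid
      .withColumn("sum_flag", sum($"flag").over(w))
      .groupBy("uid", "sum_flag")
      .agg(min("start_time") as "minDate", max("end_time") as "maxDate",
        sum("flow") as "total_flow", count("*") as "sumed_count")

    result.show()
    spark.close()
  }
}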
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}

object DataFrameDemo_FlowRollUp_SQL {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("DataFrameDemo_FlowRollUp_SQL")
      .master("local")
      .getOrCreate()
    // Specifying the column names and types with a StructType by hand is more efficient
    // and more accurate than letting Spark infer the schema while reading.
    val structType: StructType = StructType(List(
      StructField("uid", DataTypes.StringType, false),
      StructField("start_time", DataTypes.StringType, false),
      StructField("end_time", DataTypes.StringType, false),
      StructField("flow", DataTypes.DoubleType, false)
    ))
    // Sample line: 1,2020-02-18 14:47:20,2020-02-18 15:20:30,30
    // Source file: E:\DOITLearning\12.Spark\netflowRollupSourceData.txt
    // Deduplication is done with the DSL-style distinct(), which is much simpler than doing
    // it in SQL. Cleaning and deduplicating source data is worth considering at every stage,
    // much like argument validation at the start of a function.
    val dataFrame: DataFrame = sparkSession.read
      .schema(structType)
      .csv("E:\\DOITLearning\\12.Spark\\netflowRollupSourceData.txt")
      .distinct()
    // Register a view before running SQL against the data
    dataFrame.createTempView("v_flow")
    // In real development the query is usually built and tested step by step, which reads better
    val dataFrame1: DataFrame = sparkSession.sql(
      """
        |select
        |uid,
        |min(start_time) as min_date,
        |max(end_time) as end_time,
        |sum(flow) as total_flow
        |from
        |(
        |  select
        |  uid,
        |  start_time,
        |  end_time,
        |  flow,
        |  sum(flag) over (partition by uid order by start_time asc) as sum_flag
        |  from
        |  (
        |    select
        |    uid,
        |    start_time,
        |    end_time,
        |    flow,
        |    if( to_unix_timestamp(start_time, 'yyyy-MM-dd HH:mm:ss') - to_unix_timestamp(last_end_time, 'yyyy-MM-dd HH:mm:ss') > 60*10, 1, 0) as flag
        |    from
        |    (
        |      select
        |      uid,
        |      start_time,
        |      end_time,
        |      flow,
        |      lag(end_time, 1, start_time) over(partition by uid order by start_time asc) as last_end_time
        |      from
        |      (
        |        select
        |        uid,
        |        start_time,
        |        end_time,
        |        flow
        |        from
        |        v_flow
        |      )
        |    )
        |  )
        |)
        |group by uid, sum_flag
        |""".stripMargin)
    dataFrame1.show()
    sparkSession.close()
  }
}
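The pom above also declares mysql-connector-java, which neither demo actually uses. If the rolled-up result were written to MySQL instead of just shown, a JDBC write along these lines would be the usual route; the URL, table name, user and password are placeholders, not taken from this project:

// hypothetical sink for the result, e.g. replacing dataFrame1.show() above;
// replace the connection details with real ones
dataFrame1.write
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/flowdb?useSSL=false")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "flow_rollup")
  .option("user", "root")
  .option("password", "******")
  .mode("append")
  .save()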
/*
 * The nested statement above was built up step by step; the intermediate queries are kept below.

-- Step 1: base projection from the registered view
select
uid,
start_time,
end_time,
flow
from
v_flow

-- Step 2: use lag() to pull the previous record's end_time (default: the row's own start_time)
select
uid,
start_time,
end_time,
flow,
lag(end_time, 1, start_time) over(partition by uid order by start_time asc) as last_end_time
from
(
select
uid,
start_time,
end_time,
flow
from
v_flow
)

-- Step 3: flag records whose gap to the previous end_time exceeds 10 minutes
-- to_unix_timestamp('2016-04-08', 'yyyy-MM-dd');
select
uid,
start_time,
end_time,
flow,
if( to_unix_timestamp(start_time, 'yyyy-MM-dd HH:mm:ss') - to_unix_timestamp(last_end_time, 'yyyy-MM-dd HH:mm:ss') > 60*10, 1, 0) as flag
from
(
select
uid,
start_time,
end_time,
flow,
lag(end_time, 1, start_time) over(partition by uid order by start_time asc) as last_end_time
from
(
select
uid,
start_time,
end_time,
flow
from
v_flow
)
)

-- Step 4: a running sum over the flags gives each record a session id per uid
select
uid,
start_time,
end_time,
flow,
sum(flag) over (partition by uid order by start_time asc) as sum_flag
from
(
select
uid,
start_time,
end_time,
flow,
if( to_unix_timestamp(start_time, 'yyyy-MM-dd HH:mm:ss') - to_unix_timestamp(last_end_time, 'yyyy-MM-dd HH:mm:ss') > 60*10, 1, 0) as flag
from
(
select
uid,
start_time,
end_time,
flow,
lag(end_time, 1, start_time) over(partition by uid order by start_time asc) as last_end_time
from
(
select
uid,
start_time,
end_time,
flow
from
v_flow
)
)
)

-- Step 5: group by uid and session id to produce the rollup
select
uid,
min(start_time) as min_date,
max(end_time) as end_time,
sum(flow) as total_flow
from
(
select
uid,
start_time,
end_time,
flow,
sum(flag) over (partition by uid order by start_time asc) as sum_flag
from
(
select
uid,
start_time,
end_time,
flow,
if( to_unix_timestamp(start_time, 'yyyy-MM-dd HH:mm:ss') - to_unix_timestamp(last_end_time, 'yyyy-MM-dd HH:mm:ss') > 60*10, 1, 0) as flag
from
(
select
uid,
start_time,
end_time,
flow,
lag(end_time, 1, start_time) over(partition by uid order by start_time asc) as last_end_time
from
(
select
uid,
start_time,
end_time,
flow
from
v_flow
)
)
)
)
group by uid, sum_flag
*/