Spark SQL Case Study (2): Flow Roll-up

Published: 2023-10-22 17:00


1. Background

  1. Spark SQL supports two API styles, SQL and DSL. Each has its strengths, and in real enterprise work you pick whichever fits the situation (a minimal comparison sketch follows this list).
  2. The case in this article is abstracted from a scenario that comes up frequently in enterprise development: aggregating certain fields according to a rule, for example rolling traffic records up into segments.
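
As a quick illustration of the two styles, here is a minimal sketch that runs the same tiny aggregation once through the DSL and once through SQL. The object name SqlVsDslSketch and the demo columns are mine, not from the original case.

import org.apache.spark.sql.SparkSession

object SqlVsDslSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SqlVsDslSketch").master("local").getOrCreate()
    import spark.implicits._

    val df = Seq(("u1", 20.0), ("u2", 30.0)).toDF("uid", "flow")

    // DSL style: chained method calls on the DataFrame
    df.groupBy("uid").sum("flow").show()

    // SQL style: register a view, then run a SQL string
    df.createTempView("v_demo")
    spark.sql("select uid, sum(flow) as total_flow from v_demo group by uid").show()

    spark.close()
  }
}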

2. Case

  1. Requirement: for each user, merge consecutive traffic records whose start time is within 10 minutes of the previous record's end time, and report each merged segment's start time, end time and accumulated flow (a plain-Scala sketch of this gap rule follows the pom below).
  2. Data
1,2020-02-18 14:20:30,2020-02-18 14:46:30,20  
1,2020-02-18 14:47:20,2020-02-18 15:20:30,30        
1,2020-02-18 15:37:23,2020-02-18 16:05:26,40            
1,2020-02-18 16:06:27,2020-02-18 17:20:49,50          
1,2020-02-18 17:21:50,2020-02-18 18:03:27,60          
2,2020-02-18 14:18:24,2020-02-18 15:01:40,20
2,2020-02-18 15:20:49,2020-02-18 15:30:24,30
2,2020-02-18 16:01:23,2020-02-18 16:40:32,40
2,2020-02-18 16:44:56,2020-02-18 17:40:52,50
3,2020-02-18 14:39:58,2020-02-18 15:35:53,20
3,2020-02-18 15:36:39,2020-02-18 15:24:54,30
  3. Environment
  • IDEA 2020
  • JDK 1.8
  • Scala 2.12.12
  • Maven 3.6.3
  • pom

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.version>2.12.10</scala.version>
        <spark.version>3.0.1</spark.version>
        <hbase.version>2.2.5</hbase.version>
        <hadoop.version>3.2.1</hadoop.version>
        <encoding>UTF-8</encoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.12</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.73</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>
    </dependencies>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
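
Before the Spark versions below, the 10-minute merge rule from the requirement can be illustrated in plain Scala. The following is a rough sketch for a single user's records sorted by start time; the object and helper names are hypothetical and not part of the original case code.

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit

object GapRuleSketch {
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // One record is (start, end, flow); records are assumed sorted by start time for a single uid.
  // Returns the accumulated flow of each merged segment, in order.
  def segment(records: Seq[(String, String, Double)], gapMinutes: Long = 10): Seq[Double] = {
    records.foldLeft(List.empty[(String, Double)]) { case (acc, (start, end, flow)) =>
      acc match {
        case (lastEnd, total) :: rest
          if ChronoUnit.SECONDS.between(LocalDateTime.parse(lastEnd, fmt),
                                        LocalDateTime.parse(start, fmt)) <= gapMinutes * 60 =>
          // gap within 10 minutes: extend the current segment and accumulate flow
          (end, total + flow) :: rest
        case _ =>
          // first record, or gap too large: open a new segment
          (end, flow) :: acc
      }
    }.map(_._2).reverse
  }

  def main(args: Array[String]): Unit = {
    val uid1 = Seq(
      ("2020-02-18 14:20:30", "2020-02-18 14:46:30", 20.0),
      ("2020-02-18 14:47:20", "2020-02-18 15:20:30", 30.0),
      ("2020-02-18 15:37:23", "2020-02-18 16:05:26", 40.0),
      ("2020-02-18 16:06:27", "2020-02-18 17:20:49", 50.0),
      ("2020-02-18 17:21:50", "2020-02-18 18:03:27", 60.0)
    )
    println(segment(uid1)) // per-segment flow totals for uid 1 under the 10-minute rule
  }
}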

2.1 Code 1 (DSL)

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}

object DataFrameDemo_FlowRollUp_DSL {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("DataFrameDemo_FlowRollUp_DSL")
      .master("local")
      .getOrCreate()

    val structType: StructType = StructType(List(
      StructField("uid", DataTypes.StringType, false),
      StructField("start_time", DataTypes.StringType, false),
      StructField("end_time", DataTypes.StringType, false),
      StructField("flow", DataTypes.DoubleType, false)
    ))

    // Sample line: 1,2020-02-18 14:47:20,2020-02-18 15:20:30,30
    // Source file: E:\DOITLearning\12.Spark\netflowRollupSourceData.txt
    val dataFrame: DataFrame = sparkSession.read
      .schema(structType)
      .csv("E:\\DOITLearning\\12.Spark\\netflowRollupSourceData.txt")

//    dataFrame.show()

    import org.apache.spark.sql.functions._
    import sparkSession.implicits._

    // The DSL lag($"end_time", 1, "start_time") cannot take another column as its default value,
    // so expr() with a SQL fragment is used instead.
    // distinct() guards against duplicated input records.
    // A plain Date type only resolves to the day, not to the second, so the timestamps are kept
    // as strings and later converted with to_unix_timestamp, e.g.
    // SELECT to_unix_timestamp('2016-04-08', 'yyyy-MM-dd');
    val dataFrame1: DataFrame = dataFrame.select(
      $"uid",
      $"start_time",
      $"end_time",
      $"flow",
      expr("lag(end_time, 1, start_time)") over (Window.partitionBy("uid").orderBy("start_time")) as "last_end_time"
    ).distinct()

//    dataFrame1.show()

    // Use expr() with the if and to_unix_timestamp functions to flag rows whose gap from the
    // previous record's end time exceeds 10 minutes (flag = 1 starts a new segment).
    val dataFrame2: DataFrame = dataFrame1.select(
      $"uid",
      $"start_time",
      $"end_time",
      $"flow",
      // e.g. SELECT if(1 < 2, 'a', 'b');  to_unix_timestamp('2016-04-08', 'yyyy-MM-dd')
      expr("if( to_unix_timestamp(start_time, 'yyyy-MM-dd HH:mm:ss') - to_unix_timestamp(last_end_time, 'yyyy-MM-dd HH:mm:ss') > 60*10, 1, 0 )") as "flag"
    )

//    dataFrame2.show()

    // A running sum of the flag over the window assigns a segment id to every row.
    val dataFrame3: DataFrame = dataFrame2.select(
      $"uid",
      $"start_time",
      $"end_time",
      $"flow",
      sum($"flag") over (Window.partitionBy("uid").orderBy($"start_time".asc)) as "sum_flag"
    )

//    dataFrame3.show()

    // Group by uid and segment id, then aggregate to produce the final result.
    val dataFrame4: DataFrame = dataFrame3.select(
      $"uid",
      $"start_time",
      $"end_time",
      $"flow",
      $"sum_flag"
    ).groupBy("uid", "sum_flag").agg(
      min("start_time") as "minDate",
      max("end_time") as "maxDate",
      sum("flow") as "total_flow",
      count("*") as "sumed_count"
    )

    dataFrame4.show()

    sparkSession.close()
  }
}
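
As a side note, the default value supplied by expr("lag(end_time, 1, start_time)") can also be expressed in pure DSL by combining lag without a default with coalesce. A sketch of this variation, assuming the dataFrame, the Window import and sparkSession.implicits._ from the code above (the val names are my own):

// Equivalent to the expr() fragment: take the previous end_time within the window,
// falling back to this row's start_time on the first row of each uid.
val w = Window.partitionBy("uid").orderBy("start_time")
val withLastEnd: DataFrame = dataFrame.select(
  $"uid", $"start_time", $"end_time", $"flow",
  coalesce(lag($"end_time", 1).over(w), $"start_time") as "last_end_time"
).distinct()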

2.2 Code 2 (SQL)

import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}

object DataFrameDemo_FlowRollUp_SQL {
  def main(args: Array[String]): Unit = {

    val sparkSession: SparkSession = SparkSession.builder()
      .appName("DataFrameDemo_FlowRollUp_SQL")
      .master("local")
      .getOrCreate()

    // Specifying column names and types with a StructType is more efficient and more accurate
    // than letting Spark infer the schema while reading.
    val structType: StructType = StructType(List(
      StructField("uid", DataTypes.StringType, false),
      StructField("start_time", DataTypes.StringType, false),
      StructField("end_time", DataTypes.StringType, false),
      StructField("flow", DataTypes.DoubleType, false)
    ))

    // Sample line: 1,2020-02-18 14:47:20,2020-02-18 15:20:30,30
    // Source file: E:\DOITLearning\12.Spark\netflowRollupSourceData.txt
    // The DSL-style distinct() is used here to deduplicate the source data, which is much simpler
    // than doing it in SQL. Deduplicating and cleaning input data should be considered at every
    // stage, much like argument validation in a function.
    val dataFrame: DataFrame = sparkSession.read
      .schema(structType)
      .csv("E:\\DOITLearning\\12.Spark\\netflowRollupSourceData.txt")
      .distinct()

    // Register a view before running SQL.
    dataFrame.createTempView("v_flow")

    // In real development the query is usually built up step by step (see the comment block
    // below), which keeps it readable.
    val dataFrame1: DataFrame = sparkSession.sql(
      """
        |select
        |uid,
        |min(start_time) as min_date,
        |max(end_time) as end_time,
        |sum(flow) as total_flow
        |from
        |(
        |  select
        |  uid,
        |  start_time,
        |  end_time,
        |  flow,
        |  sum(flag) over (partition by uid order by start_time asc) as sum_flag
        |  from
        |  (
        |    select
        |    uid,
        |    start_time,
        |    end_time,
        |    flow,
        |    if( to_unix_timestamp(start_time, 'yyyy-MM-dd HH:mm:ss') - to_unix_timestamp(last_end_time, 'yyyy-MM-dd HH:mm:ss') > 60*10, 1, 0) as flag
        |    from
        |    (
        |      select
        |      uid,
        |      start_time,
        |      end_time,
        |      flow,
        |      lag(end_time, 1, start_time) over(partition by uid order by start_time asc) as last_end_time
        |      from
        |      (
        |        select
        |        uid,
        |        start_time,
        |        end_time,
        |        flow
        |        from
        |        v_flow
        |      )
        |    )
        |  )
        |)
        |group by uid, sum_flag
        |""".stripMargin)

    dataFrame1.show()

    sparkSession.close()
  }
}

/*
select
uid,
start_time,
end_time,
flow
from
v_flow


select
uid,
start_time,
end_time,
flow,
lag(end_time, 1, start_time) over(partition by uid order by start_time asc) as last_end_time
from
(
  select
  uid,
  start_time,
  end_time,
  flow
  from
  v_flow
)

-- to_unix_timestamp('2016-04-08', 'yyyy-MM-dd');
select
uid,
start_time,
end_time,
flow,
if( to_unix_timestamp(start_time, 'yyyy-MM-dd HH:mm:ss') - to_unix_timestamp(last_end_time, 'yyyy-MM-dd HH:mm:ss') > 60*10, 1, 0) as flag
from
(
  select
  uid,
  start_time,
  end_time,
  flow,
  lag(end_time, 1, start_time) over(partition by uid order by start_time asc) as last_end_time
  from
  (
    select
    uid,
    start_time,
    end_time,
    flow
    from
    v_flow
  )
)


select
uid,
start_time,
end_time,
flow,
sum(flag) over (partition by uid order by start_time asc) as sum_flag
from
(
  select
  uid,
  start_time,
  end_time,
  flow,
  if( to_unix_timestamp(start_time, 'yyyy-MM-dd HH:mm:ss') - to_unix_timestamp(last_end_time, 'yyyy-MM-dd HH:mm:ss') > 60*10, 1, 0) as flag
  from
  (
    select
    uid,
    start_time,
    end_time,
    flow,
    lag(end_time, 1, start_time) over(partition by uid order by start_time asc) as last_end_time
    from
    (
      select
      uid,
      start_time,
      end_time,
      flow
      from
      v_flow
    )
  )
)

select
uid,
min(start_time) as min_date,
max(end_time) as end_time,
sum(flow) as total_flow
from
(
  select
  uid,
  start_time,
  end_time,
  flow,
  sum(flag) over (partition by uid order by start_time asc) as sum_flag
  from
  (
    select
    uid,
    start_time,
    end_time,
    flow,
    if( to_unix_timestamp(start_time, 'yyyy-MM-dd HH:mm:ss') - to_unix_timestamp(last_end_time, 'yyyy-MM-dd HH:mm:ss') > 60*10, 1, 0) as flag
    from
    (
      select
      uid,
      start_time,
      end_time,
      flow,
      lag(end_time, 1, start_time) over(partition by uid order by start_time asc) as last_end_time
      from
      (
        select
        uid,
        start_time,
        end_time,
        flow
        from
        v_flow
      )
    )
  )
)
group by uid, sum_flag
*/
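
If the local file E:\DOITLearning\12.Spark\netflowRollupSourceData.txt is not available, the step-by-step SQL above can still be tried against an in-memory DataFrame. A minimal sketch; the object name and the use of only a subset of the sample records are my own choices, not from the original post:

import org.apache.spark.sql.{DataFrame, SparkSession}

object InMemorySampleSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("InMemorySampleSketch").master("local").getOrCreate()
    import spark.implicits._

    // A subset of the sample records from section 2, typed directly instead of read from disk.
    val dataFrame: DataFrame = Seq(
      ("1", "2020-02-18 14:20:30", "2020-02-18 14:46:30", 20.0),
      ("1", "2020-02-18 14:47:20", "2020-02-18 15:20:30", 30.0),
      ("2", "2020-02-18 14:18:24", "2020-02-18 15:01:40", 20.0),
      ("3", "2020-02-18 14:39:58", "2020-02-18 15:35:53", 20.0)
    ).toDF("uid", "start_time", "end_time", "flow").distinct()

    dataFrame.createTempView("v_flow")
    // ...run any of the step-by-step queries above against v_flow, e.g.:
    spark.sql("select * from v_flow").show()

    spark.close()
  }
}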

2.3 Summary

  1. In enterprise development, SQL usually nests several subqueries, so the maintainability of the SQL matters a great deal.
  2. SQL supports comments as well; add them whenever you can.
  3. SQL can be formatted; when queries are nested, use indentation to keep the statements readable.
  4. Maintainability and readability of code are extremely important; always keep them in mind.
