发布时间:2024-01-28 11:00
本文隶属于专栏《1000个问题搞定大数据技术体系》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!
本专栏目录结构和参考文献请见1000个问题搞定大数据技术体系
Spark SQL functions.scala 源码解析(一)Sort functions (基于 Spark 3.3.0)
Spark SQL functions.scala 源码解析(二)Aggregate functions(基于 Spark 3.3.0)
Spark SQL functions.scala 源码解析(三)Window functions (基于 Spark 3.3.0)
Spark SQL functions.scala 源码解析(四)Non-aggregate functions (基于 Spark 3.3.0)
Spark SQL functions.scala 源码解析(五)Math Functions (基于 Spark 3.3.0)
Spark SQL functions.scala 源码解析(六)Misc functions (基于 Spark 3.3.0)
Spark SQL functions.scala 源码解析(七)String functions (基于 Spark 3.3.0)
Spark SQL functions.scala 源码解析(八)DateTime functions (基于 Spark 3.3.0)
Spark SQL functions.scala 源码解析(九)Collection functions (基于 Spark 3.3.0)
Spark SQL functions.scala 源码解析(十)Partition transform functions(基于 Spark 3.3.0)
Spark SQL functions.scala 源码解析(十一)Scala UDF functions(基于 Spark 3.3.0)
Spark SQL functions.scala 源码解析(十二)Java UDF functions(基于 Spark 3.3.0)
Spark SQL 内置函数(三)Date and Timestamp Functions(基于 Spark 3.2.0)
/**
* @since 1.5.0
*/
def add_months(startDate: Column, numMonths: Int): Column = add_months(startDate, lit(numMonths))
/**
* @since 3.0.0
*/
def add_months(startDate: Column, numMonths: Column): Column = withExpr {
AddMonths(startDate.expr, numMonths.expr)
}
/**
*
* @since 1.5.0
*/
def current_date(): Column = withExpr { CurrentDate() }
/**
* @since 1.5.0
*/
def current_timestamp(): Column = withExpr { CurrentTimestamp() }
/**
* @since 3.3.0
*/
def localtimestamp(): Column = withExpr { LocalTimestamp() }
/**
* @since 1.5.0
*/
def date_format(dateExpr: Column, format: String): Column = withExpr {
DateFormatClass(dateExpr.expr, Literal(format))
}
/**
* @since 1.5.0
*/
def date_add(start: Column, days: Int): Column = date_add(start, lit(days))
/**
* @since 3.0.0
*/
def date_add(start: Column, days: Column): Column = withExpr { DateAdd(start.expr, days.expr) }
* @since 1.5.0
*/
def date_sub(start: Column, days: Int): Column = date_sub(start, lit(days))
/**
* @since 3.0.0
*/
def date_sub(start: Column, days: Column): Column = withExpr { DateSub(start.expr, days.expr) }
/**
* @since 1.5.0
*/
def datediff(end: Column, start: Column): Column = withExpr { DateDiff(end.expr, start.expr) }
// returns 1
dateddiff(\"2018-01-10 00:00:00\", \"2018-01-09 23:59:59\")
/**
* @since 1.5.0
*/
def year(e: Column): Column = withExpr { Year(e.expr) }
/**
* @since 1.5.0
*/
def quarter(e: Column): Column = withExpr { Quarter(e.expr) }
/**
* @since 1.5.0
*/
def month(e: Column): Column = withExpr { Month(e.expr) }
/**
* @since 2.3.0
*/
def dayofweek(e: Column): Column = withExpr { DayOfWeek(e.expr) }
/**
* @since 1.5.0
*/
def dayofmonth(e: Column): Column = withExpr { DayOfMonth(e.expr) }
/**
* @since 1.5.0
*/
def dayofyear(e: Column): Column = withExpr { DayOfYear(e.expr) }
/**
* @since 1.5.0
*/
def hour(e: Column): Column = withExpr { Hour(e.expr) }
/**
* @since 1.5.0
*/
def last_day(e: Column): Column = withExpr { LastDay(e.expr) }
/**
* @since 1.5.0
*/
def minute(e: Column): Column = withExpr { Minute(e.expr) }
/**
* @since 3.3.0
*/
def make_date(year: Column, month: Column, day: Column): Column = withExpr {
MakeDate(year.expr, month.expr, day.expr)
}
基于年、月和日字段创建的日期。
/**
* @since 1.5.0
*/
def months_between(end: Column, start: Column): Column = withExpr {
new MonthsBetween(end.expr, start.expr)
}
months_between(\"2017-11-14\", \"2017-07-14\") // returns 4.0
months_between(\"2017-01-01\", \"2017-01-10\") // returns 0.29032258
months_between(\"2017-06-01\", \"2017-06-16 12:00:00\") // returns -0.5
/**
* @since 2.4.0
*/
def months_between(end: Column, start: Column, roundOff: Boolean): Column = withExpr {
MonthsBetween(end.expr, start.expr, lit(roundOff).expr)
}
/**
* @since 1.5.0
*/
def next_day(date: Column, dayOfWeek: String): Column = next_day(date, lit(dayOfWeek))
/**
* @since 3.2.0
*/
def next_day(date: Column, dayOfWeek: Column): Column = withExpr {
NextDay(date.expr, dayOfWeek.expr)
}
/**
* @since 1.5.0
*/
def second(e: Column): Column = withExpr { Second(e.expr) }
/**
* @since 1.5.0
*/
def weekofyear(e: Column): Column = withExpr { WeekOfYear(e.expr) }
国际标准化组织的国际标准ISO 8601是日期和时间的表示方法,全称为《数据存储和交换形式·信息交换·日期和时间的表示方法》。最新为第三版ISO8601:2004,第一版为ISO8601:1988,第二版为ISO8601:2000。可以用2位数表示年内第几个日历星期,再加上一位数表示日历星期内第几天,但日历星期前要加上一个大写字母W,如2004年5月3日可写成2004-W19-1或2004W191。但2005-W011是从2005年1月3日开始的,前几天属于上年的第53个日历星期。每个日历星期从星期一开始,星期日为第7天。
第一个日历星期有以下四种等效说法:
1,本年度第一个星期四所在的星期;
2,1月4日所在的星期;
3,本年度第一个至少有4天在同一星期内的星期;
4,星期一在去年12月29日至今年1月4日以内的星期;
推理可得,如果1月1日是星期一、星期二、星期三或者星期四,它所在的星期就是第一个日历星期;如果1月1日是星期五、星期六或者星期日,它所在的星期就是上一年第52或者53个日历星期;12月28日总是在一年最后一个日历星期。
/**
* @since 1.5.0
*/
def from_unixtime(ut: Column): Column = withExpr {
FromUnixTime(ut.expr, Literal(TimestampFormatter.defaultPattern))
}
* @since 1.5.0
*/
def from_unixtime(ut: Column, f: String): Column = withExpr {
FromUnixTime(ut.expr, Literal(f))
}
一个字符串,如果ut是一个无法转换为 long 的字符串,或者f是一个无效的日期时间模式,则返回 null
/**
* @group datetime_funcs
* @since 1.5.0
*/
def unix_timestamp(): Column = withExpr {
UnixTimestamp(CurrentTimestamp(), Literal(TimestampFormatter.defaultPattern))
}
/**
* @since 1.5.0
*/
def unix_timestamp(s: Column): Column = withExpr {
UnixTimestamp(s.expr, Literal(TimestampFormatter.defaultPattern))
}
/**
* @since 1.5.0
*/
def unix_timestamp(s: Column, p: String): Column = withExpr { UnixTimestamp(s.expr, Literal(p)) }
/**
* @since 2.2.0
*/
def to_timestamp(s: Column): Column = withExpr {
new ParseToTimestamp(s.expr)
}
/**
* @since 2.2.0
*/
def to_timestamp(s: Column, fmt: String): Column = withExpr {
new ParseToTimestamp(s.expr, Literal(fmt))
}
/**
* @since 1.5.0
*/
def to_date(e: Column): Column = withExpr { new ParseToDate(e.expr) }
/**
* @since 2.2.0
*/
def to_date(e: Column, fmt: String): Column = withExpr {
new ParseToDate(e.expr, Literal(fmt))
}
/**
* @since 1.5.0
*/
def trunc(date: Column, format: String): Column = withExpr {
TruncDate(date.expr, Literal(format))
}
/**
* @since 2.3.0
*/
def date_trunc(format: String, timestamp: Column): Column = withExpr {
TruncTimestamp(Literal(format), timestamp.expr)
}
/**
* @since 1.5.0
*/
def from_utc_timestamp(ts: Column, tz: String): Column = withExpr {
FromUTCTimestamp(ts.expr, Literal(tz))
}
/**
* @since 2.4.0
*/
def from_utc_timestamp(ts: Column, tz: Column): Column = withExpr {
FromUTCTimestamp(ts.expr, tz.expr)
}
/**
* @since 1.5.0
*/
def to_utc_timestamp(ts: Column, tz: String): Column = withExpr {
ToUTCTimestamp(ts.expr, Literal(tz))
}
/**
* @since 2.4.0
*/
def to_utc_timestamp(ts: Column, tz: Column): Column = withExpr {
ToUTCTimestamp(ts.expr, tz.expr)
}
/**
* @since 2.0.0
*/
def window(
timeColumn: Column,
windowDuration: String,
slideDuration: String,
startTime: String): Column = {
withExpr {
TimeWindow(timeColumn.expr, windowDuration, slideDuration, startTime)
}.as(\"window\")
}
以下是一个平均股价的例子,每10秒种一个一分钟窗口,从当前小时开始后5秒开始计算:
val df = ... // schema => timestamp: TimestampType, stockId: StringType, price: DoubleType
df.groupBy(window($\"timestamp\", \"1 minute\", \"10 seconds\", \"5 seconds\"), $\"stockId\").agg(mean(\"price\"))
窗口将看起来像:
09:00:05-09:01:05
09:00:15-09:01:15
09:00:25-09:01:25
...
对于流式查询,可以使用函数current_timestamp
生成处理时间窗口。
TimestampType
。10 minutes
,1 second
。查看org.apache.spark.unsafe.types.CalendarInterval
获取有效的持续时间(duration
)标识符。请注意,持续时间(duration
)是一个固定的时间长度,不会随日历的不同而变化。例如,1天总是意味着86400000毫秒,而不是日历日。windowDuration
。查看org.apache.spark.unsafe.types.CalendarInterval
获取有效的持续时间(duration
)标识符。这一持续时间(duration
)同样是绝对的,不会因日历而异。1970-01-01 00:00:00 UTC
的偏移量,用于开始窗口间隔。例如,为了使每小时滚动窗口在一小时后15分钟开始,例如12:15-13:15,13:15-14:15。。。提供15分钟的启动时间。
CalendarInterval
表示日历间隔的类。日历间隔内部存储在三个组件中: 一个整数值,表示此间隔内的“月”数; 一个整数值,表示此间隔内的“天”数; 一个长整数值,表示此间隔中的“微秒”数。 “月”和“天”不是长度不变的时间单位(不同于小时、秒),因此它们是两个与微秒分开的字段。一个月可能等于28、29、30或31天,一天可能等于23、24或25小时(夏令时)。
/**
* @since 2.0.0
*/
def window(timeColumn: Column, windowDuration: String, slideDuration: String): Column = {
window(timeColumn, windowDuration, slideDuration, \"0 second\")
}
以下是一个平均股价的例子,每10秒种一个一分钟窗口,从当前小时开始后5秒开始计算:
val df = ... // schema => timestamp: TimestampType, stockId: StringType, price: DoubleType
df.groupBy(window($\"timestamp\", \"1 minute\", \"10 seconds\"), $\"stockId\").agg(mean(\"price\"))
窗口将看起来像:
09:00:00-09:01:00
09:00:10-09:01:10
09:00:20-09:01:20
...
对于流式查询,可以使用函数current_timestamp
生成处理时间窗口。
TimestampType
。10 minutes
,1 second
。查看org.apache.spark.unsafe.types.CalendarInterval
获取有效的持续时间(duration
)标识符。请注意,持续时间(duration
)是一个固定的时间长度,不会随日历的不同而变化。例如,1天总是意味着86400000毫秒,而不是日历日。windowDuration
。查看org.apache.spark.unsafe.types.CalendarInterval
获取有效的持续时间(duration
)标识符。这一持续时间(duration
)同样是绝对的,不会因日历而异。 /**
* @since 2.0.0
*/
def window(timeColumn: Column, windowDuration: String): Column = {
window(timeColumn, windowDuration, windowDuration, \"0 second\")
}
以下是一个平均股价的例子,每10秒种一个一分钟窗口,从当前小时开始后5秒开始计算:
val df = ... // schema => timestamp: TimestampType, stockId: StringType, price: DoubleType
df.groupBy(window($\"timestamp\", \"1 minute\"), $\"stockId\").agg(mean(\"price\"))
窗口将看起来像:
09:00:00-09:01:00
09:01:00-09:02:00
09:02:00-09:03:00
...
对于流式查询,可以使用函数current_timestamp
生成处理时间窗口。
TimestampType
。10 minutes
,1 second
。查看org.apache.spark.unsafe.types.CalendarInterval
获取有效的持续时间(duration
)标识符。请注意,持续时间(duration
)是一个固定的时间长度,不会随日历的不同而变化。例如,1天总是意味着86400000毫秒,而不是日历日。windowDuration
。查看org.apache.spark.unsafe.types.CalendarInterval
获取有效的持续时间(duration
)标识符。这一持续时间(duration
)同样是绝对的,不会因日历而异。 /**
* @since 3.2.0
*/
def session_window(timeColumn: Column, gapDuration: String): Column = {
withExpr {
SessionWindow(timeColumn.expr, gapDuration)
}.as(\"session_window\")
}
gapDuration
。current_timestamp
生成处理时间窗口。TimestampType
。10 minutes
, 1 second
。查看org.apache.spark.unsafe.types.CalendarInterval
获取有效的持续时间(duration
)标识符。这一持续时间(duration
)同样是绝对的,不会因日历而异。 /**
* @since 3.2.0
*/
def session_window(timeColumn: Column, gapDuration: Column): Column = {
withExpr {
SessionWindow(timeColumn.expr, gapDuration.expr)
}.as(\"session_window\")
}
gapDuration
。current_timestamp
生成处理时间窗口。TimestampType
。10 minutes
, 1 second
,也可以是基于输入行动态指定间隙持续时间的表达式/UDF。 /**
* Creates timestamp from the number of seconds since UTC epoch.
* @group datetime_funcs
* @since 3.1.0
*/
def timestamp_seconds(e: Column): Column = withExpr {
SecondsToTimestamp(e.expr)
}