mapGroupsWithState
/**
* ::Experimental::
* (Scala-specific)
* Applies the given function to each group of data, while maintaining a user-defined per-group
* state. The result Dataset will represent the objects returned by the function.
* For a static batch Dataset, the function will be invoked once per group. For a streaming
* Dataset, the function will be invoked for each group repeatedly in every trigger, and
* updates to each group's state will be saved across invocations.
* See [[org.apache.spark.sql.streaming.GroupState]] for more details.
*
* @tparam S The type of the user-defined state. Must be encodable to Spark SQL types.
* @tparam U The type of the output objects. Must be encodable to Spark SQL types.
* @param func Function to be called on every group.
* @param timeoutConf Timeout configuration for groups that do not receive data for a while.
*
* See [[Encoder]] for more details on what types are encodable to Spark SQL.
* @since 2.2.0
*/
@Experimental
@InterfaceStability.Evolving
def mapGroupsWithState[S: Encoder, U: Encoder](
    timeoutConf: GroupStateTimeout)(
    func: (K, Iterator[V], GroupState[S]) => U): Dataset[U] = {
  val flatMapFunc = (key: K, it: Iterator[V], s: GroupState[S]) => Iterator(func(key, it, s))
  Dataset[U](
    sparkSession,
    FlatMapGroupsWithState[K, V, S, U](
      flatMapFunc.asInstanceOf[(Any, Iterator[Any], LogicalGroupState[Any]) => Iterator[Any]],
      groupingAttributes,
      dataAttributes,
      OutputMode.Update,
      isMapGroupsWithState = true,
      timeoutConf,
      child = logicalPlan))
}
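The body above shows that mapGroupsWithState is a thin wrapper: it lifts the single-result func into an iterator-producing function so the same FlatMapGroupsWithState plan node can serve both operations. The adapter trick can be sketched in plain Scala (names here are illustrative, not Spark API):

```scala
// A minimal sketch of the wrapping trick used above: a function that
// returns one value is adapted into one that returns an Iterator,
// so a single "flatMap"-style code path can handle both cases.
def lift[K, V, S, U](f: (K, Iterator[V], S) => U): (K, Iterator[V], S) => Iterator[U] =
  (key, values, state) => Iterator(f(key, values, state))

// Example: a per-group sum becomes an iterator of exactly one element.
val sum: (String, Iterator[Int], Unit) => Int = (_, vs, _) => vs.sum
val lifted = lift(sum)
// lifted("a", Iterator(1, 2, 3), ()).toList == List(6)
```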
Let's look at this function's doc comment:
/**
* ::Experimental::
* (Scala-specific)
* Applies the given function to each group of data, while maintaining a user-defined per-group
* state. The result Dataset will represent the objects returned by the function.
* For a static batch Dataset, the function will be invoked once per group. For a streaming
* Dataset, the function will be invoked for each group repeatedly in every trigger, and
* updates to each group's state will be saved across invocations.
* See [[org.apache.spark.sql.streaming.GroupState]] for more details.
*
* @tparam S The type of the user-defined state. Must be encodable to Spark SQL types.
* @tparam U The type of the output objects. Must be encodable to Spark SQL types.
* @param func Function to be called on every group.
* @param timeoutConf Timeout configuration for groups that do not receive data for a while.
*
* See [[Encoder]] for more details on what types are encodable to Spark SQL.
* @since 2.2.0
*/
This API is still experimental and Scala-specific, and has been available since Spark 2.2.0. Note that it is also a curried function: the timeout configuration and the user function are passed in separate parameter lists.
It applies the given function to each group of data while maintaining a user-defined per-group state. The resulting Dataset represents the objects returned by the function. For a static batch Dataset, the function is invoked once per group. For a streaming Dataset, the function is invoked repeatedly for each group in every trigger, and updates to each group's state are saved across invocations.
Parameters:
- @tparam S The type of the user-defined state. Must be encodable to Spark SQL types.
- @tparam U The type of the output objects. Must be encodable to Spark SQL types.
- @param func The function to be called on every group.
- @param timeoutConf Timeout configuration for groups that do not receive data for a while.
Note the extra letter t in tparam: it marks S and U as type parameters, not value parameters. mapGroupsWithState actually receives only two value arguments: the timeout configuration and the function func.
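Currying with separate parameter lists is a common Scala pattern: the value in the first list (and the function passed in the second) lets the compiler infer the type parameters without them being written out. A small self-contained illustration, not Spark code, that mirrors the shape of mapGroupsWithState:

```scala
// Shape analogue of mapGroupsWithState: one parameter list for the
// configuration, a second for the user function. S and U are inferred
// from the arguments at the call site.
def runWithState[S, U](initial: S)(func: (String, Iterator[Int], S) => U): U =
  func("key", Iterator(1, 2, 3), initial)

// S is inferred as Int from `initial`, U as Int from the lambda's result.
val total = runWithState(100)((_, vs, s) => s + vs.sum)
// total == 106
```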
func: (K, Iterator[V], GroupState[S]) => U
Looking at this signature, func is a function that takes three parameters and returns a value of type U, the output object type. Let's focus on the parameters func receives:
- K is the type of the key used in groupByKey, e.g. Int or String.
- Iterator[V] is an iterator over the incoming values of type V for that group.
- GroupState[S] wraps the user-defined per-group state of type S (note: S is the state type, not the output type; the output type is U).
Let's look at an example of such a func:
def updateSessionEvents(
    id: Int,
    userEvents: Iterator[UserEvent],
    state: GroupState[UserSession]): Option[UserSession] = { ... }
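The body is elided above; a hypothetical sketch of what it might look like follows, assuming case classes UserEvent(id: Int) and UserSession(events: Seq[UserEvent]) that are not part of the original. It uses the real GroupState methods getOption and update:

```scala
// Hypothetical sketch only -- the original elides the body.
// Assumes: case class UserEvent(id: Int)
//          case class UserSession(events: Seq[UserEvent])
import org.apache.spark.sql.streaming.GroupState

def updateSessionEvents(
    id: Int,
    userEvents: Iterator[UserEvent],
    state: GroupState[UserSession]): Option[UserSession] = {
  // Merge the new events into the existing per-group state, if any.
  val merged = state.getOption match {
    case Some(session) => UserSession(session.events ++ userEvents)
    case None          => UserSession(userEvents.toSeq)
  }
  state.update(merged)  // saved across triggers for this key
  Some(merged)          // the single output object (U = Option[UserSession])
}
```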
Notice that the parameters of updateSessionEvents are ordinary value parameters, each annotated with a concrete type. In func: (K, Iterator[V], GroupState[S]) => U, by contrast, the names look like type parameters rather than ordinary parameters: K, V, S and U are placeholders bound by mapGroupsWithState (and by the enclosing Dataset), and the compiler instantiates them with concrete types (here K = Int, V = UserEvent, S = UserSession, U = Option[UserSession]) when a concrete function is passed in.
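At the call site the compiler infers all four type parameters from the arguments, so updateSessionEvents slots directly into the generic signature. A hedged sketch (the events stream and the NoTimeout choice are assumptions, not from the original):

```scala
import org.apache.spark.sql.streaming.GroupStateTimeout

// events: Dataset[UserEvent] is assumed to exist, with spark.implicits._
// in scope for the encoders. groupByKey(_.id) fixes K = Int and
// V = UserEvent; passing updateSessionEvents fixes S = UserSession and
// U = Option[UserSession], so no type arguments need to be written out.
val sessionUpdates =
  events
    .groupByKey(_.id)
    .mapGroupsWithState(GroupStateTimeout.NoTimeout)(updateSessionEvents)
```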