mapGroupsWithState

  /**
   * ::Experimental::
   * (Scala-specific)
   * Applies the given function to each group of data, while maintaining a user-defined per-group
   * state. The result Dataset will represent the objects returned by the function.
   * For a static batch Dataset, the function will be invoked once per group. For a streaming
   * Dataset, the function will be invoked for each group repeatedly in every trigger, and
   * updates to each group's state will be saved across invocations.
   * See [[org.apache.spark.sql.streaming.GroupState]] for more details.
   *
   * @tparam S The type of the user-defined state. Must be encodable to Spark SQL types.
   * @tparam U The type of the output objects. Must be encodable to Spark SQL types.
   * @param func Function to be called on every group.
   * @param timeoutConf Timeout configuration for groups that do not receive data for a while.
   *
   * See [[Encoder]] for more details on what types are encodable to Spark SQL.
   * @since 2.2.0
   */
  @Experimental
  @InterfaceStability.Evolving
  def mapGroupsWithState[S: Encoder, U: Encoder](
      timeoutConf: GroupStateTimeout)(
      func: (K, Iterator[V], GroupState[S]) => U): Dataset[U] = {
    val flatMapFunc = (key: K, it: Iterator[V], s: GroupState[S]) => Iterator(func(key, it, s))
    Dataset[U](
      sparkSession,
      FlatMapGroupsWithState[K, V, S, U](
        flatMapFunc.asInstanceOf[(Any, Iterator[Any], LogicalGroupState[Any]) => Iterator[Any]],
        groupingAttributes,
        dataAttributes,
        OutputMode.Update,
        isMapGroupsWithState = true,
        timeoutConf,
        child = logicalPlan))
  }

Take a look at the function's Scaladoc:

  /**
   * ::Experimental::
   * (Scala-specific)
   * Applies the given function to each group of data, while maintaining a user-defined per-group
   * state. The result Dataset will represent the objects returned by the function.
   * For a static batch Dataset, the function will be invoked once per group. For a streaming
   * Dataset, the function will be invoked for each group repeatedly in every trigger, and
   * updates to each group's state will be saved across invocations.
   * See [[org.apache.spark.sql.streaming.GroupState]] for more details.
   *
   * @tparam S The type of the user-defined state. Must be encodable to Spark SQL types.
   * @tparam U The type of the output objects. Must be encodable to Spark SQL types.
   * @param func Function to be called on every group.
   * @param timeoutConf Timeout configuration for groups that do not receive data for a while.
   *
   * See [[Encoder]] for more details on what types are encodable to Spark SQL.
   * @since 2.2.0
   */

This function is still experimental and Scala-specific, and the API is only available since Spark 2.2.0. It is also a curried function: the timeout configuration and the user function sit in separate parameter lists.
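As a minimal, Spark-free sketch of that curried shape (the names `withConf` and its parameters are made up for illustration), the first parameter list carries configuration and the second takes the function:

```scala
// Hypothetical curried method mirroring the shape of mapGroupsWithState:
// configuration in the first parameter list, the user function in the second.
def withConf[S, U](conf: String)(func: (Int, Iterator[Int], Option[S]) => U): U =
  // Call func once with dummy arguments, just to demonstrate the call shape.
  func(1, Iterator(10, 20), None)

// Type parameters S and U can be written explicitly or inferred from the literal.
val result = withConf[String, Int]("NoTimeout") { (key, values, _) =>
  key + values.sum
}
println(result) // prints 31
```

Splitting the parameter lists this way lets callers pass the function as a trailing block, which is exactly why `mapGroupsWithState(timeoutConf)(func)` reads naturally at the call site.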

It applies the given function to each group of data while maintaining a user-defined per-group state. The resulting Dataset represents the objects returned by the function. For a static batch Dataset, the function is invoked once per group. For a streaming Dataset, the function is invoked repeatedly for each group on every trigger, and updates to each group's state are saved across invocations.

Parameters

  • @tparam S The type of the user-defined state; must be encodable to Spark SQL types.
  • @tparam U The type of the output objects; must be encodable to Spark SQL types.
  • @param func The function to be called on every group.
  • @param timeoutConf Timeout configuration for groups that do not receive data for a while.

Note the t in @tparam: it marks a type parameter. The value parameters that mapGroupsWithState actually receives are just two: a timeout configuration (timeoutConf) and the function func.

func: (K, Iterator[V], GroupState[S]) => U

Looking at this signature: func takes three arguments and returns a value of type U, the output object type. Let's focus on the arguments func receives:

  • K is the type of the key used in groupByKey, e.g. Int or String.
  • Iterator[V] iterates over the incoming messages of type V for that group.
  • GroupState[S] wraps the user-defined state of type S (note that S is the state type, not the output type; the output type is U).
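A plain-Scala analogue (no Spark involved) makes the (K, Iterator[V]) pairing concrete: groupBy hands each key to us together with all the values that share it, which is exactly what func sees per group.

```scala
// Events keyed by user id: K = Int, V = String.
val events = Seq((1, "login"), (1, "click"), (2, "login"))

// Each key K is paired with all of its values, like (K, Iterator[V]) per group.
val grouped: Map[Int, Seq[String]] =
  events.groupBy(_._1).map { case (k, pairs) => k -> pairs.map(_._2) }

println(grouped(1)) // prints List(login, click)
println(grouped(2)) // prints List(login)
```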

Let's look at a concrete func:

  def updateSessionEvents(
      id: Int,
      userEvents: Iterator[UserEvent],
      state: GroupState[UserSession]): Option[UserSession] = { ... }

Notice that the parameters of this func are ordinary value parameters, each annotated with a concrete type. In the signature func: (K, Iterator[V], GroupState[S]) => U, by contrast, K, V, S, and U are type parameters declared on mapGroupsWithState; when you pass a concrete func, each of them is instantiated with a concrete type (here K = Int, V = UserEvent, S = UserSession, U = Option[UserSession]).
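To see that instantiation run, here is a self-contained sketch. UserEvent, UserSession, and FakeGroupState are all hypothetical stand-ins (Spark's real GroupState lives in org.apache.spark.sql.streaming and cannot be constructed directly), but the function has exactly the concrete signature from the example above:

```scala
// Hypothetical types standing in for the example's UserEvent / UserSession.
case class UserEvent(id: Int, data: String)
case class UserSession(events: Seq[String])

// Minimal stand-in for Spark's GroupState, just enough to run the function.
class FakeGroupState[S] {
  private var value: Option[S] = None
  def getOption: Option[S] = value
  def update(s: S): Unit = { value = Some(s) }
}

// K = Int, V = UserEvent, S = UserSession, U = Option[UserSession].
def updateSessionEvents(
    id: Int,
    userEvents: Iterator[UserEvent],
    state: FakeGroupState[UserSession]): Option[UserSession] = {
  val previous = state.getOption.getOrElse(UserSession(Nil))
  val updated  = UserSession(previous.events ++ userEvents.map(_.data))
  state.update(updated)
  Some(updated)
}

val state = new FakeGroupState[UserSession]
updateSessionEvents(1, Iterator(UserEvent(1, "login")), state)
val result = updateSessionEvents(1, Iterator(UserEvent(1, "click")), state)
println(result) // prints Some(UserSession(List(login, click)))
```

The second call sees the state written by the first, which mirrors how a streaming query carries each group's state from one trigger to the next.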