Prayer X 2020-09-04 19:26 采纳率: 0%
浏览 227

关于Flink中窗口计算的Reduce

我写了一个wordcount,想用窗口一直进行累加。但是写出来reduce那里一直报错,请问是哪里错了?

object FlinkDemoSource1 {
  def main(args: Array[String]): Unit = {
    import org.apache.flink.api.scala._
    val conf = new Configuration()
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    val input: DataStream[String] = env.socketTextStream("localhost", 6666)

    input.flatMap(_.split(" ")).map((_, 1)).keyBy(_._1)
      .timeWindow(Time.seconds(10))
      .reduce((x, y) => {
      (x._1,x._2 + y._2)}
    ,new AssignWindowEndProcessFunction())


    count.print()
    env.execute("wordcount")
  }
}

class AssignWindowEndProcessFunction extends ProcessWindowFunction[(String,Int),(String,Int),String,TimeWindow]{
  override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[Iterable[(String, Int)]]): Unit = {

    var tmp:Int  = 0;
    for (i <- elements){
      tmp += i._2
    }
    out.collect((key,tmp))
  }
}
  • 写回答

1条回答 默认 最新

报告相同问题?