# 1. Flink DataSet Custom Functions in Detail (Scala Version)

## Map

Transforms each element 1:1, operating at element granularity.

#### Program:

package code.book.batch.dataset.advance.api

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object MapFunction001scala {
  def main(args: Array[String]): Unit = {
    // 1. Set up the execution environment and create the test data
    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.fromElements("flink vs spark", "buffer vs  shuffer")

    // 2. Map each element: convert it to upper case and append the suffix "--##bigdata##"
    val text2 = text.map(new MapFunction[String, String] {
      override def map(s: String): String = s.toUpperCase() + "--##bigdata##"
    })
    text2.print()

    // 3. Map each element: convert it to upper case and compute the line length
    val text3 = text.map(new MapFunction[String, (String, Int)] {
      override def map(s: String): (String, Int) = (s.toUpperCase(), s.length)
    })
    text3.print()

    // 4. Map each element: convert it to upper case and wrap it, together with its length, in a case class
    // 4.1 Define the case class
    case class Wc(line: String, length: Int)
    // 4.2 Convert to the case class type
    val text4 = text.map(new MapFunction[String, Wc] {
      override def map(s: String): Wc = Wc(s.toUpperCase(), s.length)
    })
    text4.print()
  }
}

#### Result:

text2.print()
FLINK VS SPARK--##bigdata##
BUFFER VS  SHUFFER--##bigdata##

text3.print()
(FLINK VS SPARK,14)
(BUFFER VS  SHUFFER,18)

text4.print()
Wc(FLINK VS SPARK,14)
Wc(BUFFER VS  SHUFFER,18)
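
Besides MapFunction objects, the Scala DataSet API also accepts plain Scala functions. A minimal sketch of the same transformations in lambda form, reusing the test data from the program above:

```scala
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
val text = env.fromElements("flink vs spark", "buffer vs  shuffer")

// 1:1 transformation, equivalent to text2 above
text.map(_.toUpperCase + "--##bigdata##").print()

// 1:1 transformation to a (line, length) tuple, equivalent to text3 above
text.map(s => (s.toUpperCase, s.length)).print()
```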

## mapPartition

Transforms elements 1:1, operating at partition granularity. This can sometimes be more efficient than map.

#### Program:

package code.book.batch.dataset.advance.api

import java.lang.Iterable
import org.apache.flink.api.common.functions.MapPartitionFunction
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.util.Collector

object MapPartitionFunction001scala {
  def main(args: Array[String]): Unit = {
    // 1. Set up the execution environment and create the test data
    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.fromElements("flink vs spark", "buffer vs  shuffer")

    // 2. For each partition, count the number of elements it contains
    val text2 = text.mapPartition(new MapPartitionFunction[String, Long]() {
      override def mapPartition(iterable: Iterable[String], collector: Collector[Long]): Unit = {
        var c = 0
        val itor = iterable.iterator()
        while (itor.hasNext) {
          itor.next()
          c = c + 1
        }
        collector.collect(c)
      }
    })
    text2.print()
    // 3. For each partition, transform the content of every element
    val text3 = text.mapPartition(new MapPartitionFunction[String, String]() {
      override def mapPartition(iterable: Iterable[String], collector: Collector[String]): Unit = {
        val itor = iterable.iterator()
        while (itor.hasNext) {
          val line = itor.next().toUpperCase + "--##bigdata##"
          collector.collect(line)
        }
      }
    })
    text3.print()

    // 4. For each partition, convert every element to upper case and compute the line length
    // 4.1 Define the case class
    case class Wc(line: String, length: Int)
    // 4.2 Convert to the case class type
    val text4 = text.mapPartition(new MapPartitionFunction[String, Wc] {
      override def mapPartition(iterable: Iterable[String], collector: Collector[Wc]): Unit = {
        val itor = iterable.iterator()
        while (itor.hasNext) {
          val s = itor.next()
          collector.collect(Wc(s.toUpperCase(), s.length))
        }
      }
    })
    text4.print()
  }
}

#### Result:

text2.print()
2

text3.print()
FLINK VS SPARK--##bigdata##
BUFFER VS  SHUFFER--##bigdata##

text4.print()
Wc(FLINK VS SPARK,14)
Wc(BUFFER VS  SHUFFER,18)
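
The Scala API also has a mapPartition overload that takes a function from an Iterator to a collection of results. A minimal sketch of the element count from step 2 above, written in that form:

```scala
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
val text = env.fromElements("flink vs spark", "buffer vs  shuffer")

// Emit one element count per partition
text.mapPartition(it => Seq(it.size)).print()
```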

## flatMap

Transforms each element 1:n, operating at element granularity.

#### Program:

package code.book.batch.dataset.advance.api

import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.util.Collector
object FlatMapFunction001scala {
  def main(args: Array[String]): Unit = {
    // 1. Set up the execution environment and create the test data
    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.fromElements("flink vs spark", "buffer vs  shuffer")

    // 2. For each element, convert it to upper case and append the suffix "--##bigdata##"
    val text2 = text.flatMap(new FlatMapFunction[String, String]() {
      override def flatMap(s: String, collector: Collector[String]): Unit = {
        collector.collect(s.toUpperCase() + "--##bigdata##")
      }
    })
    text2.print()

    // 3. Split each sentence into words. One element can become several elements: here one line becomes several words.
    // map can only transform an element 1:1, whereas flatMap can transform an element 1:n.
    val text3 = text.flatMap {
      new FlatMapFunction[String, Array[String]] {
        override def flatMap(s: String, collector: Collector[Array[String]]): Unit = {
          val arr: Array[String] = s.toUpperCase().split("\\s+")
          collector.collect(arr)
        }
      }
    }
    // A compact way to display the results
    text3.collect().foreach(_.foreach(println(_)))
    // Equivalent long form: first get each Array[String], then the Strings inside it
    text3.collect().foreach(arr => {
      arr.foreach(token => {
        println(token)
      })
    })
  }
}

#### Result:

text2.print()
FLINK VS SPARK--##bigdata##
BUFFER VS  SHUFFER--##bigdata##

text3.collect().foreach(_.foreach(println(_)))
FLINK
VS
SPARK
BUFFER
VS
SHUFFER
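
A more direct way to get one output element per word is to let flatMap emit the words itself, instead of collecting whole arrays and flattening them in the driver. A minimal sketch with the lambda overload and the same input:

```scala
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
val text = env.fromElements("flink vs spark", "buffer vs  shuffer")

// Each line is split into words, so one element becomes several elements
text.flatMap(_.toUpperCase.split("\\s+")).print()
```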

## filter

Filters elements, operating at element granularity. The elements that satisfy the predicate form a new DataSet.

#### Program:

package code.book.batch.dataset.advance.api

import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object FilterFunction001scala {
  def main(args: Array[String]): Unit = {
    // 1. Set up the execution environment and create the test data
    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.fromElements(2, 4, 7, 8, 9, 6)

    // 2. Filter the DataSet, keeping the even elements
    val text2 = text.filter(new FilterFunction[Int] {
      override def filter(t: Int): Boolean = {
        t % 2 == 0
      }
    })
    text2.print()

    // 3. Filter the DataSet, keeping the elements greater than 5
    val text3 = text.filter(new FilterFunction[Int] {
      override def filter(t: Int): Boolean = {
        t > 5
      }
    })
    text3.print()
  }
}

#### Result:

text2.print()
2
4
8
6

text3.print()
7
8
9
6
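
The same filters can also be written with plain predicate functions; a minimal sketch:

```scala
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
val text = env.fromElements(2, 4, 7, 8, 9, 6)

text.filter(_ % 2 == 0).print() // even elements
text.filter(_ > 5).print()      // elements greater than 5
```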

## Reduce

Combines elements, operating at element granularity. In the end it can only produce a single result.

#### Program:

package code.book.batch.dataset.advance.api

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object ReduceFunction001scala {
  def main(args: Array[String]): Unit = {
    // 1. Set up the execution environment and create the test data
    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.fromElements(1, 2, 3, 4, 5, 6, 7)

    // 2. Combine the elements of the DataSet, here computing the running sum
    val text2 = text.reduce(new ReduceFunction[Int] {
      override def reduce(intermediateResult: Int, next: Int): Int = {
        intermediateResult + next
      }
    })
    text2.print()

    // 3. Combine the elements of the DataSet, here computing the running product
    val text3 = text.reduce(new ReduceFunction[Int] {
      override def reduce(intermediateResult: Int, next: Int): Int = {
        intermediateResult * next
      }
    })
    text3.print()

    // 4. Combine the elements of the DataSet; the combining logic can be arbitrarily complex
    val text4 = text.reduce(new ReduceFunction[Int] {
      override def reduce(intermediateResult: Int, next: Int): Int = {
        if (intermediateResult % 2 == 0) {
          intermediateResult + next
        } else {
          intermediateResult * next
        }
      }
    })
    text4.print()

    // 5. Combine the elements; the output shows that intermediateResult is the partial result so far and next is the next element
    val text5 = text.reduce(new ReduceFunction[Int] {
      override def reduce(intermediateResult: Int, next: Int): Int = {
        println("intermediateResult=" + intermediateResult + " ,next=" + next)
        intermediateResult + next
      }
    })
    text5.collect()
  }
}

#### Result:

text2.print()
28

text3.print()
5040

text4.print()
157

text5.collect()
intermediateResult=1 ,next=2
intermediateResult=3 ,next=3
intermediateResult=6 ,next=4
intermediateResult=10 ,next=5
intermediateResult=15 ,next=6
intermediateResult=21 ,next=7
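
The same reductions can be written with plain binary functions; a minimal sketch:

```scala
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
val text = env.fromElements(1, 2, 3, 4, 5, 6, 7)

text.reduce(_ + _).print() // running sum: 28
text.reduce(_ * _).print() // running product: 5040
```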

## reduceGroup

Combines the elements of each group separately. Similar to reduce, except that it produces one result per group.
If there is no grouping, the whole DataSet is treated as one group and, just like reduce, only one result is produced.

#### Program:

package code.book.batch.dataset.advance.api

import java.lang.Iterable
import org.apache.flink.api.common.functions.GroupReduceFunction
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.util.Collector

object GroupReduceFunction001scala {
  def main(args: Array[String]): Unit = {
    // 1. Set up the execution environment and create the test data
    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.fromElements(1, 2, 3, 4, 5, 6, 7)

    // 2. Combine the elements of the DataSet (no grouping, so a single group), here computing the running sum
    val text2 = text.reduceGroup(new GroupReduceFunction[Int, Int] {
      override def reduce(iterable: Iterable[Int], collector: Collector[Int]): Unit = {
        var sum = 0
        val itor = iterable.iterator()
        while (itor.hasNext) {
          sum += itor.next()
        }
        collector.collect(sum)
      }
    })
    text2.print()

    // 3. Combine the elements, here computing the sums of the even and odd elements separately
    val text3 = text.reduceGroup(new GroupReduceFunction[Int, (Int, Int)] {
      override def reduce(iterable: Iterable[Int], collector: Collector[(Int, Int)]): Unit = {
        var sum0 = 0
        var sum1 = 0
        val itor = iterable.iterator()
        while (itor.hasNext) {
          val v = itor.next
          if (v % 2 == 0) {
            // running sum of the even elements
            sum0 += v
          } else {
            // running sum of the odd elements
            sum1 += v
          }
        }
        collector.collect((sum0, sum1))
      }
    })
    text3.print()

    // 4. Group the DataSet and combine each group: here we sum each person's salary (one result per group)
    val data = env.fromElements(
    ("zhangsan", 1000), ("lisi", 1001), ("zhangsan", 3000), ("lisi", 1002))
    // 4.1 Group by name
    val data2 = data.groupBy(0).reduceGroup(new GroupReduceFunction[(String, Int), (String, Int)]{
      override def reduce(iterable: Iterable[(String, Int)], collector: Collector[(String, Int)]):
      Unit = {
        var salary = 0
        var name = ""
        val itor = iterable.iterator()
        // 4.2 Sum up each person's salary
        while (itor.hasNext) {
          val t = itor.next()
          name = t._1
          salary += t._2
        }
        collector.collect((name, salary))
      }
    })
    data2.print
  }
}

#### Result:

text2.print()
28

text3.print()
(12,16)

data2.print
(lisi,2003)
(zhangsan,4000)
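
The per-person salary total from step 4 can also be written with the lambda overload of reduceGroup; a minimal sketch:

```scala
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
val data = env.fromElements(
  ("zhangsan", 1000), ("lisi", 1001), ("zhangsan", 3000), ("lisi", 1002))

// One result per group: fold each person's records into a single (name, total) pair
data.groupBy(0).reduceGroup { it =>
  it.reduce((a, b) => (a._1, a._2 + b._2))
}.print()
```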

## Join

join combines two DataSets on a key, much like a SQL join.

#### Program:

package code.book.batch.dataset.advance.api

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object JoinFunction001scala {
  def main(args: Array[String]): Unit = {
    // 1. Set up the execution environment and create the test data
    val env = ExecutionEnvironment.getExecutionEnvironment

    val authors = env.fromElements(
      Tuple3("A001", "zhangsan", "zhangsan@qq.com"),
      Tuple3("A001", "lisi", "lisi@qq.com"),
      Tuple3("A001", "wangwu", "wangwu@qq.com"))
    val posts = env.fromElements(
      Tuple2("P001", "zhangsan"),
      Tuple2("P002", "lisi"),
      Tuple2("P003", "wangwu"),
      Tuple2("P004", "lisi"))
    // 2. The Scala API has no with() method for supplying a JoinFunction
    val text2 = authors.join(posts).where(1).equalTo(1)

    // 3. Print the result
    text2.print()
  }
}

#### Result:

text2.print()
((A001,wangwu,wangwu@qq.com),(P003,wangwu))
((A001,zhangsan,zhangsan@qq.com),(P001,zhangsan))
((A001,lisi,lisi@qq.com),(P002,lisi))
((A001,lisi,lisi@qq.com),(P004,lisi))
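
Although the Scala API has no with() method, a join function can be supplied through apply to shape the joined result. A minimal sketch with the same authors and posts data, producing (name, postId) pairs:

```scala
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
val authors = env.fromElements(
  ("A001", "zhangsan", "zhangsan@qq.com"),
  ("A001", "lisi", "lisi@qq.com"),
  ("A001", "wangwu", "wangwu@qq.com"))
val posts = env.fromElements(
  ("P001", "zhangsan"), ("P002", "lisi"), ("P003", "wangwu"), ("P004", "lisi"))

// Pass a join function through apply instead of with()
authors.join(posts).where(1).equalTo(1).apply {
  (author, post) => (author._2, post._1)
}.print()
```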

## CoGroup

Groups the elements of two DataSets by key and processes the two groups together; groupBy, by contrast, can only group a single DataSet.

#### Program:

package code.book.batch.dataset.advance.api

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object CoGroupFunction001scala {
  def main(args: Array[String]): Unit = {
    // 1. Set up the execution environment and create the test data
    val env = ExecutionEnvironment.getExecutionEnvironment

    val authors = env.fromElements(
      Tuple3("A001", "zhangsan", "zhangsan@qq.com"),
      Tuple3("A001", "lisi", "lisi@qq.com"),
      Tuple3("A001", "wangwu", "wangwu@qq.com"))
    val posts = env.fromElements(
      Tuple2("P001", "zhangsan"),
      Tuple2("P002", "lisi"),
      Tuple2("P003", "wangwu"),
      Tuple2("P004", "lisi"))
    // 2. In the Scala API, coGroup has no with() method for supplying a CoGroupFunction
    val text2 = authors.coGroup(posts).where(1).equalTo(1)

    // 3. Print the result
    text2.print()
  }
}

#### Result:

text2.print()
([Lscala.Tuple3;@6c2c1385,[Lscala.Tuple2;@5f354bcf)
([Lscala.Tuple3;@3daf7722,[Lscala.Tuple2;@78641d23)
([Lscala.Tuple3;@74589991,[Lscala.Tuple2;@146dfe6)
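
The output above is just the toString of the grouped arrays. To produce readable output, a co-group function can be supplied through apply, which receives the two groups as iterators. A minimal sketch with the same data:

```scala
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
val authors = env.fromElements(
  ("A001", "zhangsan", "zhangsan@qq.com"),
  ("A001", "lisi", "lisi@qq.com"),
  ("A001", "wangwu", "wangwu@qq.com"))
val posts = env.fromElements(
  ("P001", "zhangsan"), ("P002", "lisi"), ("P003", "wangwu"), ("P004", "lisi"))

// For each key, turn the two groups into a readable (names, postIds) pair
authors.coGroup(posts).where(1).equalTo(1).apply {
  (as, ps) => (as.map(_._2).mkString(","), ps.map(_._1).mkString(","))
}.print()
```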

https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/batch/index.html#specifying-keys
