An Approach to Feature Extraction and Conversion into Spark or Flink Execution Plans

Posted by farAway



I. The approach: extracting features and converting them into Spark or Flink execution plans

1. Abstraction of the execution process:

  1. Read and parse the configuration file: Text → Seq<FeatureConf[FeatureKey, AggregateKey, RuleText]>; where feature key is the feature name, aggregate key is the primary key to aggregate on, and rule text is the feature's computation logic, i.e. the operator chain;
  2. Convert the RuleText into an Expression: Seq<FeatureConf[FeatureKey, AggregateKey, RuleText]> → Seq<FeatureConf[FeatureKey, AggregateKey, Expression]>; where Expression is the expression object of Spark or Flink;
  3. Merge the logic that shares the same aggregation dimension: Seq<FeatureConf[FeatureKey, AggregateKey, Expression]> → Seq<AggregateKey, FeatureConf[FeatureKey, _, Expression]>;
  4. Traverse the groups and generate the execution plan: Seq<AggregateKey, FeatureConf[FeatureKey, _, Expression]> → SparkPlan & FlinkPlan;
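The four steps above can be sketched engine-agnostically in plain Scala. All names here (FeatureConf, parseLine, mergeByAggregateKey) are illustrative, not from a real codebase; the rule is kept as a String, with the engine parser call of step 2 shown only as a comment:

```scala
// Minimal sketch of steps 1-4. RuleText stays a String here; a real
// implementation would parse it into an engine Expression in step 2
// (SparkSqlParser.parseExpression / ExpressionParser.parseExpression).
case class FeatureConf[R](featureKey: String, aggregateKey: String, rule: R)

// Step 1: parse one config line of the form "featureKey,aggregateKey,ruleText"
def parseLine(line: String): FeatureConf[String] = {
  val Array(fk, ak, rule) = line.split(",", 3).map(_.trim)
  FeatureConf(fk, ak, rule)
}

// Step 3: merge features that share an aggregate key, so that each group
// can later become a single groupBy(aggregateKey).agg(expr1, expr2, ...)
def mergeByAggregateKey[R](confs: Seq[FeatureConf[R]]): Map[String, Seq[FeatureConf[R]]] =
  confs.groupBy(_.aggregateKey)

val confs = Seq(
  "fk1,id1,count(*)",
  "fk2,id1,sum(id2)",
  "fk3,id2,max(id2)"
).map(parseLine)

// Step 4 would walk this map and emit one plan per aggregate key
val merged = mergeByAggregateKey(confs)
println(merged("id1").map(_.featureKey)) // List(fk1, fk2)
```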

2. Converting RuleText into an Expression in Spark & Flink

Spark:SparkSqlParser.parseExpression("count(*)")
Flink:ExpressionParser.parseExpression("count(*)")
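Since both engines expose a string-to-expression parser, the parsing step can be hidden behind a common trait. A minimal sketch: the trait and ToyParser are hypothetical, and the real engine calls appear only as comments:

```scala
// Hypothetical abstraction over the two engine parsers.
trait RuleParser[E] {
  def parse(ruleText: String): E
}

// Real implementations would delegate to the engines, e.g.:
//   Spark: SparkSqlParser.parseExpression(ruleText)   -> Catalyst Expression
//   Flink: ExpressionParser.parseExpression(ruleText) -> Table Expression
// For illustration only, a toy parser that just normalizes the rule text:
object ToyParser extends RuleParser[String] {
  def parse(ruleText: String): String = ruleText.trim.toLowerCase
}

println(ToyParser.parse("  COUNT(*) ")) // count(*)
```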

3. Supporting custom operators (UDFs) in Spark & Flink

Spark:spark.udf.register("len", (_:String).length)
Flink:env.registerFunction("log1", new Log)

4. Code examples:

Spark:

package com.yang.spark.framework
 
import com.yang.spark.framework.udf.{Median, StandardDeviation}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
 
/**
  *
  * @author yangfan
  * @since 2021/5/24
  * @version 1.0.0
  */
object AggregateTestDataFrame {
 
  def main(args: Array[String]): Unit = {
 
 
    val source = Seq(
      ("100", "100"),
      ("100", "101"),
      ("100", "105"),
      ("101", "102"),
      ("102", "103")
    )
     
    val groupKeys = Array("id1")
     
    val functions = Array("id2" -> "max", "id2" -> "min", "id2" -> "std")
 
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName(this.getClass.getSimpleName)
      .config(new SparkConf())
      //.enableHiveSupport()
      .getOrCreate()
    import spark.implicits._
 
    spark.udf.register("median", new Median)
    spark.udf.register("std", new StandardDeviation)
    spark.udf.register("len", (_: String).length)
 
    val inputDF = spark.sparkContext.makeRDD(source).toDF("id1", "id2")
 
    // Window spec used only by the commented-out agg variant below;
    // partition on an existing column ("id1"), not a nonexistent "name"
    val w = Window.partitionBy("id1").orderBy(sum(when(expr("id2/2 > 50"), $"id2")).desc)
    inputDF
      //.groupBy(groupKeys.head, groupKeys.tail:_*)
      .groupBy("id1")
      //.agg(functions.head, functions.tail:_*)
      /*.agg(
        sum(when(expr("id2/2 > 50"), $"id2")).alias("fk1"),
        expr("id2/2 > 50").alias("fk2"),
        sum("id2").over(w.rangeBetween(Window.unboundedPreceding, 2)),
        avg("id2").over(w.rowsBetween(Window.currentRow, 4)),
        row_number().over(w.rowsBetween(Window.currentRow, 4))
      )*/
      .agg(
        expr("count(*) as fk1"),
        // the DataFrame has columns id1/id2 only, so aggregate on id2;
        // distinct count is written count(distinct ...), not distinct(count(...))
        expr("sum(id2) as fk2"),
        expr("count(distinct id2) as fk3")
      )
      .show()
  }
}

Flink:

package com.yang.stream
 
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{EnvironmentSettings, Tumble}
import org.apache.flink.table.expressions.ExpressionParser
import org.apache.flink.types.Row
 
/**
  *
  * @author yangfan
  * @since 2021/5/24
  * @version 1.0.0
  */
object TableApiTest {
 
  def main(args: Array[String]): Unit = {
    val host: String = "127.0.0.1"
    val port: Int = 8888
 
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(env, settings)
 
    val inputDStream = env.socketTextStream(host, port)
    //inputDStream.print("input").setParallelism(1)
 
    val dataStream = inputDStream.flatMap(_.split("\\s"))
    //dataStream.print("input stream").setParallelism(1)
 
    val table = tableEnv.fromDataStream(dataStream)
 
    /*tableEnv.registerFunction("log1", new Log)
    val table1 = table.addColumns("log1(10) as t1").select("t1")
    val value = tableEnv.toAppendStream[Row](table1)
    println(tableEnv.explain(table1))
    value.print("table1")*/
 
    //tableEnv.registerFunction("log1", new Log)
    // NOTE: the Tumble window requires the table to expose an event-time
    // attribute named 'rowtime (e.g. fromDataStream(ds, 'id, 'rowtime.rowtime))
    val table1 = table.as("id")
      .addColumns("1 as cnt")
      .window(Tumble over 5.minutes on 'rowtime as 'w)
      .groupBy('w, 'id) // the window alias must appear in groupBy
      //.aggregate("sum(cnt) as sum_cnt")
      //.select("id, cnt.sum as sum_cnt, cnt.count.distinct as count_cnt")
      //.select('id, 'cnt.sum as 'sum_cnt, 'cnt.count.distinct as 'count_cnt)
      .select(
        expr("id"),
        expr("id.count as fk1"),
        expr("cnt.sum as fk2"),
        expr("cnt.count.distinct as fk3")
      )
    val value = tableEnv.toRetractStream[Row](table1)
    println(table1.getSchema)
    println(tableEnv.explain(table1))
    value.print("table1")
 
    env.execute(this.getClass.getName)
  }
 
  private final def expr(expr: String) = ExpressionParser.parseExpression(expr)
}
