An Approach to Feature Extraction and Conversion into Spark or Flink Execution Plans
一、Feature extraction and conversion into Spark or Flink execution plans
1、Abstraction of the execution process (a minimal sketch of this pipeline in plain Scala follows the list):
- Read and parse the configuration file: Text → Seq<FeatureConf[FeatureKey, AggregateKey, RuleText]>, where FeatureKey is the feature name, AggregateKey is the primary key to aggregate on, and RuleText is the feature's computation logic, i.e. the operator chain;
- Convert each RuleText into an Expression: Seq<FeatureConf[FeatureKey, AggregateKey, RuleText]> → Seq<FeatureConf[FeatureKey, AggregateKey, Expression]>, where Expression is the expression object of Spark or Flink;
- Merge the logic that shares the same aggregation dimension: Seq<FeatureConf[FeatureKey, AggregateKey, Expression]> → Seq<AggregateKey, FeatureConf[FeatureKey, _, Expression]>;
- Traverse the merged result and generate the execution plans: Seq<AggregateKey, FeatureConf[FeatureKey, _, Expression]> → SparkPlan & FlinkPlan;
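The four steps map naturally onto a few case classes and collection operations. The sketch below only illustrates the shape of the pipeline; FeatureConf, parseConf, and the "|"-separated config format are assumptions made here, not part of any existing framework.

object FeaturePipelineSketch {

  // One parsed line of the feature configuration file.
  final case class FeatureConf[R](featureKey: String, aggregateKey: String, rule: R)

  // Step 1: read and parse the config text, e.g. "fk1|id1|count(*)".
  def parseConf(lines: Seq[String]): Seq[FeatureConf[String]] =
    lines.map { line =>
      val Array(featureKey, aggregateKey, ruleText) = line.split("\\|", 3)
      FeatureConf(featureKey, aggregateKey, ruleText)
    }

  // Step 2: turn each RuleText into an engine-specific expression
  // (a Spark Column / Catalyst Expression, or a Flink Table API Expression).
  def toExpressions[E](confs: Seq[FeatureConf[String]], parse: String => E): Seq[FeatureConf[E]] =
    confs.map(c => FeatureConf(c.featureKey, c.aggregateKey, parse(c.rule)))

  // Step 3: merge features that share the same AggregateKey, so each key
  // produces exactly one groupBy(...).agg(...) / windowed aggregation downstream.
  def mergeByAggregateKey[E](confs: Seq[FeatureConf[E]]): Map[String, Seq[FeatureConf[E]]] =
    confs.groupBy(_.aggregateKey)

  // Step 4 is engine-specific (see the examples in section 4): for each map entry,
  // build df.groupBy(aggregateKey).agg(expressions: _*) in Spark, or the equivalent
  // window + groupBy + select chain in Flink.
}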
2、How to convert RuleText into an Expression in Spark and Flink
Spark:SparkSqlParser.parseExpression("count(*)")
Flink:ExpressionParser.parseExpression("count(*)")
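On the Spark side, the parsed result is a Catalyst Expression that still has to be wrapped in a Column before it can be handed to select or agg; on the Flink side, the Expression returned by ExpressionParser.parseExpression can be passed to Table.select directly. A minimal Spark-side sketch (ruleToColumn is a hypothetical helper; the session's parser is used here because the SparkSqlParser constructor differs between Spark 2.x and 3.x):

import org.apache.spark.sql.{Column, SparkSession}

object RuleParsing {
  // Hypothetical helper: parse a rule string into a Catalyst Expression and wrap it
  // as a Column, which is essentially what org.apache.spark.sql.functions.expr does.
  def ruleToColumn(spark: SparkSession, ruleText: String): Column =
    new Column(spark.sessionState.sqlParser.parseExpression(ruleText))

  // Usage: df.groupBy("id1").agg(ruleToColumn(spark, "count(*) as fk1"))
}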
3、How to support custom operators in Spark and Flink
Spark:spark.udf.register("len", (_:String).length)
Flink:tableEnv.registerFunction("log1", new Log)
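The Log class registered on the Flink side is not shown in the post; a minimal sketch of what such a custom scalar function might look like with the old (pre-1.11) Table API, so that it can be referenced from rule strings such as "log1(cnt)":

import org.apache.flink.table.functions.ScalarFunction

// Hypothetical implementation of the `Log` function registered as "log1" above.
class Log extends ScalarFunction {
  def eval(value: Double): Double = math.log(value)
}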
4、Code examples:
Spark:
package com.yang.spark.framework

import com.yang.spark.framework.udf.{Median, StandardDeviation}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

/**
 * @author yangfan
 * @since 2021/5/24
 * @version 1.0.0
 */
object AggregateTestDataFrame {

  def main(args: Array[String]): Unit = {
    // Sample input: pairs of (id1, id2).
    val source = Seq(
      ("100", "100"),
      ("100", "101"),
      ("100", "105"),
      ("101", "102"),
      ("102", "103")
    )
    val groupKeys = Array("id1")
    val functions = Array("id2" -> "max", "id2" -> "min", "id2" -> "std")

    val spark = SparkSession.builder()
      .master("local[*]")
      .appName(this.getClass.getSimpleName)
      .config(new SparkConf())
      //.enableHiveSupport()
      .getOrCreate()
    import spark.implicits._

    // Register custom operators (UDFs/UDAFs) so they can be referenced from rule strings.
    spark.udf.register("median", new Median)
    spark.udf.register("std", new StandardDeviation)
    spark.udf.register("len", (_: String).length)

    val inputDF = spark.sparkContext.makeRDD(source).toDF("id1", "id2")

    // Window spec used only by the commented-out variant below.
    val w = Window.partitionBy("name").orderBy(sum(when(expr("id2/2 > 50"), $"id2")).desc)

    inputDF
      //.groupBy(groupKeys.head, groupKeys.tail: _*)
      .groupBy("id1")
      //.agg(functions.head, functions.tail: _*)
      /*.agg(
        sum(when(expr("id2/2 > 50"), $"id2")).alias("fk1"),
        expr("id2/2 > 50").alias("fk2"),
        sum("price").over(w.rangeBetween(Window.unboundedPreceding, 2)),
        avg("price").over(w.rowsBetween(Window.currentRow, 4)),
        row_number().over(w.rowsBetween(Window.currentRow, 4))
      )*/
      // Each feature is a rule string turned into a Column via expr();
      // the input columns are id1 and id2.
      .agg(
        expr("count(*) as fk1"),
        expr("sum(id2) as fk2"),
        expr("count(distinct id2) as fk3")
      )
      .show()
  }
}
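Note that the aggregation list in the example is not hard-coded: every feature is a rule string handed to expr(), so the FeatureConf entries read from the configuration in section 1 can be grouped by their AggregateKey and fed, one group at a time, into a single groupBy(...).agg(...) call.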
Flink:
package com.yang.stream

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{EnvironmentSettings, Tumble}
import org.apache.flink.table.expressions.ExpressionParser
import org.apache.flink.types.Row

/**
 * @author yangfan
 * @since 2021/5/24
 * @version 1.0.0
 */
object TableApiTest {

  def main(args: Array[String]): Unit = {
    val host: String = "127.0.0.1"
    val port: Int = 8888

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(env, settings)

    val inputDStream = env.socketTextStream(host, port)
    //inputDStream.print("input").setParallelism(1)

    // Split the socket text into words and assign timestamps,
    // which the event-time tumbling window below requires.
    val dataStream = inputDStream
      .flatMap(_.split("\\s"))
      .assignAscendingTimestamps(_ => System.currentTimeMillis())
    //dataStream.print("input stream").setParallelism(1)

    // Declare the field name and a rowtime attribute for the window.
    val table = tableEnv.fromDataStream(dataStream, 'id, 'rowtime.rowtime)

    /*tableEnv.registerFunction("log1", new Log)
    val table1 = table.addColumns("log1(10) as t1").select("t1")
    val value = tableEnv.toAppendStream[Row](table1)
    println(tableEnv.explain(table1))
    value.print("table1")*/

    //tableEnv.registerFunction("log1", new Log)
    val table1 = table
      .addColumns("1 as cnt")
      .window(Tumble over 5.minutes on 'rowtime as 'w)
      // The group-window alias must appear in the groupBy together with the key.
      .groupBy('w, 'id)
      //.aggregate("sum(cnt) as sum_cnt")
      //.select("id, sum(cnt) as sum_cnt, distinct(count(log1(cnt))) as count_cnt")
      //.select('id, 'cnt.sum as 'sum_cnt, 'cnt.count.distinct as 'count_cnt)
      // Each feature is a rule string parsed into an Expression by the helper below.
      .select(
        expr("id"),
        expr("id.count as fk1"),
        expr("cnt.sum as fk2"),
        expr("cnt.count.distinct as fk3")
      )

    val value = tableEnv.toRetractStream[Row](table1)
    println(table1.getSchema)
    println(tableEnv.explain(table1))
    value.print("table1")

    env.execute(this.getClass.getName)
  }

  private final def expr(expr: String) = ExpressionParser.parseExpression(expr)
}
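The Flink example follows the same pattern: the private expr helper parses each rule string into a Table API Expression via ExpressionParser, and any custom operator registered with tableEnv.registerFunction can be referenced from those strings.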