Kafka: ZK+Kafka+Spark Streaming Cluster Environment Setup (24) Structured Streaming: Encoder
Posted yy3b2007com
When you call groupByKey on a Dataset<Row>, you will notice that the method's last parameter is an encoder. So how are these encoders defined?
Common data types
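To make the question concrete, here is a minimal sketch of groupByKey on a Dataset<Row> with an explicit key encoder. The class name GroupByKeyEncoderSketch, the helper countByWord, the column name "word", and the local master setting are all assumptions made for illustration; the example also assumes Spark SQL 2.x or later is on the classpath.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.KeyValueGroupedDataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

// Hypothetical class name; not part of the original article.
class GroupByKeyEncoderSketch {

    // Groups rows by their first column and counts the rows per key.
    static Map<String, Long> countByWord(SparkSession spark, List<String> words) {
        // Build a single-column Dataset<Row>; "word" is an illustrative column name.
        Dataset<Row> df = spark.createDataset(words, Encoders.STRING()).toDF("word");

        // The second argument tells Spark how to encode the key type (String here).
        // The cast selects the Java MapFunction overload of groupByKey.
        KeyValueGroupedDataset<String, Row> grouped = df.groupByKey(
                (MapFunction<Row, String>) row -> row.getString(0),
                Encoders.STRING());

        // count() yields (key, count) pairs; the count is typed as Object in Java.
        Map<String, Long> result = new TreeMap<>();
        for (Tuple2<String, Object> pair : grouped.count().collectAsList()) {
            result.put(pair._1(), (Long) pair._2());
        }
        return result;
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("groupByKey-encoder-sketch")
                .master("local[*]") // assumption: local mode, for a quick try-out only
                .getOrCreate();
        try {
            System.out.println(countByWord(spark, Arrays.asList("abc", "abc", "xyz")));
        } finally {
            spark.stop();
        }
    }
}
```

With the sample input, "abc" should be counted twice and "xyz" once. If the key were an Integer or a custom bean, only the encoder argument and the key function would need to change.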
static Encoder<byte[]> BINARY(): An encoder for arrays of bytes.
static Encoder<Boolean> BOOLEAN(): An encoder for nullable boolean type.
static Encoder<Byte> BYTE(): An encoder for nullable byte type.
static Encoder<java.sql.Date> DATE(): An encoder for nullable date type.
static Encoder<java.math.BigDecimal> DECIMAL(): An encoder for nullable decimal type.
static Encoder<Double> DOUBLE(): An encoder for nullable double type.
static Encoder<Float> FLOAT(): An encoder for nullable float type.
static Encoder<Integer> INT(): An encoder for nullable int type.
static Encoder<Long> LONG(): An encoder for nullable long type.
static Encoder<Short> SHORT(): An encoder for nullable short type.
static Encoder<String> STRING(): An encoder for nullable string type.
static Encoder<java.sql.Timestamp> TIMESTAMP(): An encoder for nullable timestamp type.
Example:
== Scala ==
Encoders are generally created automatically through implicits from a SparkSession, or can be explicitly created by calling static methods on Encoders.

    import spark.implicits._

    val ds = Seq(1, 2, 3).toDS() // implicitly provided (spark.implicits.newIntEncoder)

== Java ==
Encoders are specified by calling static methods on Encoders.

    List<String> data = Arrays.asList("abc", "abc", "xyz");
    Dataset<String> ds = context.createDataset(data, Encoders.STRING());
Class types:
Encoders can also be constructed from Java Beans:

    Encoders.bean(MyClass.class);
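The bean encoder expects a publicly accessible class that follows JavaBean conventions: a public no-argument constructor plus getter/setter pairs. The sketch below shows what such a class might look like. The fields id and name, the outer class BeanEncoderSketch, and the helper beanProperties are hypothetical. The check uses only java.beans.Introspector from the JDK, not any Spark API, so it illustrates the convention rather than Spark's own behavior.

```java
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical wrapper class; not part of the original article.
class BeanEncoderSketch {

    // Illustrative bean: public, no-arg constructor, getter/setter per field.
    public static class MyClass implements Serializable {
        private int id;
        private String name;

        public MyClass() { }

        public int getId() { return id; }
        public void setId(int id) { this.id = id; }

        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }

    // Lists the properties that have both a getter and a setter, sorted by name.
    static List<String> beanProperties(Class<?> type) {
        List<String> names = new ArrayList<>();
        try {
            // Stopping at Object.class leaves out the built-in "class" property.
            for (PropertyDescriptor pd :
                    Introspector.getBeanInfo(type, Object.class).getPropertyDescriptors()) {
                if (pd.getReadMethod() != null && pd.getWriteMethod() != null) {
                    names.add(pd.getName());
                }
            }
        } catch (IntrospectionException e) {
            throw new IllegalStateException("Could not introspect " + type, e);
        }
        Collections.sort(names);
        return names;
    }

    public static void main(String[] args) {
        System.out.println(beanProperties(MyClass.class)); // prints [id, name]
    }
}
```

A class shaped like this could then be passed as Encoders.bean(BeanEncoderSketch.MyClass.class). A field without a matching getter/setter pair would not appear in the list above.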
Tuple types:
Tuples of common types:

    Encoder<Tuple2<Integer, String>> encoder2 = Encoders.tuple(Encoders.INT(), Encoders.STRING());
    List<Tuple2<Integer, String>> data2 = Arrays.asList(new scala.Tuple2<Integer, String>(1, "a"));
    Dataset<Tuple2<Integer, String>> ds2 = context.createDataset(data2, encoder2);
Tuples containing a class:
Encoder<Tuple2<String, MyClass>> encoder = Encoders.tuple(Encoders.STRING(), Encoders.bean(MyClass.class));
For more on Encoder, see http://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Encoder.html
For more on Encoders, see http://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Encoders.html