sparkdataframe转换成字节流

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了sparkdataframe转换成字节流相关的知识,希望对你有一定的参考价值。

本文介绍基于Spark(2.0+)的Json字符串和DataFrame相互转换。

json字符串转DataFrame

spark提供了将json字符串解析为DF的接口,如果不指定生成的DF的schema,默认spark会先扫码一遍给的json字符串,然后推断生成DF的schema:

* 若列数据全为null会用String类型

* 整数默认会用Long类型

* 浮点数默认会用Double类型 val json1 = """"a":null, "b": 23.1, "c": 1""" val json2 =

""""a":null, "b": "hello", "d": 1.2""" val ds =

spark.createDataset(Seq(json1, json2))val df = spark.read.json(ds) df.show

df.printSchema +----+-----+----+----+ | a| b| c| d| +----+-----+----+----+ |null

|23.1| 1|null| |null|hello|null| 1.2| +----+-----+----+----+ root |-- a: string

(nullable =true) |-- b: string (nullable = true) |-- c: long (nullable = true)

|-- d: double (nullable =true)

若指定schema会按照schema生成DF:

* schema中不存在的列会被忽略

* 可以用两种方法指定schema,StructType和String,具体对应关系看后面

*

若数据无法匹配schema中类型:若schema中列允许为null会转为null;若不允许为null会转为相应类型的空值(如Double类型为0.0值),若无法转换为值会抛出异常

val schema = StructType(List( StructField("a", ByteType, true), StructField("b"

, FloatType,false), StructField("c", ShortType, true) )) //或 val schema = "b

float, c short" val df = spark.read.schema(schema).json(ds) df.show

df.printSchema +----+----+----+ | a| b| c| +----+----+----+ |null|23.1| 1| |null

|0|null| +----+----+----+ root |-- a: byte (nullable = true) |-- b: float

(nullable =true) |-- c: short (nullable = true)

json解析相关配置参数

primitivesAsString (default false): 把所有列看作string类型

prefersDecimal(default false): 将小数看作decimal,如果不匹配decimal,就看做doubles.

allowComments (default false): 忽略json字符串中Java/C++风格的注释

allowUnquotedFieldNames (default false): 允许不加引号的列名

allowSingleQuotes (default true): 除双引号外,还允许用单引号

allowNumericLeadingZeros (default false): 允许数字中额外的前导0(如0012)

allowBackslashEscapingAnyCharacter (default false): 允许反斜杠机制接受所有字符

allowUnquotedControlChars (default false):

允许JSON字符串包含未加引号的控制字符(值小于32的ASCII字符,包括制表符和换行字符)。

mode (default PERMISSIVE): 允许在解析期间处理损坏记录的模式。

PERMISSIVE

:当遇到损坏的记录时,将其他字段设置为null,并将格式错误的字符串放入由columnNameOfCorruptRecord配置的字段中。若指定schema,在schema中设置名为columnNameOfCorruptRecord的字符串类型字段。

如果schema中不具有该字段,则会在分析过程中删除损坏的记录。若不指定schema(推断模式),它会在输出模式中隐式添加一个columnNameOfCorruptRecord字段。

DROPMALFORMED : 忽略整条损害记录

FAILFAST : 遇到损坏记录throws an exception

columnNameOfCorruptRecord

(默认值为spark.sql.columnNameOfCorruptRecord的值):允许PERMISSIVE

mode添加的新字段,会重写spark.sql.columnNameOfCorruptRecord

dateFormat (default yyyy-MM-dd): 自定义日期格式,遵循java.text.SimpleDateFormat格式.

只有日期部分(无详细时间)

timestampFormat (default yyyy-MM-dd’T’HH:mm:ss.SSSXXX):

自定义日期格式,遵循java.text.SimpleDateFormat格式. 可以有详细时间部分(到微秒)

multiLine (default false): 解析一个记录,该记录可能跨越多行,每个文件

以上参数可用option方法配置:

val stringDF = spark.read.option("primitivesAsString", "true").json(ds)

stringDF.show stringDF.printSchema +----+-----+----+----+ | a| b| c| d|

+----+-----+----+----+ |null| 23.1| 1|null| |null|hello|null| 1.2|

+----+-----+----+----+ root |-- a: string (nullable =true) |-- b: string

(nullable =true) |-- c: string (nullable = true) |-- d: string (nullable = true)

二进制类型会自动用base64编码方式表示

‘Man’(ascci) base64编码后为:”TWFu”

val byteArr = Array('M'.toByte, 'a'.toByte, 'n'.toByte) val binaryDs =

spark.createDataset(Seq(byteArr))val dsWithB64 = binaryDs.withColumn("b64",

base64(col("value"))) dsWithB64.show(false) dsWithB64.printSchema

+----------+----+ |value |b64 | +----------+----+ |[4D 61 6E]|TWFu|

+----------+----+ root |-- value: binary (nullable =true) |-- b64: string

(nullable =true) //=================================================

dsWithB64.toJSON.show(false) +-----------------------------+ |value |

+-----------------------------+ |"value":"TWFu","b64":"TWFu"|

+-----------------------------+

//================================================= val json =

""""value":"TWFu"""" val jsonDs = spark.createDataset(Seq(json)) val binaryDF

= spark.read.schema("value binary").json(jsonDs ) binaryDF.show

binaryDF.printSchema +----------+ | value| +----------+ |[4D 61 6E]|

+----------+ root |-- value: binary (nullable =true)

指定schema示例:

以下是Spark SQL支持的所有基本类型:

val json = """"stringc":"abc", "shortc":1, "integerc":null, "longc":3,

"floatc":4.5, "doublec":6.7, "decimalc":8.90, "booleanc":true, "bytec":23,

"binaryc":"TWFu", "datec":"2010-01-01", "timestampc":"2012-12-12

11:22:22.123123"""" val ds = spark.createDataset(Seq(json)) val schema =

"stringc string, shortc short, integerc int, longc long, floatc float, doublec

double, decimalc decimal(10, 3), booleanc boolean, bytec byte, binaryc binary,

datec date, timestampc timestamp" val df = spark.read.schema(schema).json(ds)

df.show(false) df.printSchema

+-------+------+--------+-----+------+-------+--------+--------+-----+----------+----------+-----------------------+

|stringc|shortc|integerc|longc|floatc|doublec|decimalc|booleanc|bytec|binaryc

|datec |timestampc |

+-------+------+--------+-----+------+-------+--------+--------+-----+----------+----------+-----------------------+

|abc |1 |null |3 |4.5 |6.7 |8.900 |true |23 |[4D 61 6E]|2010-01-01|2012-12-12 11

:22:22.123|

+-------+------+--------+-----+------+-------+--------+--------+-----+----------+----------+-----------------------+

root |-- stringc: string (nullable =true) |-- shortc: short (nullable = true)

|-- integerc: integer (nullable =true) |-- longc: long (nullable = true) |--

floatc: float (nullable =true) |-- doublec: double (nullable = true) |--

decimalc: decimal(10,3) (nullable = true) |-- booleanc: boolean (nullable = true

) |-- bytec: byte (nullable =true) |-- binaryc: binary (nullable = true) |--

datec: date (nullable =true) |-- timestampc: timestamp (nullable = true)

复合类型:

val json = """ "arrayc" : [ 1, 2, 3 ], "structc" : "strc" : "efg",

"decimalc" : 1.1 , "mapc" : "key1" : 1.2, "key2" : 1.1 """ val ds =

spark.createDataset(Seq(json))val schema = "arrayc array, structc

struct, mapc map" val df =

spark.read.schema(schema).json(ds) df.show(false) df.printSchema

+---------+--------+--------------------------+ |arrayc |structc |mapc |

+---------+--------+--------------------------+ |[1, 2, 3]|[efg, 1]|[key1 -> 1.2

, key2 ->1.1]| +---------+--------+--------------------------+ root |-- arrayc:

array (nullable =true) | |-- element: short (containsNull = true) |-- structc:

struct (nullable =true) | |-- strc: string (nullable = true) | |-- decimalc:

decimal(10,0) (nullable = true) |-- mapc: map (nullable = true) | |-- key:

string | |-- value: float (valueContainsNull =true)

SparkSQL数据类型

基本类型:

DataType simpleString typeName sql defaultSize catalogString json

StringType string string STRING 20 string “string”

ShortType smallint short SMALLINT 2 smallint “short”

IntegerType int integer INT 4 int “integer”

LongType bigint long BIGINT 8 bigint “long”

FloatType float float FLOAT 4 float “float”

DoubleType double double DOUBLE 8 double “double”

DecimalType(10,3) decimal(10,3) decimal(10,3) DECIMAL(10,3) 8 decimal(10,3)

“decimal(10,3)”

BooleanType boolean boolean BOOLEAN 1 boolean “boolean”

ByteType tinyint byte TINYINT 1 tinyint “byte”

BinaryType binary binary BINARY 100 binary “binary”

DateType date date DATE 4 date “date”

TimestampType timestamp timestamp TIMESTAMP 8 timestamp “timestamp”

三个复合类型:

DataType simpleString typeName sql defaultSize catalogString json

ArrayType(IntegerType, true) array array ARRAY 4 array

“type”:”array”,”elementType”:”integer”,”containsNull”:true

MapType(StringType, LongType, true) map map MAP

28 map

“type”:”map”,”keyType”:”string”,”valueType”:”long”,”valueContainsNull”:true

StructType(StructField(“sf”, DoubleType)::Nil) struct struct

STRUCT 8 struct

“type”:”struct”,”fields”:[“name”:”sf”,”type”:”double”,”nullable”:true,”metadata”:]
参考技术A 本文介绍基于Spark(2.0+)的Json字符串和DataFrame相互转换。
json字符串转DataFrame
spark提供了将json字符串解析为DF的接口,如果不指定生成的DF的schema,默认spark会先扫码一遍给的json字符串,然后推断生成DF的schema:
* 若列数据全为null会用String类型
* 整数默认会用Long类型
* 浮点数默认会用Double类型 val json1 = """"a":null, "b": 23.1, "c": 1""" val json2 =
""""a":null, "b": "hello", "d": 1.2""" val ds =
spark.createDataset(Seq(json1, json2))val df = spark.read.json(ds) df.show
df.printSchema +----+-----+----+----+ | a| b| c| d| +----+-----+----+----+ |null
|23.1| 1|null| |null|hello|null| 1.2| +----+-----+----+----+ root |-- a: string
(nullable =true) |-- b: string (nullable = true) |-- c: long (nullable = true)
|-- d: double (nullable =true)
若指定schema会按照schema生成DF:
* schema中不存在的列会被忽略
* 可以用两种方法指定schema,StructType和String,具体对应关系看后面
*
若数据无法匹配schema中类型:若schema中列允许为null会转为null;若不允许为null会转为相应类型的空值(如Double类型为0.0值),若无法转换为值会抛出异常
val schema = StructType(List( StructField("a", ByteType, true), StructField("b"
, FloatType,false), StructField("c", ShortType, true) )) //或 val schema = "b
float, c short" val df = spark.read.schema(schema).json(ds) df.show
df.printSchema +----+----+----+ | a| b| c| +----+----+----+ |null|23.1| 1| |null
|0|null| +----+----+----+ root |-- a: byte (nullable = true) |-- b: float
(nullable =true) |-- c: short (nullable = true)
json解析相关配置参数
primitivesAsString (default false): 把所有列看作string类型
prefersDecimal(default false): 将小数看作decimal,如果不匹配decimal,就看做doubles.
allowComments (default false): 忽略json字符串中Java/C++风格的注释
allowUnquotedFieldNames (default false): 允许不加引号的列名
allowSingleQuotes (default true): 除双引号外,还允许用单引号
allowNumericLeadingZeros (default false): 允许数字中额外的前导0(如0012)
allowBackslashEscapingAnyCharacter (default false): 允许反斜杠机制接受所有字符
allowUnquotedControlChars (default false):
允许JSON字符串包含未加引号的控制字符(值小于32的ASCII字符,包括制表符和换行字符)。
mode (default PERMISSIVE): 允许在解析期间处理损坏记录的模式。
PERMISSIVE
:当遇到损坏的记录时,将其他字段设置为null,并将格式错误的字符串放入由columnNameOfCorruptRecord配置的字段中。若指定schema,在schema中设置名为columnNameOfCorruptRecord的字符串类型字段。
如果schema中不具有该字段,则会在分析过程中删除损坏的记录。若不指定schema(推断模式),它会在输出模式中隐式添加一个columnNameOfCorruptRecord字段。
DROPMALFORMED : 忽略整条损害记录
FAILFAST : 遇到损坏记录throws an exception
columnNameOfCorruptRecord
(默认值为spark.sql.columnNameOfCorruptRecord的值):允许PERMISSIVE
mode添加的新字段,会重写spark.sql.columnNameOfCorruptRecord
dateFormat (default yyyy-MM-dd): 自定义日期格式,遵循java.text.SimpleDateFormat格式.
只有日期部分(无详细时间)
timestampFormat (default yyyy-MM-dd’T’HH:mm:ss.SSSXXX):
自定义日期格式,遵循java.text.SimpleDateFormat格式. 可以有详细时间部分(到微秒)
multiLine (default false): 解析一个记录,该记录可能跨越多行,每个文件
以上参数可用option方法配置:
val stringDF = spark.read.option("primitivesAsString", "true").json(ds)
stringDF.show stringDF.printSchema +----+-----+----+----+ | a| b| c| d|
+----+-----+----+----+ |null| 23.1| 1|null| |null|hello|null| 1.2|
+----+-----+----+----+ root |-- a: string (nullable =true) |-- b: string
(nullable =true) |-- c: string (nullable = true) |-- d: string (nullable = true)
二进制类型会自动用base64编码方式表示
‘Man’(ascci) base64编码后为:”TWFu”
val byteArr = Array('M'.toByte, 'a'.toByte, 'n'.toByte) val binaryDs =
spark.createDataset(Seq(byteArr))val dsWithB64 = binaryDs.withColumn("b64",
base64(col("value"))) dsWithB64.show(false) dsWithB64.printSchema
+----------+----+ |value |b64 | +----------+----+ |[4D 61 6E]|TWFu|
+----------+----+ root |-- value: binary (nullable =true) |-- b64: string
(nullable =true) //=================================================
dsWithB64.toJSON.show(false) +-----------------------------+ |value |
+-----------------------------+ |"value":"TWFu","b64":"TWFu"|
+-----------------------------+
//================================================= val json =
""""value":"TWFu"""" val jsonDs = spark.createDataset(Seq(json)) val binaryDF
= spark.read.schema("value binary").json(jsonDs ) binaryDF.show
binaryDF.printSchema +----------+ | value| +----------+ |[4D 61 6E]|
+----------+ root |-- value: binary (nullable =true)
指定schema示例:
以下是Spark SQL支持的所有基本类型:
val json = """"stringc":"abc", "shortc":1, "integerc":null, "longc":3,
"floatc":4.5, "doublec":6.7, "decimalc":8.90, "booleanc":true, "bytec":23,
"binaryc":"TWFu", "datec":"2010-01-01", "timestampc":"2012-12-12
11:22:22.123123"""" val ds = spark.createDataset(Seq(json)) val schema =
"stringc string, shortc short, integerc int, longc long, floatc float, doublec
double, decimalc decimal(10, 3), booleanc boolean, bytec byte, binaryc binary,
datec date, timestampc timestamp" val df = spark.read.schema(schema).json(ds)
df.show(false) df.printSchema
+-------+------+--------+-----+------+-------+--------+--------+-----+----------+----------+-----------------------+
|stringc|shortc|integerc|longc|floatc|doublec|decimalc|booleanc|bytec|binaryc
|datec |timestampc |
+-------+------+--------+-----+------+-------+--------+--------+-----+----------+----------+-----------------------+
|abc |1 |null |3 |4.5 |6.7 |8.900 |true |23 |[4D 61 6E]|2010-01-01|2012-12-12 11
:22:22.123|
+-------+------+--------+-----+------+-------+--------+--------+-----+----------+----------+-----------------------+
root |-- stringc: string (nullable =true) |-- shortc: short (nullable = true)
|-- integerc: integer (nullable =true) |-- longc: long (nullable = true) |--
floatc: float (nullable =true) |-- doublec: double (nullable = true) |--
decimalc: decimal(10,3) (nullable = true) |-- booleanc: boolean (nullable = true
) |-- bytec: byte (nullable =true) |-- binaryc: binary (nullable = true) |--
datec: date (nullable =true) |-- timestampc: timestamp (nullable = true)
复合类型:
val json = """ "arrayc" : [ 1, 2, 3 ], "structc" : "strc" : "efg",
"decimalc" : 1.1 , "mapc" : "key1" : 1.2, "key2" : 1.1 """ val ds =
spark.createDataset(Seq(json))val schema = "arrayc array, structc
struct, mapc map" val df =
spark.read.schema(schema).json(ds) df.show(false) df.printSchema
+---------+--------+--------------------------+ |arrayc |structc |mapc |
+---------+--------+--------------------------+ |[1, 2, 3]|[efg, 1]|[key1 -> 1.2
, key2 ->1.1]| +---------+--------+--------------------------+ root |-- arrayc:
array (nullable =true) | |-- element: short (containsNull = true) |-- structc:
struct (nullable =true) | |-- strc: string (nullable = true) | |-- decimalc:
decimal(10,0) (nullable = true) |-- mapc: map (nullable = true) | |-- key:
string | |-- value: float (valueContainsNull =true)
SparkSQL数据类型
基本类型:
DataType simpleString typeName sql defaultSize catalogString json
StringType string string STRING 20 string “string”
ShortType smallint short SMALLINT 2 smallint “short”
IntegerType int integer INT 4 int “integer”
LongType bigint long BIGINT 8 bigint “long”
FloatType float float FLOAT 4 float “float”
DoubleType double double DOUBLE 8 double “double”
DecimalType(10,3) decimal(10,3) decimal(10,3) DECIMAL(10,3) 8 decimal(10,3)
“decimal(10,3)”
BooleanType boolean boolean BOOLEAN 1 boolean “boolean”
ByteType tinyint byte TINYINT 1 tinyint “byte”
BinaryType binary binary BINARY 100 binary “binary”
DateType date date DATE 4 date “date”
TimestampType timestamp timestamp TIMESTAMP 8 timestamp “timestamp”
三个复合类型:
DataType simpleString typeName sql defaultSize catalogString json
ArrayType(IntegerType, true) array array ARRAY 4 array
“type”:”array”,”elementType”:”integer”,”containsNull”:true
MapType(StringType, LongType, true) map map MAP
28 map
“type”:”map”,”keyType”:”string”,”valueType”:”long”,”valueContainsNull”:true
StructType(StructField(“sf”, DoubleType)::Nil) struct struct
STRUCT 8 struct
“type”:”struct”,”fields”:[“name”:”sf”,”type”:”double”,”nullable”:true,”metadata”:]
参考技术B 本文介绍基于Spark(2.0+)的Json字符串和DataFrame相互转换。
json字符串转DataFrame
spark提供了将json字符串解析为DF的接口,如果不指定生成的DF的schema,默认spark会先扫码一遍给的json字符串,然后推断生成DF的schema:
* 若列数据全为null会用String类型
* 整数默认会用Long类型
* 浮点数默认会用Double类型 val json1 = """"a":null, "b": 23.1, "c": 1""" val json2 =
""""a":null, "b": "hello", "d": 1.2""" val ds =
spark.createDataset(Seq(json1, json2))val df = spark.read.json(ds) df.show
df.printSchema +----+-----+----+----+ | a| b| c| d| +----+-----+----+----+ |null
|23.1| 1|null| |null|hello|null| 1.2| +----+-----+----+----+ root |-- a: string
(nullable =true) |-- b: string (nullable = true) |-- c: long (nullable = true)
|-- d: double (nullable =true)
若指定schema会按照schema生成DF:
* schema中不存在的列会被忽略
* 可以用两种方法指定schema,StructType和String,具体对应关系看后面
*
若数据无法匹配schema中类型:若schema中列允许为null会转为null;若不允许为null会转为相应类型的空值(如Double类型为0.0值),若无法转换为值会抛出异常
val schema = StructType(List( StructField("a", ByteType, true), StructField("b"
, FloatType,false), StructField("c", ShortType, true) )) //或 val schema = "b
float, c short" val df = spark.read.schema(schema).json(ds) df.show
df.printSchema +----+----+----+ | a| b| c| +----+----+----+ |null|23.1| 1| |null
|0|null| +----+----+----+ root |-- a: byte (nullable = true) |-- b: float
(nullable =true) |-- c: short (nullable = true)
json解析相关配置参数
primitivesAsString (default false): 把所有列看作string类型
prefersDecimal(default false): 将小数看作decimal,如果不匹配decimal,就看做doubles.
allowComments (default false): 忽略json字符串中Java/C++风格的注释
allowUnquotedFieldNames (default false): 允许不加引号的列名
allowSingleQuotes (default true): 除双引号外,还允许用单引号
allowNumericLeadingZeros (default false): 允许数字中额外的前导0(如0012)
allowBackslashEscapingAnyCharacter (default false): 允许反斜杠机制接受所有字符
allowUnquotedControlChars (default false):
允许JSON字符串包含未加引号的控制字符(值小于32的ASCII字符,包括制表符和换行字符)。
mode (default PERMISSIVE): 允许在解析期间处理损坏记录的模式。
PERMISSIVE
:当遇到损坏的记录时,将其他字段设置为null,并将格式错误的字符串放入由columnNameOfCorruptRecord配置的字段中。若指定schema,在schema中设置名为columnNameOfCorruptRecord的字符串类型字段。
如果schema中不具有该字段,则会在分析过程中删除损坏的记录。若不指定schema(推断模式),它会在输出模式中隐式添加一个columnNameOfCorruptRecord字段。
DROPMALFORMED : 忽略整条损害记录
FAILFAST : 遇到损坏记录throws an exception
columnNameOfCorruptRecord
(默认值为spark.sql.columnNameOfCorruptRecord的值):允许PERMISSIVE
mode添加的新字段,会重写spark.sql.columnNameOfCorruptRecord
dateFormat (default yyyy-MM-dd): 自定义日期格式,遵循java.text.SimpleDateFormat格式.
只有日期部分(无详细时间)
timestampFormat (default yyyy-MM-dd’T’HH:mm:ss.SSSXXX):
自定义日期格式,遵循java.text.SimpleDateFormat格式. 可以有详细时间部分(到微秒)
multiLine (default false): 解析一个记录,该记录可能跨越多行,每个文件
以上参数可用option方法配置:
val stringDF = spark.read.option("primitivesAsString", "true").json(ds)
stringDF.show stringDF.printSchema +----+-----+----+----+ | a| b| c| d|
+----+-----+----+----+ |null| 23.1| 1|null| |null|hello|null| 1.2|
+----+-----+----+----+ root |-- a: string (nullable =true) |-- b: string
(nullable =true) |-- c: string (nullable = true) |-- d: string (nullable = true)
二进制类型会自动用base64编码方式表示
‘Man’(ascci) base64编码后为:”TWFu”
val byteArr = Array('M'.toByte, 'a'.toByte, 'n'.toByte) val binaryDs =
spark.createDataset(Seq(byteArr))val dsWithB64 = binaryDs.withColumn("b64",
base64(col("value"))) dsWithB64.show(false) dsWithB64.printSchema
+----------+----+ |value |b64 | +----------+----+ |[4D 61 6E]|TWFu|
+----------+----+ root |-- value: binary (nullable =true) |-- b64: string
(nullable =true) //=================================================
dsWithB64.toJSON.show(false) +-----------------------------+ |value |
+-----------------------------+ |"value":"TWFu","b64":"TWFu"|
+-----------------------------+
//================================================= val json =
""""value":"TWFu"""" val jsonDs = spark.createDataset(Seq(json)) val binaryDF
= spark.read.schema("value binary").json(jsonDs ) binaryDF.show
binaryDF.printSchema +----------+ | value| +----------+ |[4D 61 6E]|
+----------+ root |-- value: binary (nullable =true)
指定schema示例:
以下是Spark SQL支持的所有基本类型:
val json = """"stringc":"abc", "shortc":1, "integerc":null, "longc":3,
"floatc":4.5, "doublec":6.7, "decimalc":8.90, "booleanc":true, "bytec":23,
"binaryc":"TWFu", "datec":"2010-01-01", "timestampc":"2012-12-12
11:22:22.123123"""" val ds = spark.createDataset(Seq(json)) val schema =
"stringc string, shortc short, integerc int, longc long, floatc float, doublec
double, decimalc decimal(10, 3), booleanc boolean, bytec byte, binaryc binary,
datec date, timestampc timestamp" val df = spark.read.schema(schema).json(ds)
df.show(false) df.printSchema
+-------+------+--------+-----+------+-------+--------+--------+-----+----------+----------+-----------------------+
|stringc|shortc|integerc|longc|floatc|doublec|decimalc|booleanc|bytec|binaryc
|datec |timestampc |
+-------+------+--------+-----+------+-------+--------+--------+-----+----------+----------+-----------------------+
|abc |1 |null |3 |4.5 |6.7 |8.900 |true |23 |[4D 61 6E]|2010-01-01|2012-12-12 11
:22:22.123|
+-------+------+--------+-----+------+-------+--------+--------+-----+----------+----------+-----------------------+
root |-- stringc: string (nullable =true) |-- shortc: short (nullable = true)
|-- integerc: integer (nullable =true) |-- longc: long (nullable = true) |--
floatc: float (nullable =true) |-- doublec: double (nullable = true) |--
decimalc: decimal(10,3) (nullable = true) |-- booleanc: boolean (nullable = true
) |-- bytec: byte (nullable =true) |-- binaryc: binary (nullable = true) |--
datec: date (nullable =true) |-- timestampc: timestamp (nullable = true)
复合类型:
val json = """ "arrayc" : [ 1, 2, 3 ], "structc" : "strc" : "efg",
"decimalc" : 1.1 , "mapc" : "key1" : 1.2, "key2" : 1.1 """ val ds =
spark.createDataset(Seq(json))val schema = "arrayc array, structc
struct, mapc map" val df =
spark.read.schema(schema).json(ds) df.show(false) df.printSchema
+---------+--------+--------------------------+ |arrayc |structc |mapc |
+---------+--------+--------------------------+ |[1, 2, 3]|[efg, 1]|[key1 -> 1.2
, key2 ->1.1]| +---------+--------+--------------------------+ root |-- arrayc:
array (nullable =true) | |-- element: short (containsNull = true) |-- structc:
struct (nullable =true) | |-- strc: string (nullable = true) | |-- decimalc:
decimal(10,0) (nullable = true) |-- mapc: map (nullable = true) | |-- key:
string | |-- value: float (valueContainsNull =true)
SparkSQL数据类型
基本类型:
DataType simpleString typeName sql defaultSize catalogString json
StringType string string STRING 20 string “string”
ShortType smallint short SMALLINT 2 smallint “short”
IntegerType int integer INT 4 int “integer”
LongType bigint long BIGINT 8 bigint “long”
FloatType float float FLOAT 4 float “float”
DoubleType double double DOUBLE 8 double “double”
DecimalType(10,3) decimal(10,3) decimal(10,3) DECIMAL(10,3) 8 decimal(10,3)
“decimal(10,3)”
BooleanType boolean boolean BOOLEAN 1 boolean “boolean”
ByteType tinyint byte TINYINT 1 tinyint “byte”
BinaryType binary binary BINARY 100 binary “binary”
DateType date date DATE 4 date “date”
TimestampType timestamp timestamp TIMESTAMP 8 timestamp “timestamp”
三个复合类型:
DataType simpleString typeName sql defaultSize catalogString json
ArrayType(IntegerType, true) array array ARRAY 4 array
“type”:”array”,”elementType”:”integer”,”containsNull”:true
MapType(StringType, LongType, true) map map MAP
28 map
“type”:”map”,”keyType”:”string”,”valueType”:”long”,”valueContainsNull”:true
StructType(StructField(“sf”, DoubleType)::Nil) struct struct
STRUCT 8 struct
“type”:”struct”,”fields”:[“name”:”sf”,”type”:”double”,”nullable”:true,”metadata”:]
参考技术C 本文介绍基于Spark(2.0+)的Json字符串和DataFrame相互转换。
json字符串转DataFrame
spark提供了将json字符串解析为DF的接口,如果不指定生成的DF的schema,默认spark会先扫码一遍给的json字符串,然后推断生成DF的schema:
* 若列数据全为null会用String类型
* 整数默认会用Long类型
* 浮点数默认会用Double类型 val json1 = """"a":null, "b": 23.1, "c": 1""" val json2 =
""""a":null, "b": "hello", "d": 1.2""" val ds =
spark.createDataset(Seq(json1, json2))val df = spark.read.json(ds) df.show
df.printSchema +----+-----+----+----+ | a| b| c| d| +----+-----+----+----+ |null
|23.1| 1|null| |null|hello|null| 1.2| +----+-----+----+----+ root |-- a: string
(nullable =true) |-- b: string (nullable = true) |-- c: long (nullable = true)
|-- d: double (nullable =true)
若指定schema会按照schema生成DF:
* schema中不存在的列会被忽略
* 可以用两种方法指定schema,StructType和String,具体对应关系看后面
*
若数据无法匹配schema中类型:若schema中列允许为null会转为null;若不允许为null会转为相应类型的空值(如Double类型为0.0值),若无法转换为值会抛出异常
val schema = StructType(List( StructField("a", ByteType, true), StructField("b"
, FloatType,false), StructField("c", ShortType, true) )) //或 val schema = "b
float, c short" val df = spark.read.schema(schema).json(ds) df.show
df.printSchema +----+----+----+ | a| b| c| +----+----+----+ |null|23.1| 1| |null
|0|null| +----+----+----+ root |-- a: byte (nullable = true) |-- b: float
(nullable =true) |-- c: short (nullable = true)
json解析相关配置参数
primitivesAsString (default false): 把所有列看作string类型
prefersDecimal(default false): 将小数看作decimal,如果不匹配decimal,就看做doubles.
allowComments (default false): 忽略json字符串中Java/C++风格的注释
allowUnquotedFieldNames (default false): 允许不加引号的列名
allowSingleQuotes (default true): 除双引号外,还允许用单引号
allowNumericLeadingZeros (default false): 允许数字中额外的前导0(如0012)
allowBackslashEscapingAnyCharacter (default false): 允许反斜杠机制接受所有字符
allowUnquotedControlChars (default false):
允许JSON字符串包含未加引号的控制字符(值小于32的ASCII字符,包括制表符和换行字符)。
mode (default PERMISSIVE): 允许在解析期间处理损坏记录的模式。
PERMISSIVE
:当遇到损坏的记录时,将其他字段设置为null,并将格式错误的字符串放入由columnNameOfCorruptRecord配置的字段中。若指定schema,在schema中设置名为columnNameOfCorruptRecord的字符串类型字段。
如果schema中不具有该字段,则会在分析过程中删除损坏的记录。若不指定schema(推断模式),它会在输出模式中隐式添加一个columnNameOfCorruptRecord字段。
DROPMALFORMED : 忽略整条损害记录
FAILFAST : 遇到损坏记录throws an exception
columnNameOfCorruptRecord
(默认值为spark.sql.columnNameOfCorruptRecord的值):允许PERMISSIVE
mode添加的新字段,会重写spark.sql.columnNameOfCorruptRecord
dateFormat (default yyyy-MM-dd): 自定义日期格式,遵循java.text.SimpleDateFormat格式.
只有日期部分(无详细时间)
timestampFormat (default yyyy-MM-dd’T’HH:mm:ss.SSSXXX):
自定义日期格式,遵循java.text.SimpleDateFormat格式. 可以有详细时间部分(到微秒)
multiLine (default false): 解析一个记录,该记录可能跨越多行,每个文件
以上参数可用option方法配置:
val stringDF = spark.read.option("primitivesAsString", "true").json(ds)
stringDF.show stringDF.printSchema +----+-----+----+----+ | a| b| c| d|
+----+-----+----+----+ |null| 23.1| 1|null| |null|hello|null| 1.2|
+----+-----+----+----+ root |-- a: string (nullable =true) |-- b: string
(nullable =true) |-- c: string (nullable = true) |-- d: string (nullable = true)
二进制类型会自动用base64编码方式表示
‘Man’(ascci) base64编码后为:”TWFu”
val byteArr = Array('M'.toByte, 'a'.toByte, 'n'.toByte) val binaryDs =
spark.createDataset(Seq(byteArr))val dsWithB64 = binaryDs.withColumn("b64",
base64(col("value"))) dsWithB64.show(false) dsWithB64.printSchema
+----------+----+ |value |b64 | +----------+----+ |[4D 61 6E]|TWFu|
+----------+----+ root |-- value: binary (nullable =true) |-- b64: string
(nullable =true) //=================================================
dsWithB64.toJSON.show(false) +-----------------------------+ |value |
+-----------------------------+ |"value":"TWFu","b64":"TWFu"|
+-----------------------------+
//================================================= val json =
""""value":"TWFu"""" val jsonDs = spark.createDataset(Seq(json)) val binaryDF
= spark.read.schema("value binary").json(jsonDs ) binaryDF.show
binaryDF.printSchema +----------+ | value| +----------+ |[4D 61 6E]|
+----------+ root |-- value: binary (nullable =true)
指定schema示例:
以下是Spark SQL支持的所有基本类型:
val json = """"stringc":"abc", "shortc":1, "integerc":null, "longc":3,
"floatc":4.5, "doublec":6.7, "decimalc":8.90, "booleanc":true, "bytec":23,
"binaryc":"TWFu", "datec":"2010-01-01", "timestampc":"2012-12-12
11:22:22.123123"""" val ds = spark.createDataset(Seq(json)) val schema =
"stringc string, shortc short, integerc int, longc long, floatc float, doublec
double, decimalc decimal(10, 3), booleanc boolean, bytec byte, binaryc binary,
datec date, timestampc timestamp" val df = spark.read.schema(schema).json(ds)
df.show(false) df.printSchema
+-------+------+--------+-----+------+-------+--------+--------+-----+----------+----------+-----------------------+
|stringc|shortc|integerc|longc|floatc|doublec|decimalc|booleanc|bytec|binaryc
|datec |timestampc |
+-------+------+--------+-----+------+-------+--------+--------+-----+----------+----------+-----------------------+
|abc |1 |null |3 |4.5 |6.7 |8.900 |true |23 |[4D 61 6E]|2010-01-01|2012-12-12 11
:22:22.123|
+-------+------+--------+-----+------+-------+--------+--------+-----+----------+----------+-----------------------+
root |-- stringc: string (nullable =true) |-- shortc: short (nullable = true)
|-- integerc: integer (nullable =true) |-- longc: long (nullable = true) |--
floatc: float (nullable =true) |-- doublec: double (nullable = true) |--
decimalc: decimal(10,3) (nullable = true) |-- booleanc: boolean (nullable = true
) |-- bytec: byte (nullable =true) |-- binaryc: binary (nullable = true) |--
datec: date (nullable =true) |-- timestampc: timestamp (nullable = true)
复合类型:
val json = """ "arrayc" : [ 1, 2, 3 ], "structc" : "strc" : "efg",
"decimalc" : 1.1 , "mapc" : "key1" : 1.2, "key2" : 1.1 """ val ds =
spark.createDataset(Seq(json))val schema = "arrayc array, structc
struct, mapc map" val df =
spark.read.schema(schema).json(ds) df.show(false) df.printSchema
+---------+--------+--------------------------+ |arrayc |structc |mapc |
+---------+--------+--------------------------+ |[1, 2, 3]|[efg, 1]|[key1 -> 1.2
, key2 ->1.1]| +---------+--------+--------------------------+ root |-- arrayc:
array (nullable =true) | |-- element: short (containsNull = true) |-- structc:
struct (nullable =true) | |-- strc: string (nullable = true) | |-- decimalc:
decimal(10,0) (nullable = true) |-- mapc: map (nullable = true) | |-- key:
string | |-- value: float (valueContainsNull =true)
SparkSQL数据类型
基本类型:
DataType simpleString typeName sql defaultSize catalogString json
StringType string string STRING 20 string “string”
ShortType smallint short SMALLINT 2 smallint “short”
IntegerType int integer INT 4 int “integer”
LongType bigint long BIGINT 8 bigint “long”
FloatType float float FLOAT 4 float “float”
DoubleType double double DOUBLE 8 double “double”
DecimalType(10,3) decimal(10,3) decimal(10,3) DECIMAL(10,3) 8 decimal(10,3)
“decimal(10,3)”
BooleanType boolean boolean BOOLEAN 1 boolean “boolean”
ByteType tinyint byte TINYINT 1 tinyint “byte”
BinaryType binary binary BINARY 100 binary “binary”
DateType date date DATE 4 date “date”
TimestampType timestamp timestamp TIMESTAMP 8 timestamp “timestamp”
三个复合类型:
DataType simpleString typeName sql defaultSize catalogString json
ArrayType(IntegerType, true) array array ARRAY 4 array
“type”:”array”,”elementType”:”integer”,”containsNull”:true
MapType(StringType, LongType, true) map map MAP
28 map
“type”:”map”,”keyType”:”string”,”valueType”:”long”,”valueContainsNull”:true
StructType(StructField(“sf”, DoubleType)::Nil) struct struct
STRUCT 8 struct
“type”:”struct”,”fields”:[“name”:”sf”,”type”:”double”,”nullable”:true,”metadata”:
参考技术D IO分类:
按照数据流向分类:
输入流

输出流

按照处理的单位划分:
字节流:字节流读取的都是文件中的二进制数据,读取到的二进制数据不会经过任何处理

字符流:字符流读取的数据都是以字符为单位的,字符流也是读取的文件的二进制数据,只不过会把这些二进制数据转换成我们能识别的字符
字符流 = 字节流 + 解码

输出字节流::
------------------|OutputStream 所有输出字节流的基类 抽象类
-----------|FileOutputStream 向文件输出数据的输出字节流 throws FileNotFoundException

FileOutputStream的使用步骤:

1.找到目标文件
2.建立数据通道
3.把数据转换成字节数组写出
4.关闭资源

FileOutputStream的一些方法:

close() 关闭此文件输出流并释放与此流有关的所有系统资源。
write(int b) 将指定字节写入此文件输出流。
write(byte[] b)   将 b.length 个字节从指定 byte 数组写入此文件输出流中。
write(byte[] b, int off, int len) 将指定 byte 数组中从偏移量 off 开始的 len 个字节写入此文件输出流。

   注意:
      1.write(byte b[])方法实际上是调用了 write(byte b[], int off, int len)方法

FileOutputStream要注意的细节:

1.使用FileOutputStream的时候,如果目标文件不存在,那么会创建目标文件对象,然后把数据写入
2.使用FileOutputStream的时候,如果目标文件已经存在,那么会清空目标文件的数据后再写入数据,如果需要再原数据的上追加数据,需要使用FileOutputStream(file,true)构造函数
3.使用FileOutputStream的write方法写数据的时候,虽然接受的是一个int类型的数据,但真正写出的只是一个字节的数据,只是把低八位的二进制数据写出,其他二十四位数据全部丢弃

复制代码
public class Demo2
public static void main(String[] args)

//1.找到目标文件
File file = new File("D:\\新建文件夹 (2)\\a.txt");
System.out.println("文件的源文数据是:"+readFile(file));
writeFile(file,"Hello World!");
System.out.println("目前文件数据是:"+readFile(file));

//输出数据
public static void writeFile(File file,String data)
FileOutputStream fileOutputStream = null;
try
//2.建立通道(我的源文件数据:abc 追加数据)
fileOutputStream = new FileOutputStream(file,true);
//3.把要写入的数据转换成字符数组
fileOutputStream.write(data.getBytes());

catch(IOException e)

throw new RuntimeException(e);
finally
//关闭资源
try
fileOutputStream.close();
catch (IOException e)
throw new RuntimeException(e);





//输入数据
public static String readFile(File file)
FileInputStream fileInputStream = null;
String str = "";
try
//建立数据通道
fileInputStream = new FileInputStream(file);
//创建缓冲字节数组
byte[] buf = new byte[1024];
int length = 0;
while((length = fileInputStream.read(buf))!=-1)
//把字节数组转换成字符串返回
str+=new String(buf,0,length);

catch (IOException e)
throw new RuntimeException();
finally
try
fileInputStream.close();
catch (IOException e)
throw new RuntimeException(e);


return str;

Netty-解码器架构与常用解码器

任何数据类型想在网络中进行传输,都得经过编解码转换成字节流

在netty中,服务端和客户端进行通信的其实是下面这样的

程序 ---编码--> 网络

网路 ---解码--> 程序

对应服务端:

  • 入站数据, 经过解码器解码后给后续的handler使用
  • 出站数据, 结果编码器编码成字节流给在网络上传播

在netty中的编码器其实就是一个handler,回想一下,无论是编写服务端的代码,还是客户端的代码,总会通过一个channelIniteializer往pipeline中动态的添加多个处理器,在添加我们自定义的处理器之前,往往会添加编解码器,其实说白了,编解码器其实就是特定功能的handler

我们这样做是有目的的,因为第一步就得需要把字节流转换成我们后续的handler中能处理的常见的数据类型

Netty中的编解码器太多了,下面就用常用的ByteToMessageDecoder介绍他的体系

编码器的模板基类ByteToMessageDecoder

ByteToMessageDecoder继承了ChannelInboundHandlerAdapter 说明它是处理出站方向数据的编码器,而且它也因此是一个不折不扣的Handler,在回想,其实In开头的handler都是基于事件驱动的,被动的处理器,当客户端发生某种事件时,它对应有不同的动作回调,而且它的特色就是 fireXXX往下传递事件, 带回我们就能看到,netty用它把处理好的数据往下传递

架构概述

ByteToMessageDecoder本身是一个抽象类,但是它只有一个抽象方法decode()

netty中的解码器的工作流程如下:

  • 累加字节流
  • 调用子类的decode()方法进行解码
  • 将解析完成的ByteBuf往后传递

既然是入栈处理器,有了新的数据,channelRead()就会被回调,我们去看一下它的channelRead()

下面是它的源码,

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception 
if (msg instanceof ByteBuf)  // todo 在这里判断, 是否是 ByteBuf类型的,如果是,进行解码,不是的话,简单的往下传播下去
    CodecOutputList out = CodecOutputList.newInstance();
    try 
        ByteBuf data = (ByteBuf) msg;
        // todo 进入查看 cumulation是类型  累加器,其实就是往 ByteBuf中 write数据,并且,当ByteBuf 内存不够时进行扩容
        first = cumulation == null; // todo 如果为空, 则说明这是第一次进来的数据, 从没累加过
        if (first) 
            cumulation = data;  // todo 如果是第一次进来,直接用打他将累加器初始化
         else 
            cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); // todo 非第一次进来,就进行累加
        
        // todo , 这是第二部, 调用子类的decode()进行解析
        callDecode(ctx, cumulation, out);
     catch (DecoderException e) 
        throw e;
     catch (Throwable t) 
        throw new DecoderException(t);
     finally 
        if (cumulation != null && !cumulation.isReadable()) 
            numReads = 0;
            cumulation.release();
            cumulation = null;
         else if (++ numReads >= discardAfterReads) 
            // We did enough reads already try to discard some bytes so we not risk to see a OOME.
            // See https://github.com/netty/netty/issues/4275
            numReads = 0;
            discardSomeReadBytes();
        

        int size = out.size();
        decodeWasNull = !out.insertSinceRecycled();
        // todo 调用 fireChannelRead,向后船舶channelRead事件, 前面的学习也知道,  她会从当前节点,挨个回调pipeline中处理器的CHannelRead方法
        fireChannelRead(ctx, out, size);
        out.recycle();
    
 else 
    ctx.fireChannelRead(msg);

其实三步工作流程就在上面的代码中

  • 累加字节流 cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
  • 调用子类的decode()进行解析 callDecode(ctx, cumulation, out);
  • 将解析完成的ByteBuf往后传递fireChannelRead(ctx, out, size);

它的设计很清晰, 由ByteToMessageDecoder完成整个编码器的模板,规定好具体的处理流程,首先它负责字节流的累加工作,但是具体如何进行解码,由不同的子类去实现,因此它设及成了唯一的抽象方法,在他的模板中,子类将数据解码完成后,它再将数据传播下去

什么是累加器cumulation?

源码如下:我们可以看到,其实他就是一个辅助对象, 里面维护了一个 ByteBuf的引用

  • 所谓累加,就是往ByteBuf中write数据
  • 所谓维护,就是 动态判断ByteBuf中可写入的区域大小和将写入的字节的关系
  • 最后,为了防止内存泄露,将收到的ByteBuf 释放
// todo 创建一个累加器
public static final Cumulator MERGE_CUMULATOR = new Cumulator() 
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) 
final ByteBuf buffer;
// todo 如果 writerIndex + readableBytes > cumulation.maxCapacity 说明已经无法继续累加了
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
        || cumulation.refCnt() > 1 || cumulation.isReadOnly()) 
    // todo 扩容
    buffer = expandCumulation(alloc, cumulation, in.readableBytes());
 else 
    buffer = cumulation;

// todo 往 ByteBuf中写入数据 完成累加
buffer.writeBytes(in);
// todo 累加完成之后,原数据 释放掉
in.release();
return buffer;

;

第二步,callDecode(ctx, cumulation, out)

我们直接跟进源码: 可以看到,在把ByteBuf真正通过下面的decodeRemovalReentryProtection(ctx, in, out);的子类进行解码时, 它记录下来了当时ByteBuf中可读的字节数, 它用这个标记和经过子类处理之后的ByteBuf的可读的字节数进行比对,从而判断出子类是否真的读取成功

protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) 
try 
while (in.isReadable()) 
    int outSize = out.size();

    if (outSize > 0) // todo 如果盛放解析完成后的数据的 out集合中有数据
        fireChannelRead(ctx, out, outSize); /// todo 传播channelRead事件,数据也传递进去
        out.clear();  // todo 清空out 集合

        if (ctx.isRemoved()) 
            break;
        
        outSize = 0;
    

    // todo 记录 子类使用in之前, in中的可读的字节
    int oldInputLength = in.readableBytes();

    //todo 调用子类重写的 decode()
    decodeRemovalReentryProtection(ctx, in, out);
    if (ctx.isRemoved()) 
        break;
    

    if (outSize == out.size())  // todo 0 = 经过上面的decode解析后的 out.size()==0 , 说明没解析出任何东西
        if (oldInputLength == in.readableBytes())  // todo 第一种情况就是 可能字节数据不够, 根本没从in中读
            break;
         else 
            continue;  // todo 情况2: 从in中读了, 但是没来得及继续出 内容
        
    
    // todo 来到这里就说明,已经解析出数据了 ,
    // todo  解析出数据了  就意味着in中的readIndex被子类改动了, 即 oldInputLength != in.readableBytes()
    // todo 如下现在还相等, 肯定是出问题了
    if (oldInputLength == in.readableBytes()) 
        throw new DecoderException(
                StringUtil.simpleClassName(getClass()) +
                        ".decode() did not read anything but decoded a message.");
    
    if (isSingleDecode()) 
        break;
    

 catch (DecoderException e) 
throw e;
 catch (Throwable cause) 
throw new DecoderException(cause);

如何实现自己的解码器?

实现自己的解码器, 记得了解这三个参数分别是什么

  • ctx: 当前的hander所在的 Context
  • cumulation: 累加器,其实就是ByteBuf
  • out: 她其实是个容器, 用来盛放 经过编码之后的数据,也就是可以被后续的处理器使用 类型

实现的思路就是继承ByteToMessageDecoder然后重写它唯一的抽象方法,decode(), 实现的逻辑如下:

protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception 
    System.out.println("MyDeCoderHandler invoke...");
    System.out.println(in.readableBytes());
    if (in.readableBytes()>=8)
        out.add(in.readLong());
    

常用的编解码器

固定长度的解码器FixedLengthFrameDecoder

他里面只维护着一个private final int frameLength;
使用时,我们通过构造函数传递给他,他就会按照下面的方式解码

我们看一下它的javaDoc

 原始数据
 * +---+----+------+----+
 * | A | BC | DEFG | HI |
 * +---+----+------+----+
 
 如果frameLength==3
 * +-----+-----+-----+
 * | ABC | DEF | GHI |
 * +-----+-----+-----+

它的decode() 实现如下

protected Object decode(
    @SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception 
if (in.readableBytes() < frameLength) 
    return null;
 else 
// 从in中截取 frameLength 长度的 字节流
    return in.readRetainedSlice(frameLength);

行解码器LineBasedFrameDecoder

她会根据换行符进行解码, 无论用户发送过来的数据是以 \r\n 还是 \n 类型的换行符LineBasedFrameDecoder

使用:


   public LineBasedFrameDecoder(final int maxLength) 
        this(maxLength, true, false);
    
    
  public LineBasedFrameDecoder(final int maxLength, final boolean stripDelimiter, final boolean failFast) 
        this.maxLength = maxLength;
        this.failFast = failFast;
        this.stripDelimiter = stripDelimiter;
    

第一个构造函数

  • 入参位置是我们指定的每一行最大的字节数, 超过了这个大小的所有行,将全部被丢弃
  • 默认跳过分隔符
  • 出现了超过最大值的行,不报异常

第二个构造函数

  • 入参1 是我们指定的每一行最大的字节数, 超过了这个大小的所有行,将全部被丢弃
  • 入参2 指定每次解析是否跳过换行符
  • 入参3 指定出现大于规定的最大字节数时是否报异常

看它重写的decode()的实现逻辑如下:

它总起来分成四种情况

  • 非丢弃模式
    • 找到了换行符
      • 如 readIndex + 换行符的位置 < maxLength 的关系 --> 解码
      • 如 readIndex + 换行符的位置 > maxLength的关系 --> 丢弃
    • 未找到换行符
      • 如果可解析的长度 > maxLength --> 丢弃
  • 丢弃模式
    • 找到了换行符
      • 丢弃
    • 未找到换行符
      • 丢弃

基于分隔符的解码器DelimiterBasedFrameDecoder

它主要有这几个成员变量, 根据这几个成员变量,可以选出使用它哪个构造函数

private final ByteBuf[] delimiters;  分隔符,数组
private final int maxFrameLength;    每次能允许的最大解码长度
private final boolean stripDelimiter;  是否跳过分隔符
private final boolean failFast;      超过最大解码长度时,是否抛出异常
private boolean discardingTooLongFrame;  是否丢弃超过最大限度的帧
private int tooLongFrameLength;      记录超过最大范围的字节数值

分三步

  • 第一, 判断我们传递进入的分隔符是否是\n \r\n 如果是的话,就是用上面的, 行解码器
  • 第二步, 按照最细的力度进行解码, 比如, 我们有两个解码器, AB, 当前的readIndex 到A, 有2个字节, 到B有3个字节, 就会按照A进行解码
  • 解码

基于长度域的解码器LengthFieldBasedFrameDecoder

通常我们在对特定的网络协议进行解码时会用到它,比如说,最典型的http协议, 虽然http协议看起来, 又有请求头,又有请求体,挺麻烦的,它在网络中依然是以字节流的方式进行传输

基于长度域,指的是在传输的协议中有一个 length字段,这个十六进制的字段记录的可能是整个协议的长度,也可能是消息体的长度, 我们根据具体情况使用不同的构造函数

如何使用呢? 最常用它下面的这个构造函数

public LengthFieldBasedFrameDecoder(
    int maxFrameLength,
    int lengthFieldOffset,
    int lengthFieldLength,
    int lengthAdjustment,
    int initialBytesToStrip) 
this(
        maxFrameLength,
        lengthFieldOffset, lengthFieldLength, lengthAdjustment,
        initialBytesToStrip, true);

使用它的前提是,知道这五个参数的意思

  • maxFrameLength 每次解码所能接受的最大帧的长度
  • lengthFieldOffset 长度域的偏移量

    听着挺高大尚的, 偏移量, 说白了,就是在现有的这段字节数据中找个开始解码的位置, 大多数设为0, 意为,从o位置 开始解码

  • lengthFieldLength 字段域的长度, 根据lengthFieldOffset的初始值往后数lengthFieldLength个字节,这段范围解析出来的数值 可能是 长度域的大小,也可能是整个协议的大小(包括header,body...) 根据不同的协议不同
  • lengthAdjustment 矫正长度
  • initialBytesToStrip 需要取出的长度

下面是javaDoc给的例子

基于长度的拆包

 * BEFORE DECODE (14 bytes)         AFTER DECODE (12 bytes)
 * +--------+----------------+      +--------+----------------+
 * | Length | Actual Content |----->| Length | Actual Content |
 * | 0x000C | "HELLO, WORLD" |      | 0x000C | "HELLO, WORLD" |
 * +--------+----------------+      +--------+----------------+
 这是最简单的情况, 假定 Length的长度就是后面的 真正需要解码的内容
 
 现在的字节全部解码后是这样的  12HELLO, WORLD
 我们要做的就是区分出  12和HELLO, WORLD
 
 * lengthFieldOffset   = 0
 * lengthFieldLength   = 2 // todo 每两个字节 表示一个数据包
 * lengthAdjustment    = 0
 * initialBytesToStrip = 0

 意思就是:
  字节数组[lengthFieldOffset,lengthFieldLength]之间的内容转换成十进制,就是后面的字段域的长度
    00 0C ==> 12 
  这个12 意思就是 长度域的长度, 说白了 就是我们想要的 HELLO, WORLD 的长度
  
  这样一算,就分开了
基于长度的阶段拆包
 
  * BEFORE DECODE (14 bytes)         AFTER DECODE (12 bytes)
 * +--------+----------------+      +----------------+
 * | Length | Actual Content |----->| Actual Content |
 * | 0x000C | "HELLO, WORLD" |      | "HELLO, WORLD" |
 * +--------+----------------+      +----------------+
 情况2: 
 
 * lengthFieldOffset   = 0
 * lengthFieldLength   = 2 // todo 每两个字节 表示一个数据包
 * lengthAdjustment    = 0
 * initialBytesToStrip = 2
 
  意思就是
   字节数组[lengthFieldOffset,lengthFieldLength]之间的内容转换成十进制,就是后面的字段域的长度是
   00 0C ==> 12 
   这个12 意思就是 长度域的长度, 说白了 就是我们想要的 HELLO, WORLD 的长度
  
  然后,  从0开始 忽略 initialBytesToStrip, 就去除了 length ,只留下 HELLO, WORLD
  
  
  

 有时, 在某些其他协议中, length field 可能代表是整个消息的长度, 包括消息头
       在这种情况下,我们就得指定一个 非零的 lengthAdjustment 去调整
  
  
   * BEFORE DECODE (14 bytes)         AFTER DECODE (14 bytes)
 * +--------+----------------+      +--------+----------------+
 * | Length | Actual Content |----->| Length | Actual Content |
 * | 0x000E | "HELLO, WORLD" |      | 0x000E | "HELLO, WORLD" |
 * +--------+----------------+      +--------+----------------+
  
 * lengthFieldOffset   = 0
 * lengthFieldLength   = 2 // todo 每两个字节 表示一个数据包
 * lengthAdjustment    = -2
 * initialBytesToStrip = 0

    意思就是
    
    字节数组[lengthFieldOffset,lengthFieldLength]之间的内容转换成十进制,表示整个协议的长度
    00 0C ==> 14  意味,协议全长 14
    现在还是不能区分开  Length 和 Actual Content
  
    公式: 数据包的长度 = 长度域 + lengthFieldOffset + lengthFieldLength +lengthAdjustment
    
    通过他可以算出 lengthAdjustment = -2
基于偏移长度的拆包
    
  
 * BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)
 * +----------+----------+----------------+      +----------+----------+----------------+
 * | Header 1 |  Length  | Actual Content |----->| Header 1 |  Length  | Actual Content |
 * |  0xCAFE  | 0x00000C | "HELLO, WORLD" |      |  0xCAFE  | 0x00000C | "HELLO, WORLD" |
 * +----------+----------+----------------+      +----------+----------+----------------+
  这个例子和第一个例子很像,但是多了头
  
  我们想拿到后面消息长度的信息,就偏移过header
  
 * lengthFieldOffset   = 2
 * lengthFieldLength   = 3 // todo 每两个字节 表示一个数据包
 * lengthAdjustment    = 0
 * initialBytesToStrip = 0
  
  
  字节数组[lengthFieldOffset,lengthFieldLength]之间的内容转换成十进制, 表示长度域的长度
               
  在这里 整好跳过了 header 1,   0x00 00 0C 是三个字节
  也就是  字节数组[lengthFieldOffset,lengthFieldLength]=>[0,3]
  0x00 00 0C == 12 表示长度域是 12
  
  现在也成功区分开了 Header 1 和  Length 和 Actual Content
  分别是 2 3 12
  
基于可调整长度的拆包
  
  
  BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)
 * +----------+----------+----------------+      +----------+----------+----------------+
 * |  Length  | Header 1 | Actual Content |----->|  Length  | Header 1 | Actual Content |
 * | 0x00000C |  0xCAFE  | "HELLO, WORLD" |      | 0x00000C |  0xCAFE  | "HELLO, WORLD" |
 * +----------+----------+----------------+      +----------+----------+----------------+
  
 * lengthFieldOffset   = 0
 * lengthFieldLength   = 3 // todo 每两个字节 表示一个数据包
 * lengthAdjustment    = 2
 * initialBytesToStrip = 0
  
  
  字节数组[lengthFieldOffset,lengthFieldLength]之间的内容转换成十进制, 表示长度域的长度

  也就是  字节数组[lengthFieldOffset,lengthFieldLength]=>[0,3]
  0x00 00 0C 是三个字节  
  0x00 00 0C == 12 表示长度域是 12 == 长度域的长度 就是 HELLO, WORLD的长度
  但是上面的图多了一个 两个字节长度的 Header 1
  下一步进行调整 
  
  公式: 数据包的长度 = 长度域 + lengthFieldOffset + lengthFieldLength +lengthAdjustment
  
  lengthAdjustment= 17-12-0-3=2
基于偏移可调整长度的截断拆包

 * BEFORE DECODE (16 bytes)                       AFTER DECODE (13 bytes)
 * +------+--------+------+----------------+      +------+----------------+
 * | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
 * | 0xCA | 0x000C | 0xFE | "HELLO, WORLD" |      | 0xFE | "HELLO, WORLD" |
 * +------+--------+------+----------------+      +------+----------------+
  
 
 * lengthFieldOffset   = 1
 * lengthFieldLength   = 2 // todo 每两个字节 表示一个数据包
 * lengthAdjustment    = 1
 * initialBytesToStrip = 3
 
 lengthFieldOffset =1 偏移1字节 跨过 HDR1
 
 lengthFieldLength =2 从[1,2] ==> 0x000C =12 表示长度域的值
 
 看拆包后的结果,后面明显还多了个 HDR2 ,进行调整
 公式:  数据包值 = 长度域  + lengthFieldOffset+ lengthFieldLength + lengthAdjustment
 算出 lengthAdjustment = 16 - 12 - 1 - 2 = 1
 
 结果值只有 HDR2 和  Actual Content , 说明,前面通过 initialBytesToStrip 进行忽略
 initialBytesToStrip =3
 
基于偏移可调整长度的 变种 截断拆包
 
  * BEFORE DECODE (16 bytes)                       AFTER DECODE (13 bytes)
 * +------+--------+------+----------------+      +------+----------------+
 * | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
 * | 0xCA | 0x0010 | 0xFE | "HELLO, WORLD" |      | 0xFE | "HELLO, WORLD" |
 * +------+--------+------+----------------+      +------+----------------+
 
 
 * lengthFieldOffset   = 1
 * lengthFieldLength   = 2 // todo 每两个字节 表示一个数据包
 * lengthAdjustment    = -3
 * initialBytesToStrip = 3
 
 同样
 看结果,保留 HDR2 和 Actual Content
 
 lengthFieldOffset   = 1 表示跳过开头的 HDR1
 [1,2] ==> 00 10 , 算出的 长度域的值==10 很显然这不对
 
 10 < 13
 
 我们要想拆出后面的数据包就得在现有的基础上往左移动三个字节 -3个调整量

以上是关于sparkdataframe转换成字节流的主要内容,如果未能解决你的问题,请参考以下文章

转换流——OutputStreamWriter类与InputStreamReader类

python3 利用struct.pack 动态组建字节流

图片与文件流互转

MapReduce序列化及分区的java代码示例

python中sys.setdefaultencoding('utf-8')的作用

python中sys.setdefaultencoding('utf-8')的作用