在 Spark sql 中按二进制类型过滤
Posted
技术标签:
【中文标题】在 Spark sql 中按二进制类型过滤【英文标题】:Filtering by Binary type in Spark sql 【发布时间】:2017-12-20 09:29:20 【问题描述】:我的架构中有一个表示 IP 地址的字段。我想使用Binary Type 来存储数据。
我想象的方式是,如果我的 ip 是:50.100.150.200
,我会将其保存为字节数组中的[50,100,150,200]
(序列确实很重要,但我们可以将其排除在此问题的讨论之外)。
我的问题是查询时如何按此列过滤? (字符串并不真正符合目的)
例如,我想运行以下查询:
SELECT * from table1 WHERE sourceip='50.100.150.200'
这里有一段代码来演示这个问题:
Bean 定义(用于创建模式):
public static class MyBean1 implements Serializable
private static final long serialVersionUID = 1L;
private int id;
private String name;
private byte[] description;
public MyBean1(int id, String name, String description)
this.id = id;
this.name = name;
this.description = description.getBytes();
public int getId()
return id;
public void setId(int id)
this.id = id;
public String getName()
return name;
public void setName(String name)
this.name = name;
public byte[] getDescription()
return description;
public void setDescription(byte[] description)
this.description = description;
演示代码(我想按描述过滤):
List<MyBean1> newDebugData = new ArrayList<MyBean1>();
newDebugData.add(new MyBean1(1, "Arnold", "10.150.15.10"));
newDebugData.add(new MyBean1(1, "Bob", "10.150.15.11"));
newDebugData.add(new MyBean1(3, "Bob", "10.150.15.12"));
newDebugData.add(new MyBean1(3, "Bob", "10.150.15.13"));
newDebugData.add(new MyBean1(1, "Alice", "10.150.15.14"));
Dataset<Row> df2 = sqlContext.createDataFrame(newDebugData, MyBean1.class);
df2.createTempView("table1");
sqlContext.sql("select * from table1 where description='10.150.15.14'").show();
我收到错误:
differing types in '(table1.`description` = CAST('10.150.15.14' AS DOUBLE))'
【问题讨论】:
你能分享一个可重现的例子吗? @moto 请看我的编辑 对于 IPv4 地址,我们使用 LongType(不是 IntegerType,因为一位用于符号)。我们有一个将where description='1.2.3.4'
转换为where description = 16909060
的包装脚本(16909060 = 0x01020304)。我们的 spark 程序包含一个将列值转换回点分十进制的 UDF:results_df = results_df.withColumn('description', ipv4AsStr_udf(col('description')))
。
【参考方案1】:
这不是你问题的 100% 答案,但我希望指针会有所帮助。
下面的问题不是关于过滤,而是从数组中选择数据。 selecting a range of elements in an array spark sql
那里看起来有很多信息,包括有关使用 Spark SQL 查询数组的 UDF 的一些指南。
希望这会有所帮助。
【讨论】:
感谢提示,实际上我在研究时看到了这个答案。请查看我对用例的编辑【参考方案2】:SPARK-21344 修复了版本 2.0.3、2.1.2、2.2.1 中的以下问题:BinaryType 比较执行有符号字节数组比较。所以二进制比较应该适用于这些版本。
JIRA 有以下 scala 测试代码:
case class TestRecord(col0: Array[Byte])
def convertToBytes(i: Long): Array[Byte] =
val bb = java.nio.ByteBuffer.allocate(8)
bb.putLong(i)
bb.array
val timestamp = 1498772083037L
val data = (timestamp to timestamp + 1000L).map(i => TestRecord(convertToBytes(i)))
val testDF = sc.parallelize(data).toDF
val filter1 = testDF.filter(col("col0") >= convertToBytes(timestamp)
&& col("col0") < convertToBytes(timestamp + 50L))
assert(filter1.count == 50)
我不知道等效的 Java 代码是什么,但这应该可以帮助您入门。
我在上面的评论中提到了我们使用 LongType 存储 IPv4 地址的问题。我们有一个包装脚本将点分十进制转换为长整数,还有一个 Spark UDF 以另一种方式进行:长整数到点分十进制。我认为 LongType 的查询速度比 BinaryType 快。
【讨论】:
以上是关于在 Spark sql 中按二进制类型过滤的主要内容,如果未能解决你的问题,请参考以下文章
大负十进制值在带有十进制类型的spark DataFrame中取整