在 Spark sql 中按二进制类型过滤

Posted

技术标签:

【中文标题】在 Spark sql 中按二进制类型过滤【英文标题】:Filtering by Binary type in Spark sql 【发布时间】:2017-12-20 09:29:20 【问题描述】:

我的架构中有一个表示 IP 地址的字段。我想使用Binary Type 来存储数据。

我想象的方式是,如果我的 ip 是:50.100.150.200,我会将其保存为字节数组中的[50,100,150,200](序列确实很重要,但我们可以将其排除在此问题的讨论之外)。

我的问题是查询时如何按此列过滤? (字符串并不真正符合目的)

例如,我想运行以下查询:

SELECT * from table1 WHERE sourceip='50.100.150.200'

这里有一段代码来演示这个问题:

Bean 定义(用于创建模式):

    public static class MyBean1 implements Serializable 
    private static final long serialVersionUID = 1L;
    private int id;
    private String name;
    private byte[] description;

    public MyBean1(int id, String name, String description) 
        this.id = id;
        this.name = name;
        this.description = description.getBytes();
    

    public int getId() 
        return id;
    
    public void setId(int id) 
        this.id = id;
    
    public String getName() 
        return name;
    
    public void setName(String name) 
        this.name = name;
    
    public byte[] getDescription() 
        return description;
    
    public void setDescription(byte[] description) 
        this.description = description;
    

演示代码(我想按描述过滤):

    List<MyBean1> newDebugData = new ArrayList<MyBean1>();
    newDebugData.add(new MyBean1(1, "Arnold", "10.150.15.10"));
    newDebugData.add(new MyBean1(1, "Bob", "10.150.15.11"));
    newDebugData.add(new MyBean1(3, "Bob", "10.150.15.12"));
    newDebugData.add(new MyBean1(3, "Bob", "10.150.15.13"));
    newDebugData.add(new MyBean1(1, "Alice", "10.150.15.14"));

    Dataset<Row> df2 = sqlContext.createDataFrame(newDebugData, MyBean1.class);
    df2.createTempView("table1");
    sqlContext.sql("select * from table1 where description='10.150.15.14'").show();

我收到错误:

differing types in '(table1.`description` = CAST('10.150.15.14' AS DOUBLE))'

【问题讨论】:

你能分享一个可重现的例子吗? @moto 请看我的编辑 对于 IPv4 地址,我们使用 LongType(不是 IntegerType,因为一位用于符号)。我们有一个将where description='1.2.3.4' 转换为where description = 16909060 的包装脚本(16909060 = 0x01020304)。我们的 spark 程序包含一个将列值转换回点分十进制的 UDF:results_df = results_df.withColumn('description', ipv4AsStr_udf(col('description'))) 【参考方案1】:

这不是你问题的 100% 答案,但我希望指针会有所帮助。

下面的问题不是关于过滤,而是从数组中选择数据。 selecting a range of elements in an array spark sql

那里看起来有很多信息,包括有关使用 Spark SQL 查询数组的 UDF 的一些指南。

希望这会有所帮助。

【讨论】:

感谢提示,实际上我在研究时看到了这个答案。请查看我对用例的编辑【参考方案2】:

SPARK-21344 修复了版本 2.0.3、2.1.2、2.2.1 中的以下问题:BinaryType 比较执行有符号字节数组比较。所以二进制比较应该适用于这些版本。

JIRA 有以下 scala 测试代码:

case class TestRecord(col0: Array[Byte])
def convertToBytes(i: Long): Array[Byte] = 
   val bb = java.nio.ByteBuffer.allocate(8)
   bb.putLong(i)
   bb.array

val timestamp = 1498772083037L
val data = (timestamp to timestamp + 1000L).map(i => TestRecord(convertToBytes(i)))
val testDF = sc.parallelize(data).toDF
val filter1 = testDF.filter(col("col0") >= convertToBytes(timestamp) 
              && col("col0") < convertToBytes(timestamp + 50L))
assert(filter1.count == 50)

我不知道等效的 Java 代码是什么,但这应该可以帮助您入门。


我在上面的评论中提到了我们使用 LongType 存储 IPv4 地址的问题。我们有一个包装脚本将点分十进制转换为长整数,还有一个 Spark UDF 以另一种方式进行:长整数到点分十进制。我认为 LongType 的查询速度比 BinaryType 快。

【讨论】:

以上是关于在 Spark sql 中按二进制类型过滤的主要内容,如果未能解决你的问题,请参考以下文章

大负十进制值在带有十进制类型的spark DataFrame中取整

使用 Spark 过滤大型数据集中的列

火花:十进制类型未找到

在镶木地板的地图类型列上使用 spark-sql 过滤下推

无法在 Azure SQL 跨数据库查询中按日期时间类型进行筛选

十进制数据类型无法在 spark 和 Hive 中正确存储值