Need to filter out Kafka records based on a certain keyword

Posted: 2019-08-03 22:41:24

Question:

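I have a Kafka topic with about 3 million records, and I want to pick out the one record that has a particular parameter value. I have been trying to query it with Lenses, but I can't get the query right. Here is the content of one message: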


{
  "header": {
    "schemaVersionNo": "1"
  },
  "payload": {
    "modifiedDate": 1552334325212,
    "createdDate": 1552334325212,
    "createdBy": "A",
    "successful": true,
    "source_order_id": "1111111111111"
  }
}

Now I want to filter for the record with a specific source_order_id, but I can't work out the right way to do it. We have tried both Lenses and Kafka Tool.

Here is a sample query we tried in Lenses:

SELECT * FROM `TEST`
WHERE _vtype='JSON' AND _ktype='BYTES'
AND _sample=2 AND _sampleWindow=200 AND payload.createdBy='A'

This query works, but if we filter on the source order ID instead, as shown below, we get an error:

SELECT * FROM `TEST`
WHERE _vtype='JSON' AND _ktype='BYTES'
AND _sample=2 AND _sampleWindow=200 AND payload.source_order_id='1111111111111'



Error: "Invalid syntax at line=3 and column=41.Invalid syntax for 'payload.source_order_id'. Field 'payload' resolves to primitive type STRING."

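Consuming all 3 million records with a custom consumer and then iterating over them does not seem like an optimal approach, so I am looking for any existing solution for this kind of use case.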
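For comparison, the per-record check that a custom consumer would need is short. Below is a minimal Python sketch that is deliberately independent of any Kafka client: it assumes each record value arrives as raw UTF-8 JSON bytes (from whatever consumer loop you already have) and returns the first matching record. Because the Lenses error reports that `payload` resolves to a STRING, the sketch also assumes, without confirmation, that some values may carry `payload` as a JSON-encoded string, and decodes it a second time in that case. The names `find_by_order_id` and `_payload_of` are hypothetical.

```python
import json
from typing import Iterable, Optional


def _payload_of(record: dict) -> Optional[dict]:
    """Return the payload as a dict, decoding it first if it was stored as a JSON string."""
    payload = record.get("payload")
    if isinstance(payload, str):
        # Assumption: a string payload holds JSON-encoded text.
        try:
            payload = json.loads(payload)
        except json.JSONDecodeError:
            return None
    return payload if isinstance(payload, dict) else None


def find_by_order_id(values: Iterable[bytes], order_id: str) -> Optional[dict]:
    """Scan raw record values and return the first record whose
    payload.source_order_id equals order_id, or None if nothing matches."""
    for raw in values:
        try:
            record = json.loads(raw)
        except (UnicodeDecodeError, json.JSONDecodeError):
            continue  # skip values that are not valid JSON
        if not isinstance(record, dict):
            continue  # skip JSON values that are not objects
        payload = _payload_of(record)
        if payload is not None and payload.get("source_order_id") == order_id:
            return record  # stop at the first match
    return None
```

Whatever tool evaluates this predicate, the cost is a linear scan: Kafka does not index message contents, so finding one record by a value field means reading through the topic until it turns up.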

Comments:

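What is Apache Lenses? Do you mean Lenses and its Lenses SQL language? Does the solution you're building have to be based on Lenses? If it helps, I can give you an answer based on KSQL.

@RobinMoffatt: docs.lenses.io/overview/lenses-kafka.html

@RobinMoffatt: Not necessarily, any solution will do.

Answer 1: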

Since you said you're open to other solutions, here is one built with KSQL.

First, let's put some sample records into the source topic:

$ kafkacat -P -b localhost:9092 -t TEST <<EOF
{ "header": { "schemaVersionNo": "1" }, "payload": { "modifiedDate": 1552334325212, "createdDate": 1552334325212, "createdBy": "A", "successful": true, "source_order_id": "3411976933214" } }
{ "header": { "schemaVersionNo": "1" }, "payload": { "modifiedDate": 1552334325412, "createdDate": 1552334325412, "createdBy": "B", "successful": true, "source_order_id": "3411976933215" } }
{ "header": { "schemaVersionNo": "1" }, "payload": { "modifiedDate": 1552334325612, "createdDate": 1552334325612, "createdBy": "C", "successful": true, "source_order_id": "3411976933216" } }
EOF
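In producer mode (`-P`), kafkacat reads messages from standard input, one message per line by default, so each record has to be a single line of JSON. If you would rather generate test data than type it, a small Python sketch like the following builds the same three records and serialises each onto its own line; the helper name `sample_lines` is hypothetical.

```python
import json
from typing import List


def sample_lines() -> List[str]:
    """Build the three sample records used above, one compact JSON document per line."""
    base_ts = 1552334325212
    rows = [("A", "3411976933214"), ("B", "3411976933215"), ("C", "3411976933216")]
    lines = []
    for i, (created_by, order_id) in enumerate(rows):
        ts = base_ts + 200 * i  # timestamps 200 ms apart, as in the sample data
        record = {
            "header": {"schemaVersionNo": "1"},
            "payload": {
                "modifiedDate": ts,
                "createdDate": ts,
                "createdBy": created_by,
                "successful": True,
                "source_order_id": order_id,
            },
        }
        lines.append(json.dumps(record))  # no indent, so no embedded newlines
    return lines


if __name__ == "__main__":
    print("\n".join(sample_lines()))
```

For example, `python make_samples.py | kafkacat -P -b localhost:9092 -t TEST`, where `make_samples.py` is a hypothetical file containing this sketch.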

With KSQL, we can inspect the topic using PRINT:

ksql> PRINT 'TEST' FROM BEGINNING;
Format:JSON
{"ROWTIME":1552476232988,"ROWKEY":"null","header":{"schemaVersionNo":"1"},"payload":{"modifiedDate":1552334325212,"createdDate":1552334325212,"createdBy":"A","successful":true,"source_order_id":"3411976933214"}}
{"ROWTIME":1552476232988,"ROWKEY":"null","header":{"schemaVersionNo":"1"},"payload":{"modifiedDate":1552334325412,"createdDate":1552334325412,"createdBy":"B","successful":true,"source_order_id":"3411976933215"}}
{"ROWTIME":1552476232988,"ROWKEY":"null","header":{"schemaVersionNo":"1"},"payload":{"modifiedDate":1552334325612,"createdDate":1552334325612,"createdBy":"C","successful":true,"source_order_id":"3411976933216"}}

Then declare a schema on the topic, which lets us run SQL against it:

ksql> CREATE STREAM TEST (header STRUCT<schemaVersionNo VARCHAR>, 
                          payload STRUCT<modifiedDate BIGINT, 
                                        createdDate BIGINT, 
                                        createdBy VARCHAR, 
                                        successful BOOLEAN, 
                                        source_order_id VARCHAR>) 
                          WITH (KAFKA_TOPIC='TEST', 
                                VALUE_FORMAT='JSON');

Message
----------------
Stream created
----------------
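The STRUCT declaration tells KSQL how to map each nested JSON object onto typed columns (VARCHAR, BIGINT, BOOLEAN). Purely as an analogy, and not a description of how KSQL is implemented, the same schema expressed as Python dataclasses might look like the sketch below; the class and function names are hypothetical. Dataclasses do not enforce their annotations, so the sketch checks field types explicitly.

```python
import json
from dataclasses import dataclass
from typing import get_type_hints


@dataclass
class Header:
    schemaVersionNo: str  # VARCHAR


@dataclass
class Payload:
    modifiedDate: int  # BIGINT
    createdDate: int  # BIGINT
    createdBy: str  # VARCHAR
    successful: bool  # BOOLEAN
    source_order_id: str  # VARCHAR


@dataclass
class Record:
    header: Header
    payload: Payload


def _check_types(obj: object) -> None:
    """Compare each field's value against its declared type and raise TypeError on mismatch."""
    for name, expected in get_type_hints(type(obj)).items():
        value = getattr(obj, name)
        if not isinstance(value, expected):
            raise TypeError(f"{type(obj).__name__}.{name}: expected {expected.__name__}, got {value!r}")


def parse_record(value: str) -> Record:
    """Parse one JSON message value into the structure declared by the STRUCT schema."""
    doc = json.loads(value)
    header = Header(**doc["header"])  # TypeError if a field is missing or unexpected
    payload = Payload(**doc["payload"])
    _check_types(header)
    _check_types(payload)
    return Record(header=header, payload=payload)
```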

Tell KSQL to read all of the data in the topic, starting from the beginning:

ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
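`auto.offset.reset` only matters when a consumer has no valid committed offset to resume from: `earliest` then starts at the oldest retained offset, while `latest` starts at the end of the log, so only new messages are seen. KSQL defaults to `latest`, which is why a new query on TEST would otherwise miss the three records already written. The helper below is a hypothetical, simplified model of that decision, not Kafka client code; it leaves out the `none` policy, which raises an error instead of choosing a position.

```python
from typing import Optional


def starting_offset(committed: Optional[int], log_start: int, log_end: int, reset: str) -> int:
    """Pick the offset a consumer would begin reading a partition from.

    Simplified model: a committed offset within [log_start, log_end] is resumed;
    otherwise the auto.offset.reset policy decides.
    """
    if committed is not None and log_start <= committed <= log_end:
        return committed
    if reset == "earliest":
        return log_start  # replay everything still retained
    if reset == "latest":
        return log_end  # only messages produced from now on
    raise ValueError(f"unsupported reset policy: {reset!r}")
```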

Now we can select all of the data:

ksql> SELECT * FROM TEST;
1552475910106 | null | SCHEMAVERSIONNO=1 | MODIFIEDDATE=1552334325212, CREATEDDATE=1552334325212, CREATEDBY=A, SUCCESSFUL=true, SOURCE_ORDER_ID=3411976933214
1552475910106 | null | SCHEMAVERSIONNO=1 | MODIFIEDDATE=1552334325412, CREATEDDATE=1552334325412, CREATEDBY=B, SUCCESSFUL=true, SOURCE_ORDER_ID=3411976933215
1552475910106 | null | SCHEMAVERSIONNO=1 | MODIFIEDDATE=1552334325612, CREATEDDATE=1552334325612, CREATEDBY=C, SUCCESSFUL=true, SOURCE_ORDER_ID=3411976933216
^CQuery terminated

Or we can query it selectively, using the -> operator to access nested fields in the schema:

ksql> SELECT * FROM TEST 
        WHERE PAYLOAD->CREATEDBY='A';
1552475910106 | null | SCHEMAVERSIONNO=1 | MODIFIEDDATE=1552334325212, CREATEDDATE=1552334325212, CREATEDBY=A, SUCCESSFUL=true, SOURCE_ORDER_ID=3411976933214

Rather than selecting every field, you can also return only the fields you're interested in:

ksql> SELECT payload FROM TEST 
        WHERE PAYLOAD->source_order_id='3411976933216';
MODIFIEDDATE=1552334325612, CREATEDDATE=1552334325612, CREATEDBY=C, SUCCESSFUL=true, SOURCE_ORDER_ID=3411976933216
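The query above combines a filter (a WHERE clause on a nested field) with a projection (returning only the `payload` column). To illustrate just those semantics, here is a Python sketch over records that have already been parsed into dicts; `select_payloads` is a hypothetical name. Unlike the interactive KSQL query, which keeps running until cancelled, this generator ends when its input runs out.

```python
from typing import Iterable, Iterator


def select_payloads(records: Iterable[dict], order_id: str) -> Iterator[dict]:
    """Model of: SELECT payload FROM TEST WHERE PAYLOAD->source_order_id = '<order_id>'."""
    for record in records:
        payload = record.get("payload")
        # Filter: keep only records whose nested source_order_id matches.
        if isinstance(payload, dict) and payload.get("source_order_id") == order_id:
            # Projection: emit just the payload column, not the whole record.
            yield payload
```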

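With KSQL you can also write the results of any SELECT statement to a new topic. That topic is populated with all existing messages and then with every new message arriving on the source topic, filtered and processed according to the declared SELECT statement: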

ksql> CREATE STREAM TEST_CREATED_BY_A AS
        SELECT * FROM TEST WHERE PAYLOAD->CREATEDBY='A';

Message
----------------------------
Stream created and running
----------------------------
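Conceptually, CREATE STREAM ... AS SELECT starts a persistent query: it reads the existing messages (given the `auto.offset.reset = earliest` setting above) and then each new one, and writes every row that passes the WHERE clause to the output topic. The printed contents of TEST_CREATED_BY_A show the output field names uppercased (HEADER, PAYLOAD, CREATEDBY, ...). The generator below is a hypothetical in-memory model of that behaviour, with a Python iterable standing in for the input topic; it is not the KSQL implementation. Because it is lazy, it also works on an unbounded source.

```python
from typing import Any, Iterable, Iterator


def upper_keys(value: Any) -> Any:
    """Recursively uppercase dict keys, mimicking the field names in the output topic."""
    if isinstance(value, dict):
        return {key.upper(): upper_keys(item) for key, item in value.items()}
    return value


def created_by_a(source: Iterable[dict]) -> Iterator[dict]:
    """Model of: CREATE STREAM TEST_CREATED_BY_A AS SELECT * FROM TEST WHERE PAYLOAD->CREATEDBY='A'."""
    for record in source:
        payload = record.get("payload")
        if isinstance(payload, dict) and payload.get("createdBy") == "A":
            yield upper_keys(record)  # each matching row is "written" downstream
```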

List the topics on the Kafka cluster:

ksql> SHOW TOPICS;

Kafka Topic            | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
----------------------------------------------------------------------------------------------------
orders                 | true       | 1          | 1                  | 1         | 1
pageviews              | false      | 1          | 1                  | 0         | 0
products               | true       | 1          | 1                  | 1         | 1
TEST                   | true       | 1          | 1                  | 1         | 1
TEST_CREATED_BY_A      | true       | 4          | 1                  | 0         | 0

Print the contents of the new topic:

ksql> PRINT 'TEST_CREATED_BY_A' FROM BEGINNING;
Format:JSON
{"ROWTIME":1552475910106,"ROWKEY":"null","HEADER":{"SCHEMAVERSIONNO":"1"},"PAYLOAD":{"MODIFIEDDATE":1552334325212,"CREATEDDATE":1552334325212,"CREATEDBY":"A","SUCCESSFUL":true,"SOURCE_ORDER_ID":"3411976933214"}}

