Querying Kafka Real-Time Streams with SQL via PipelineDB
Posted by 知了小巷
Much of the time we cannot run structured queries against streaming data, that is, query a stream the way we would query a table with SQL. Flink and Spark (Structured) Streaming can of course do real-time stream processing today, with SQL semantics and syntax plus extensions, but they are separate processing systems.

This post introduces PipelineDB, an open-source database built as a PostgreSQL extension. It runs SQL queries continuously over data streams and incrementally persists the results into structured tables.
Key features and advantages

- No application code required: streams are processed using nothing but SQL, with continuously running query plans and an execution engine built for complex, long-lived computations.
- Runs on top of PostgreSQL: PipelineDB is a standard PostgreSQL extension, so it rests on the extremely mature, reliable, and active Postgres ecosystem, which makes it very stable.
- No ETL: PipelineDB eliminates the ETL layer; granular stream data is written into the database continuously, driven by SQL declared up front.
- Efficient and sustainable: PipelineDB stores only the output of continuous queries, which is updated incrementally as data keeps arriving (a minimal example follows).
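To make this concrete, here is a minimal sketch of the model in PipelineDB 0.9.x syntax; the stream, view, and column names are invented for illustration:

-- A stream is just a schema for incoming events; it stores no data itself.
CREATE STREAM page_views (url text, latency int);

-- A continuous view is updated incrementally as events arrive;
-- only its compact aggregate output is ever stored on disk.
CREATE CONTINUOUS VIEW view_stats AS
    SELECT url, count(*) AS hits, avg(latency) AS avg_latency
    FROM page_views
    GROUP BY url;

-- Writing to a stream is a plain INSERT, and the view is readable at any time.
INSERT INTO page_views (url, latency) VALUES ('/home', 42);
SELECT * FROM view_stats;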
Installing PipelineDB

Download the rpm package, pipelinedb-0.9.7u5-centos6-x86_64.rpm, and install it directly:
$ sudo rpm -ivh pipelinedb-0.9.7u5-centos6-x86_64.rpm
[sudo] password for hadoop:
Preparing...                ########################################### [100%]
   1:pipelinedb             ########################################### [100%]

    ____  _            ___            ____  ____
   / __ \(_)___  ___  / (_)___  ___  / __ \/ __ )
  / /_/ / / __ \/ _ \/ / / __ \/ _ \/ / / / __  |
 / ____/ / /_/ /  __/ / / / / /  __/ /_/ / /_/ /
/_/   /_/ .___/\___/_/_/_/ /_/\___/_____/_____/
        /_/

PipelineDB successfully installed. To get started, initialize a
database directory:

pipeline-init -D <data directory>

where <data directory> is a nonexistent directory where you'd
like all of your database files to live.

You can find the PipelineDB documentation at:

http://docs.pipelinedb.com
Initialize the data directory, following the hint printed after a successful install:
$ pipeline-init -D pipe_data
The files belonging to this database system will be owned by user "hadoop".
This user must also own the server process.

The database cluster will be initialized with locale "zh_CN.UTF-8".
The default database encoding has accordingly been set to "UTF8".
pipeline-init: could not find suitable text search configuration for locale "zh_CN.UTF-8"
The default text search configuration will be set to "simple".

Data page checksums are disabled.

creating directory pipe_data ... ok
creating subdirectories ... ok
selecting default max_connections ... 100
selecting default shared_buffers ... 128MB
selecting dynamic shared memory implementation ... posix
creating configuration files ... ok
creating template1 database in pipe_data/base/1 ... ok
initializing pg_authid ... ok
initializing dependencies ... ok
creating system views ... ok
loading system objects' descriptions ... ok
creating collations ... ok
creating conversions ... ok
creating dictionaries ... ok
setting privileges on built-in objects ... ok
creating information schema ... ok
loading PL/pgSQL server-side language ... ok
loading PipelineDB ... ok
vacuuming database template1 ... ok
copying template1 to template0 ... ok
copying template1 to pipeline ... ok
syncing data to disk ... ok

WARNING: enabling "trust" authentication for local connections
You can change this by editing pg_hba.conf or using the option -A, or
--auth-local and --auth-host, the next time you run initdb.

Success. You can now start the database server using:

    pipeline-ctl -D pipe_data -l logfile start
Start PipelineDB

pipeline-ctl -D pipe_data -l logfile start
Stop PipelineDB

pipeline-ctl -D pipe_data -l logfile stop
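Since pipeline-ctl is a thin wrapper around PostgreSQL's pg_ctl, a status subcommand should also be available as a quick health check (not shown in the original post):

pipeline-ctl -D pipe_data status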
Edit the PipelineDB configuration files

Allow access from specific IPs, and set the server's listen address and port:
$ vi pg_hba.conf
host    all    all    192.168.99.1/24    trust
$ vi pipelinedb.conf
listen_addresses = '192.168.99.100'   # what IP address(es) to listen on;
                                      # comma-separated list of addresses;
                                      # defaults to 'localhost'; use '*' for all
                                      # (change requires restart)
port = 5432                           # (change requires restart)
Restart PipelineDB

pipeline-ctl -D pipe_data -l logfile restart
Connect with psql, create a user, and assign it a role

$ psql -p 5432 -h bigdata1 pipeline
psql (9.5.3)
Type "help" for help.

pipeline=# create user pipeline password 'pipeline';
CREATE ROLE
pipeline=# alter role pipeline superuser;
ALTER ROLE
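To verify the new account, reconnect as it explicitly; bigdata1 is the host used above, so substitute your own:

$ psql -p 5432 -h bigdata1 -U pipeline pipeline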
Quitting and listing databases

\q quits psql; \l lists the databases:

pipeline=# \l
                             List of databases
   Name    | Owner  | Encoding |   Collate   |    Ctype    | Access privileges
-----------+--------+----------+-------------+-------------+-------------------
 pipeline  | hadoop | UTF8     | zh_CN.UTF-8 | zh_CN.UTF-8 |
 template0 | hadoop | UTF8     | zh_CN.UTF-8 | zh_CN.UTF-8 | =c/hadoop        +
           |        |          |             |             | hadoop=CTc/hadoop
 template1 | hadoop | UTF8     | zh_CN.UTF-8 | zh_CN.UTF-8 | =c/hadoop        +
           |        |          |             |             | hadoop=CTc/hadoop
(3 rows)
Download and install the librdkafka dependency

git clone -b 0.9.1 https://github.com/edenhill/librdkafka.git ~/librdkafka
cd ~/librdkafka
./configure --prefix=/usr
make
sudo make install
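One step the post skips: before the CREATE EXTENSION below can succeed, the pipeline_kafka extension itself must be compiled and installed against this PipelineDB instance. A sketch following the standard build flow from the pipeline_kafka repository (the checkout path is an assumption):

git clone https://github.com/pipelinedb/pipeline_kafka.git ~/pipeline_kafka
cd ~/pipeline_kafka
./configure
make
sudo make install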
Connect to PipelineDB with the Navicat client

Choose PostgreSQL as the connection type. For more detail, see pipeline_kafka.
-- Load the extension into the pipeline database
CREATE EXTENSION pipeline_kafka;

-- Remove and add Kafka brokers
SELECT pipeline_kafka.remove_broker('192.168.99.100:9092,192.168.99.101:9092,192.168.99.102:9092');
SELECT pipeline_kafka.add_broker('internal-ip1:9092,internal-ip2:9092,internal-ip3:9092');

-- Drop the continuous view and the stream
DROP CONTINUOUS VIEW msg_result;
DROP STREAM msg_stream;

-- Create the stream and the continuous view
CREATE STREAM msg_stream (msg varchar);
CREATE CONTINUOUS VIEW msg_result AS SELECT msg FROM msg_stream;

-- Start the Kafka consumer
SELECT pipeline_kafka.consume_begin('SearchLog', 'msg_stream',
    format := 'text', group_id := 'PipelineDB-1', batchsize := 1000,
    maxbytes := 32000000, parallelism := 1, start_offset := 0);

-- Query the data (347126 rows at the time of writing)
SELECT count(*) FROM msg_result WHERE msg LIKE '%,"create_time":"2017-07-11%';

-- Stop the Kafka consumer
SELECT pipeline_kafka.consume_end('SearchLog', 'msg_stream');
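To generate test data, messages can be pushed into the SearchLog topic with Kafka's console producer; the broker address and the JSON payload shape (chosen to match the create_time filter above) are assumptions:

$ kafka-console-producer.sh --broker-list 192.168.99.100:9092 --topic SearchLog
{"keyword":"pipelinedb","create_time":"2017-07-11 12:00:00"}

With format := 'text', each line arrives as one message and lands as one row in msg_stream, immediately visible through msg_result.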
Happy...