[转]使用PHP处理Kafka消息
Posted codebox
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[转]使用PHP处理Kafka消息相关的知识,希望对你有一定的参考价值。
每天与你分享
IT编程开发 技术干货 区块链技术 技术思维导图 效率工具
Kafka的特点:
以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间复杂度的访问性能。
高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条以上消息的传输。【据了解,Kafka每秒可以生产约25万消息(50 MB),每秒处理55万消息(110 MB)】
支持Kafka Server间的消息分区,同时保证每个Partition内的消息顺序传输。
分布式系统,易于向外扩展。所有的producer、broker和consumer都会有多个,均为分布式的。无需停机即可扩展机器。
消息被处理的状态是在consumer端维护,而不是由server端维护。当失败时能自动平衡。
同时支持离线数据处理和实时数据处理。
Kafka的架构:
kafka
Kafka的整体架构非常简单,producer、broker(kafka)和consumer都可以有多个。Producer,consumer实现Kafka注册的接口,数据从producer发送到broker,broker承担一个中间缓存和分发的作用。broker分发注册到系统中的consumer。broker的作用类似于缓存,即活跃的数据和离线处理系统之间的缓存。客户端和服务器端的通信,是基于简单,高性能,且与编程语言无关的TCP协议。
Kafka基本概念:
Topic:特指Kafka处理的消息源(feeds of messages)的不同分类。
Partition:Topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。
Message:消息,是通信的基本单位,每个producer可以向一个topic(主题)发布一些消息。
Producers:消息和数据生产者,向Kafka的一个topic发布消息的过程叫做producers。
Consumers:消息和数据消费者,订阅topics并处理其发布的消息的过程叫做consumers。
Broker:缓存代理,Kafa集群中的一台或多台服务器统称为broker。
Kafka消息发送的流程:
Kafka-Message
下面是php生产、消费Kafka消息的例子(假设已经配置好Kafka):
1.从zookeeper源码src/c/src安装zookeeper c client
cd zookeeper-3.4.8/src/c
./configure
make && make install
2.编译php libzookper扩展
git clone https://github.com/Timandes/libzookeeper.git
cd libzookeeper
phpize
./configure --with-libzookeeper=/usr/local/bin/cli_mt
make && make install
3.编译php zookeeper扩展
git clone https://github.com/andreiz/php-zookeeper.git
cd php-zookeeper
phpize
./configure
make && make install
4.修改php.ini配置,添加libzookeeper和php-zookeeper扩展
extension=libzookeeper.so
extension=zookeeper.so
PHP处理Kafka消息:
1.启动zookeeper和kafka
kafka_2.11-0.10.0.0/bin/zookeeper-server-start.sh --daemon
kafka_2.11-0.10.0.0/config/zookeeper.properties
kafka_2.11-0.10.0.0/bin/kafka-server-start.sh kafka_2.11-0.10.0.0/config/server.properties
2.创建由2个partition组成的、名为testtopic的topic
kafka_2.11-0.10.0.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic testtopic
3.composer安装nmred/kafka-php
composer require "nmred/kafka-php"
4.producer.php代码
<?php
require_once('./vendor/autoload.php');
$produce = \Kafka\Produce::getInstance('localhost:2181', 3000);
$produce->setRequireAck(-1);
$topicName = 'testtopic';
//获取到topic下可用的partitions
$partitions = $produce->getAvailablePartitions($topicName);
$partitionCount = count($partitions);
$count = 1;
while(true){
$message = json_encode(array('uid' => $count, 'age' => $count%100, 'datetime' => date('Y-m-d H:i:s')));
//发送消息到不同的partition
$partitionId = $count%$partitionCount;
$produce->setMessages('testtopic', $partitionId, array($message));
$result = $produce->send();
var_dump($result);
$count++;
echo "producer sleeping\n";
sleep(1);
}
5.consumer.php代码
<?php
require_once('./vendor/autoload.php');
//获取需要处理的partitionId
$partitionId = isset($argv[1]) ? intval($argv[1]) : 0;
$consumer = \Kafka\Consumer::getInstance('localhost:2181');
$consumer->setGroup('test-consumer-group');
$consumer->setPartition('testtopic', $partitionId);
$consumer->setFromOffset(true);
$consumer->setMaxBytes(102400);
while(true){
$topic = $consumer->fetch();
foreach ($topic as $topicName => $partition) {
foreach ($partition as $partId => $messageSet) {
foreach ($messageSet as $message) {
var_dump($message);
}
}
}
echo "consumer sleeping\n";
sleep(1);
}
6.运行php代码
在3个终端界面分别运行
php producer.php
php consumer.php 0
php consumer.php 1
7.结果
两个consumer脚本依次收到producer发送的消息
推荐阅读
区块链技术入门学习分享
php7.0新特性学习
Tinkphp5.0框架博客项目实战
GO语言编程基础学习
以上是关于[转]使用PHP处理Kafka消息的主要内容,如果未能解决你的问题,请参考以下文章
SpringCloud系列十一:SpringCloudStream(SpringCloudStream 简介创建消息生产者创建消息消费者自定义消息通道分组与持久化设置 RoutingKey)(代码片段