Kafka+OpenCV 实现实时流视频处理

Posted zackstang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka+OpenCV 实现实时流视频处理相关的知识,希望对你有一定的参考价值。

 1. 启动Kafka Server

bin/kafka-server-start.sh config/server.properties &

 

2. 创建一个新topic

bin/kafka-topics.sh --create --zookeeper xxxx --replication-factor 1 --partitions 1 --topic video

 

3. 安装相关依赖

sudo pip-3.6 install kafka-python opencv-contrib-python imutils

 

4. 创建一个 Kafka Producer,并发送到kafka消息队列
# kafkaProducer
def publish_video(server, topic):
  
# start producer
  
producer = KafkaProducer(bootstrap_servers=server)

   vs = VideoStream(
src=0).start()
   time.sleep(
2.0)

  
print("publishing video...")

  
while True:
      frame = vs.read()
      frame = imutils.resize(frame,
width=400)
      frame = detection(frame,
‘pretrained.prototxt.txt‘, ‘pretrained.caffemodel‘)

     
# send to kafka topic
     
producer.send(topic, frame.tobytes())

   vs.stop()

 

这里使用opencv 采集本地摄像头视频,读取每一帧数据,做图像识别处理,并转化为bytes数据发送到 kafka topic。

其中detection方法为以一个已经训练好的深度学习模型,用于做图像识别以及描绘边框。网上类似模型很多,这里不多做赘述。这里对每帧做了一个resize是由于原视频采集的每帧数据较大,超过了kafka里默认的一个item大小,所以需要裁剪每帧,以减少传输数据量。

 

5. 验证是否可以接收到流数据:

bin/kafka-console-consumer.sh --bootstrap-server xxxx:port --topic video

 

6. 创建一个Kafka Consumer,用于获取流数据

from imutils.video import VideoStream
from imutils.video import FPS
import imutils
import numpy as np
import time
import cv2
from kafka import KafkaConsumer
import sys

def showCam(server, topic):
    consumer = KafkaConsumer(
        topic,
       
bootstrap_servers=[server])

    fps = FPS().start()


   
for msg in consumer:
       
decoded = np.frombuffer(msg.value, np.uint8)
        decoded = decoded.reshape(
225, 400, 3)

        cv2.imshow(
"Cam", decoded)

        key = cv2.waitKey(
1) & 0xFF
       
if key == ord("q"):
           
break

       
fps.update()

    fps.stop()
    cv2.destroyAllWindows()

 

这里使用np.frombuffer() 方法将每一帧的bytes数据转为一维numpy数组,由于采集的帧数据为3numpy数组,所以需要对此数组做reshape,以还原为原数据格式,最后显示在屏幕上。

 

7. 执行代码:

首先启动 Consumerpython3 kafkaCCam.py server:port topic

然后启动 Producerpython3 KafkaPCam.py server:port topic

 

即可在Consumer端获取到Producer送入到流里的实时视频图像:

技术分享图片

 


















































以上是关于Kafka+OpenCV 实现实时流视频处理的主要内容,如果未能解决你的问题,请参考以下文章

opencv+python实现视频实时质心读取

OpenCV-Python:如何从实时视频流中获取最新帧或跳过旧帧

教程 | 如何使用DockerTensorFlow目标检测API和OpenCV实现实时目标检测和视频处理

OpenCV 实时流式视频捕获很慢。如何丢帧或实时同步?

opencv 实现实时检测,求大神帮忙

Flink视频教程_基于Flink流处理的动态实时电商实时分析系统