使用 ffmpeg pyspark 和 hadoop 进行逐帧视频处理

Posted

技术标签:

【中文标题】使用 ffmpeg pyspark 和 hadoop 进行逐帧视频处理【英文标题】:Frame by frame video processing with ffmpeg pyspark and hadoop 【发布时间】:2019-07-12 18:39:05 【问题描述】:

我想使用 spark 和 hadoop 并行处理 mp4 视频帧。我不想在处理之前提取所有帧。我正在寻找的是一种按顺序读取帧数据的方法。视频时间,然后在它们使用 yarn 触发 hadoop 集群上的执行程序时提供帧。 mp4 视频文件可以在本地文件系统或 HDFS 上。

我可以使用 ffmpeg 创建一个管道,然后读取原始帧字节(例如,image = np.fromstring(pipe.stdout.read(1920*1080*3) , dtype='uint8'))。有没有办法将数据(即流,因为帧作为可变解码时间的函数进入)提供给 spark RDD 并具有执行某些操作的映射函数,例如计算平均强度?

我已经阅读 spark 文档很长一段时间了,但找不到在这种情况下有效的任何东西。我可能会因为树木而错过森林。如果可以,请提供帮助,即使它涉及不使用 ffmpeg 和管道。

【问题讨论】:

【参考方案1】:

经过反复试验,我有了一个可行的解决方案。虽然这可能并不适合所有人,但它可能会帮助一些人,所以这里是:

我首先创建了一个从视频中提取帧的脚本,该脚本必须存在于所有工作节点上:

#!/home/hadoop/anaconda2/bin/python

import os
import sys
import subprocess as sp
import numpy as np
import cv2
import copy

# RDD.pipe sends via stdin
i = 0
try:
        i = input()
except:
    sys.exit()



file_name = 'v.mp4'
FFMPEG_BIN = "ffmpeg" # on Linux ans Mac OS
command = [ FFMPEG_BIN,
               '-i', '/home/hadoop/' + file_name,
               '-f', 'image2pipe',
               '-vf', 'select=gte(n\, %d)' % i,
               '-vframes', '1',
               '-pix_fmt', 'rgb24',
               '-loglevel', 'panic',
               '-vcodec', 'png', '-']
pipe = sp.Popen(command, stdout=sp.PIPE, bufsize=10**8)
data = pipe.stdout.read()
pipe.stdout.close()
import base64
print(base64.b64encode(data))

然后,在 pyspark 脚本中,我创建一个带有脚本参数的 RDD:

params = [str(i)  for i in range(1, 1001)]
rdd1 = sc.parallelize(params, numSlices=1000)
pipeRDD = rdd1.pipe('/home/hadoop/src/extract_frame.sh')
resizedRDD = pipeRDD.map(resizeMapper)
test = resizedRDD.collect()

测试现在有前 1000 帧。 resize mapper 调整每一帧的大小,这里是:

def resizeMapper(x):
    import base64
    import cv2
    a = base64.b64decode(x)
    im = cv2.imdecode(np.fromstring(a, dtype=np.uint8), 1)
    im = cv2.resize(im, (200, 200))
    return im

我希望这对那里的人有所帮助。

【讨论】:

嗨,保罗,自从您上次发消息以来,您尝试过其他方法吗?您是否知道这种方法是否与您首先将 mpeg 文件转换为序列文件一样有效?谢谢。

以上是关于使用 ffmpeg pyspark 和 hadoop 进行逐帧视频处理的主要内容,如果未能解决你的问题,请参考以下文章

在win10上搭建pyspark,

在 PySpark 中将数据帧写入 CSV 后重命名文件 [重复]

Pyspark 数据帧从一个存储桶中读取,并在同一作业中使用不同的 KMS 密钥写入另一个存储桶

Hadoo系列之Hive安装和使用

Hadoo系列之Hive安装和使用

hadoo之HDFS