OpenCV+TensorFlow简单的机器小车传统视觉寻迹

Posted 叫我田小霖啦

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了OpenCV+TensorFlow简单的机器小车传统视觉寻迹相关的知识,希望对你有一定的参考价值。

该文章适合OpenCv的初学者以及对计算机视觉有了简单认识的朋友。以下将根据不同的能力水平进行梯度的讲解。最后会附带完整代码。

小白需要知道的

什么是传统的视觉寻迹?

个人认为,传统寻迹就是通过记录轨迹的横坐标来进行判断。例如:

这张图就被认为是直行。但是机器要怎么进行判断呢?

我们可以通过将这个图片转换为一个矩阵,然后通过遍历来记录这些黑点的横坐标,从而获得黑点的平均横坐标来判断。

source = 0  # 记录黑点的横坐标
m = 0  # 记录黑点个数
y = 143  # 画面的横轴大小
x = 80  # 画面的纵轴大小
for i in range(y):
    for j in range(x):
        if pred[j, i] == 0:
            m += 1
            source += j
source /= m  # 获得平均横坐标
source -= x / 2  # 对比中轴差值
if abs(source) <= 10:
    print("前进")
elif source > 0:
    print("左转")
else:
    print("右转")

上面代码pred为画面的矩阵。当平均横坐标与x中轴值大小偏差10以内就被认为“直行”。

同理根据计算平均横坐标值,这两张图就被视为“左转”,机器也很好的进行了识别。

 

这也是一样的,被机器视为右转。

寻迹的思想

从上述的简单介绍中,我们就很好的认识到了机器对于轨迹的简单识别。也是传统视觉的方法。

接下来就要介绍寻迹的思路。

可能会有朋友问:上述的图片都是黑白的,并且轨迹为黑色,跟现实不符。那我们需要怎么做到这一步呢?

总的来说分为:获取图片(RGB) ---> 转为灰度图  -----> 转为二值图  ----> 转为矩阵

 

THRESHOLD = 80 # 设置阈值为80
ret, frame = capture.read()  # 一帧一帧读取视频    
gray = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)  # 对每一帧做处理,设置为灰度图
retval, black_Write = cv.threshold(gray, THRESHOLD, 255, cv.THRESH_BINARY)  # 将灰度图二值化
data = np.array(black_Write)  # 把这个数据通过 numpy 转换成多维度的张量

在转化为灰度图的时候,会将RGB三颜色通道转换为0~255的灰度图,0表示纯黑,255表示纯白。

转化为二值图的先决条件就是要转换为灰度图,然后通过设置阈值将灰度图所有的像素点分割为0和255两种颜色。上述代码第三行的THRESHOULD是就是阈值,大小在0~255之间。我这里设置的就是80。

进阶知识

矩阵的处理---缩小矩阵

最开始测试的时候,我用的是10块钱二手买来的USB免驱摄像头。拍摄分辨率为640*480,在遍历时我们大多数会选择二重for循环进行,然后进行判断。但是这是十分消耗时间的。因此对于矩阵的处理显得尤为重要。接下来将介绍方法一:缩小矩阵。

X_LEFT_CUT_NUM = 199
X_RIGHT_CUT_NUM = 439
Y_HIGH_CUT_NUM = 439
Y_LOW_CUT_NUM = 239
data_after = data[X_LEFT_CUT_NUM:X_RIGHT_CUT_NUM, Y_LOW_CUT_NUM:Y_HIGH_CUT_NUM]  # 裁剪图像,获得一个200*240

 data为经过二值化后得到的矩阵。

 

 

 通过裁剪得到的就为橙色区域。对于小车寻迹来说,刚好这一块就是最需要关心的地方,离小车很近,是下一步会到达的区域。相比于遍历480*640的矩阵来说,遍历这个矩阵时间会大幅度减少。 

但是,依旧非常的消耗时间,我的测试平台是NVIDIA Jetson nano,得到的帧数竟然只有2帧。对于这块板子来说是非常不理想的,于是就分析了原因。nano这块板子强于树莓派就在于它具有128个CUDA,处理图像以及做流式计算会大幅度加快。但是因为还没有用到张量的运算,因此这里还是取决于CPU的处理速度。

  

 在我添加时间戳之后一切都显得明朗了,对于矩阵的遍历速度实在是太慢了。

平均采样

对于上述出现的问题,我想到一个解决办法,平均采样。我们对于一个图片进行处理其实并不需要对每一行进行判断。比如现在需要遍历200行,我只需要对其中的20行进行遍历就大概得到了这个趋势。

source = 0  # 记录黑点的横坐标
m = 0  # 记录黑点个数
y = int(200 / 20)  # 画面的纵轴大小
x = 240  # 画面的横轴大小
for i in range(y):
    for j in range(x):
        if pred[j, i * 10] == 0:
            m += 1
            source += j

 这里我选择了每20行取一行,果然帧数上升了。成功到了八帧。。。

 右边窗口就是图像。但是对于寻迹来说还是相当不够看的。既然在矩阵这块已经走到头了那么现在就得思考其他办法了。

池化操作

前面有谈到,到目前为止,因为没有使用矩阵的运算,因此没有发挥出NVIDIA Jetson nano这块开发板的优势,所以我们可以想想如何往这方面靠靠。碰巧我今年暑假接触过一点计算机视觉的网上课程。我就想到了池化的操作。

 池化操作分为最大池化和平均池化,如上图所示,其实核心就是用卷积核将图片缩小,用局部特征代替整体的一中方式。最大池化操作的运算速度会更快,但是会丢失一些细节信息,平均池化是根据卷积核内的所有值求平均来代替整体,所以运算会稍慢,但是会保留细节信息。

池化操作不同于卷积,池化的卷积核可以视为权重一样的卷积。池化只起到缩小图片,以及转换维度的作用。讲远了,这里就只用到了池化这一个方法。理解意思即可,只需要知道是用局部特征代替整体的一种方法,能缩小图片大小,但是也能非常好的表现图片。

STRIDE = 3  # 步数
data_after = tf.convert_to_tensor(data, tf.float32, name='data_after')
data_after = tf.expand_dims(data_after, axis=0)  # 拓展为三维张量
data_after = tf.expand_dims(data_after, axis=3)  # 拓展为三维张量
out = tf.nn.avg_pool(data_after, [1, STRIDE, STRIDE, 1], [1, STRIDE, STRIDE, 1], 'SAME')  # 平均池化,后面的数组为跳转步数

 data_after为上面经过二值化得到的矩阵。shape为[240,200]但是进行池化操作需要进行维度扩展,否则无法使用卷积核进行池化操作。进行了第3、4行的操作后shape为[1,240,200,1]。就可以进行池化操作了。

池化操作的第1个参数需要处理的矩阵;

第2个参数为卷积核,数组的第一个和最后一个为必须为1,中间两个数表示卷积核大小,例如这里就是3*3的卷积核;

第3个参数为移动的步长,卷积核在图像上移动才能得到处理后的图像。与上面一样数组里第1个和最后一个为1,表示会遍历所有的维度,中间两个数为横向移动和纵向移动的步长。

第四个参数选择为"VALID"和“SAME”分别表示为是否需要边界填充。对于新手来说,如果搞不清楚直接选择“SAME”方便计算池化后的大小。

通过上述操作,我使用3*3的卷积核,步长为3进行池化操作得到的处理后的图片x轴缩小了3倍,y轴缩小了3倍。平均取样间隔该到了10行一次,此时我在运行时,以及达到了30帧+了。(其实以及达到了上限,后来我将平均采样该到了5行一次,依然能保持24帧+)

 算法优化(重点)

240*200的矩阵卷积之后变成了80*67的矩阵,由于每10行取样一次,就可能造成得到的值不能很好的表现当前的情况。因此我们需要想办法优化算法。

二次池化

前面提到了先经过裁剪,然后池化得到的80*67的矩阵,感受野非常的窄,不能很好的对稍远的情况作出判断。最直观的改变就是小车的速度不能非常快,一旦速度超过了上一帧的画面,那么视觉寻迹将变得没有意义。

在此,就提出了二次池化,用池化代替裁剪。就可以获得了更大的感受野。此时我也将摄像头改为了板载摄像头。以下是板载摄像头的打开方式,CSI摄像头用ViserCapture是无法打开的,需要进行配置。 

def gstreamer_pipeline(
        capture_width=1280,
        capture_height=720,
        display_width=1280,
        display_height=720,
        framerate=60,
        flip_method=0,
):
    return (
            "nvarguscamerasrc ! "
            "video/x-raw(memory:NVMM), "
            "width=(int)%d, height=(int)%d, "
            "format=(string)NV12, framerate=(fraction)%d/1 ! "
            "nvvidconv flip-method=%d ! "
            "video/x-raw, width=(int)%d, height=(int)%d, format=(string)BGRx ! "
            "videoconvert ! "
            "video/x-raw, format=(string)BGR ! appsink"
            % (
                capture_width,
                capture_height,
                framerate,
                flip_method,
                display_width,
                display_height,
            )
    )


capture = cv.VideoCapture(gstreamer_pipeline(flip_method=0), cv.CAP_GSTREAMER)  # 创建一个VideoCapture对象

此时窗口大小变为了1280*720,进行了两次3*3卷积核,步长为3的池化操作,窗口大小变为了142*80,平均采样为10行采样一行。

当时我人是在动的,可以看到感受野十分大,并且在这么复杂的环境下依旧能维持在30帧上下。(后来平均采样为5行采样一行时,帧数也维持在24帧上下)到现在,感受野以及对图片的表达已经做的不错了。

有的朋友会问,为什么要经过两次池化,而不是一次池化原则更大的卷积核以及步长呢?回答这个问题,首先卷积核池化一次只会输出一个值即一个特征,那么就意味着在该值下的其他特征就会被覆盖。大的卷积核得到的图片就不会有二次卷积得到的图片包含的特征细腻。其次,著名AlexNet在当年的人工智能大赛获得冠军就证明了连续多个小型卷积核的组合效果优于大型卷积核。

判断优化 

对于“直行”来说左侧空白处,是不需要判断的,进行判断十分浪费时间,并且没有意义。

同理对于“右转”来说亦是如此。

那就可以通过上一张图片处理得到的命令,来选择对当前图片的处理范围。很好的解决了上面提到的问题。

那我们再思考,是不是轨迹右边的空白部分也是属于没有必要判断的呢?

在这里我设计了一个算法,在扫描矩阵每一行的时候,在得到轨迹之后,如果出现连续多个的白点,那么直接跳出当前循环

STRIDE = 3
CAPTURE_WIDTH = 1280  # 摄像头捕捉画面宽
CAPTURE_HEIGHT = 720  # 摄像头捕捉画面高
order = ""  # 保存上一条指令
RANGE_NUM = 10  # 直行允许波动范围
while True:
    source = 0  # 记录黑点的横坐标
    black_point_num = 0  # 代表黑点个数
    x = int(CAPTURE_WIDTH / STRIDE / STRIDE)
    y = int(CAPTURE_HEIGHT / STRIDE / STRIDE)
    opt = 0
    if order == "前进":
        opt = int(x / 4)
    elif order == "右转":
        opt = int(x / 3)
    else:
        opt = 0
    for i in range(y):
        row = 0  # 记录当前行的黑点数量
        row_white_point_num = 0  # 代表行白点个数
        for j in range(opt, x):
            if pred[i, j] == 0:
                row += 1
                row_white_point_num = 0
                source += j
            else:
                row_white_point_num += 1
                if row > 5 and row_white_point_num > 5:
                    break
        black_point_num += row
    source /= black_point_num  # 获得平均横坐标
    source -= x / 2
    if abs(source) <= RANGE_NUM:
        print("前进")
        order = "前进"
    elif source > 0:
        print("右转")
        order = "右转"
    else:
        print("左转")
        order = "左转"

为了验证这个效率,我取消了平均采样。当前的条件就为对142*80的矩阵进行遍历。

 在没经过判断优化的时候,可以看到帧数在6.5左右。

 

在进行优化判断之后,帧数来到了12帧左右,效率几乎翻倍。

完整代码

优化前

# 开发作者   :Tian.Z.L
# 开发时间   :2022/1/9  21:33 
# 文件名称   :vision.PY
# 开发工具   :PyCharm
import time
# import Adafruit_SSD1306
import cv2 as cv
import numpy as np
import tensorflow as tf

# from PIL import Image

THRESHOLD = 100  # 二值化阈值
STRIDE = 3
X_LEFT_CUT_NUM = 199
X_RIGHT_CUT_NUM = 439
Y_HIGH_CUT_NUM = 439
Y_LOW_CUT_NUM = 239
RANGE_NUM = 10


# 注意 使用的是哪组i2c的接口,对应调整i2c_bus取值
# OLED = Adafruit_SSD1306.SSD1306_128_64(rst=None, i2c_bus=1, gpio=1)
# OLED.begin()  # 初始化屏幕并清屏
# OLED.clear()
# OLED.display()
def gstreamer_pipeline(
        capture_width=1280,
        capture_height=720,
        display_width=1280,
        display_height=720,
        framerate=60,
        flip_method=0,
):
    return (
            "nvarguscamerasrc ! "
            "video/x-raw(memory:NVMM), "
            "width=(int)%d, height=(int)%d, "
            "format=(string)NV12, framerate=(fraction)%d/1 ! "
            "nvvidconv flip-method=%d ! "
            "video/x-raw, width=(int)%d, height=(int)%d, format=(string)BGRx ! "
            "videoconvert ! "
            "video/x-raw, format=(string)BGR ! appsink"
            % (
                capture_width,
                capture_height,
                framerate,
                flip_method,
                display_width,
                display_height,
            )
    )


capture = cv.VideoCapture(gstreamer_pipeline(flip_method=0), cv.CAP_GSTREAMER)  # 创建一个VideoCapture对象
start_time = time.time()
while (True):
    ret, frame = capture.read()  # 一帧一帧读取视频
    gray = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)  # 对每一帧做处理,设置为灰度图
    retval, black_Write = cv.threshold(gray, THRESHOLD, 255, cv.THRESH_BINARY)  # 将灰度图二值化
    data = np.array(black_Write)  # 把这个数据通过 numpy 转换成多维度的张量
    # data_after = data[X_LEFT_CUT_NUM:X_RIGHT_CUT_NUM, Y_LOW_CUT_NUM:Y_HIGH_CUT_NUM]  # 裁剪图像,获得一个200*240
    data_after = tf.convert_to_tensor(data, tf.float32, name='data_after')
    data_after = tf.expand_dims(data_after, axis=0)  # 拓展为三维张量
    data_after = tf.expand_dims(data_after, axis=3)  # 拓展为三维张量
    # out = tf.nn.avg_pool(data_after, [1, STRIDE, STRIDE, 1], [1, STRIDE, STRIDE, 1], 'SAME')  # 平均池化,后面的数组为跳转步数
    out = tf.nn.max_pool(data_after, [1, STRIDE, STRIDE, 1], [1, STRIDE, STRIDE, 1], 'SAME')  # 最大池化,后面的数组为跳转步数
    out = tf.nn.max_pool(out, [1, STRIDE, STRIDE, 1], [1, STRIDE, STRIDE, 1], 'SAME')
    # print(out)
    out = tf.squeeze(out)  # 缩小张量维度,将所有维度为1的去除掉,在这里就表现只剩下二值图
    pred = np.array(out, np.uint8)
    cv.imshow('123', pred)  # 显示结果
    source = 0  # 记录黑点的横坐标
    m = 0   # 记录黑点个数
    y = 143  # 画面的横轴大小
    x = 80  # 画面的纵轴大小
    for i in range(y):
        for j in range(x):
            if pred[j, i] == 0:
                m += 1
                source += j
    if cv.waitKey(1) & 0xFF == ord('q'):  # 按q停止
        break
    if (time.time() - start_time) != 0:  # 实时显示帧数
        # print((time.time() - start_time))
        print("FPS: ", 1 / (time.time() - start_time))
    try:
        source /= m  # 获得平均横坐标
        source -= x / 2
        if abs(source) <= 10:
            print("前进")
        elif source > 0:
            print("左转")
        else:
            print("右转")
        start_time = time.time()
    except:
        start_time = time.time()
        print("停止")
        continue
capture.release()  # 释放cap,销毁窗口
cv.destroyAllWindows()

 优化后

 

# 开发作者   :Tian.Z.L
# 开发时间   :2022/1/11  17:15 
# 文件名称   :visionByOptimization.PY
# 开发工具   :PyCharm
import time
import cv2 as cv
import numpy as np
import tensorflow as tf


THRESHOLD = 100  # 二值化阈值
STRIDE = 3
CAPTURE_WIDTH = 1280  # 摄像头捕捉画面宽
CAPTURE_HEIGHT = 720  # 摄像头捕捉画面高
FRAMERATE = 60  # 摄像头捕捉画面帧数
RANGE_NUM = 10
order = ""  # 记录上一条命令

# 调取半载摄像头
def gstreamer_pipeline(
        capture_width=CAPTURE_WIDTH,
        capture_height=CAPTURE_HEIGHT,
        display_width=CAPTURE_WIDTH,
        display_height=CAPTURE_HEIGHT,
        framerate=FRAMERATE,
        flip_method=0,
):
    return (
            "nvarguscamerasrc ! "
            "video/x-raw(memory:NVMM), "
            "width=(int)%d, height=(int)%d, "
            "format=(string)NV12, framerate=(fraction)%d/1 ! "
            "nvvidconv flip-method=%d ! "
            "video/x-raw, width=(int)%d, height=(int)%d, format=(string)BGRx ! "
            "videoconvert ! "
            "video/x-raw, format=(string)BGR ! appsink"
            % (
                capture_width,
                capture_height,
                framerate,
                flip_method,
                display_width,
                display_height,
            )
    )


capture = cv.VideoCapture(gstreamer_pipeline(flip_method=0), cv.CAP_GSTREAMER)  # 创建一个VideoCapture对象
start_time = time.time()
while (True):
    ret, frame = capture.read()  # 一帧一帧读取视频
    gray = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)  # 对每一帧做处理,设置为灰度图
    retval, black_Write = cv.threshold(gray, THRESHOLD, 255, cv.THRESH_BINARY)  # 将灰度图二值化
    data = np.array(black_Write)  # 把这个数据通过 numpy 转换成多维度的张量
    data_after = tf.convert_to_tensor(data, tf.float32, name='data_after')
    data_after = tf.expand_dims(data_after, axis=0)  # 拓展维度
    data_after = tf.expand_dims(data_after, axis=3)  # 拓展维度
    # out = tf.nn.avg_pool(data_after, [1, STRIDE, STRIDE, 1], [1, STRIDE, STRIDE, 1], 'SAME')  # 平均池化,后面的数组为跳转步数
    out = tf.nn.max_pool(data_after, [1, STRIDE, STRIDE, 1], [1, STRIDE, STRIDE, 1], 'SAME')  # 最大池化,后面的数组为跳转步数
    out = tf.nn.max_pool(out, [1, STRIDE, STRIDE, 1], [1, STRIDE, STRIDE, 1], 'SAME')
    # print(out)
    out = tf.squeeze(out)  # 缩小张量维度,将所有维度为1的去除掉,在这里就表现只剩下二值图
    pred = np.array(out, np.uint8)
    cv.imshow('123', pred)  # 显示结果

    source = 0  # 记录黑点的横坐标
    black_point_num = 0  # 代表黑点个数
    x = int(CAPTURE_WIDTH / STRIDE / STRIDE)
    y = int(CAPTURE_HEIGHT / STRIDE / STRIDE)
    opt = 0
    if order == "前进":
        opt = int(x / 4)
    elif order == "右转":
        opt = int(x / 3)
    else:
        opt = 0
    for i in range(y):
        row = 0  # 记录当前行的黑点数量
        row_white_point_num = 0  # 代表行白点个数
        for j in range(opt, x):
            if pred[i, j] == 0:
                row += 1
                row_white_point_num = 0
                source += j
            else:
                row_white_point_num += 1
                if row > 5 and row_white_point_num > 5:
                    break
        black_point_num += row
    if cv.waitKey(1) & 0xFF == ord('q'):  # 按q停止
        break
    if (time.time() - start_time) != 0:  # 实时显示帧数
        # print((time.time() - start_time))
        print("FPS: ", 1 / (time.time() - start_time))
    try:
        source /= black_point_num  # 获得平均横坐标
        source -= x / 2
        if abs(source) <= RANGE_NUM:
            print("前进")
            order = "前进"
        elif source > 0:
            print("右转")
            order = "右转"
        else:
            print("左转")
            order = "左转"
        start_time = time.time()
    except:
        start_time = time.time()
        print("停止")
        continue
capture.release()  # 释放cap,销毁窗口
cv.destroyAllWindows()

以上是关于OpenCV+TensorFlow简单的机器小车传统视觉寻迹的主要内容,如果未能解决你的问题,请参考以下文章

#基于DAYU200开发一个好玩的图传智能小车--demo样例#

TensorFlow 和 OpenCV 实时分类

基于Tensorflow + Opencv 实现CNN自定义图像分类

基于Tensorflow + Opencv 实现CNN自定义图像分类

树莓派视觉小车 -- 物体跟踪(OpenCV)

OpenCV DNN模块——从TensorFlow模型导出到OpenCV部署详解