python3实现的rtsp客户端脚本

Posted 诸子流

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python3实现的rtsp客户端脚本相关的知识,希望对你有一定的参考价值。

一、说明

此客户端使用python3编写

此客户端实现RTSP的OPTIONS, DESCRIBE, SETUP , PLAY, GET_PARAMETER,TEARDOWN方法,未实现ANNOUNCE,PAUSE,SET_PARAMETER,REDIRECT,RECORD

RTSP就是针对一个URL,或是依次或是有选择地执行以上11种请求;要做渗透测试,没很多可测的,也就测试URL中的参数和各请求方法的请求头参数有没有溢出

此客户端自动依次执行请求:OPTIONS--DESCRIBE(2次)--SETUP(2次)--PLAY--GET_PARAMETER(5次)--TEARDOWN

RTSP有Basic和Digest两种验证方法,此客户端实现默认使用Digest方法,如果想换用Basic方法请将config_dict[\'auth_method\']的值由‘Digest’修改为‘Basic’

总体执行效果如下图所示

 

二、客户端源代码

V1.0版本(20180525):

import base64
import socket
import hashlib
import time

global config_dict
config_dict = {
    \'server_username\': \'admin\',
    \'server_password\': \'1234567\',
    \'server_ip\': \'10.10.6.94\',
    \'server_port\': 554,
    \'server_path\': \'/chIP=1&streamType=main/\',
    \'cseq\': 2,
    \'user_agent\': \'LibVLC/3.0.2 (LIVE555 Streaming Media v2016.11.28)\',
    \'buffer_len\': 1024,
    \'auth_method\': \'Digest\',
    \'header_modify_allow\': False,
    \'options_header_modify\': False,
    \'describe_header_modify\': False,
    \'describe_auth_header_modify\': False,
    \'setup_header_modify\': False,
    \'setup_session_header_modify\': False,
    \'play_header_modify\': False,
    \'get_parameter_header_modify\': False,
    \'teardown_header_modify\': False
}


def gen_response_value(url,public_method,realm,nonce):
    frist_pre_md5_value = hashlib.md5((config_dict[\'server_username\'] + \':\' + realm + \':\' + config_dict[\'server_password\']).encode()).hexdigest()
    first_post_md5_value = hashlib.md5((public_method+\':\' + url).encode()).hexdigest()
    response_value = hashlib.md5((frist_pre_md5_value + \':\' + nonce + \':\' + first_post_md5_value).encode()).hexdigest()
    return response_value


def gen_options_header():
    global config_dict
    str_options_header = \'OPTIONS rtsp://\' + config_dict[\'server_ip\'] + \':\' + str(config_dict[\'server_port\']) + config_dict[
        \'server_path\'] + \' RTSP/1.0\\r\\n\'
    str_options_header += \'CSeq: \' + str(config_dict[\'cseq\']) + \'\\r\\n\'
    str_options_header += \'User-Agent: \' + config_dict[\'user_agent\'] + \'\\r\\n\'
    str_options_header += \'\\r\\n\'
    return str_options_header


def gen_describe_header():
    global config_dict
    str_describe_header = \'DESCRIBE rtsp://\' + config_dict[\'server_ip\'] + \':\' + str(config_dict[\'server_port\']) + config_dict[
        \'server_path\'] + \' RTSP/1.0\\r\\n\'
    str_describe_header += \'CSeq: \' + str(config_dict[\'cseq\'] + 1) + \'\\r\\n\'
    str_describe_header += \'User-Agent: \' + config_dict[\'user_agent\'] + \'\\r\\n\'
    str_describe_header += \'Accept: application/sdp\\r\\n\'
    str_describe_header += \'\\r\\n\'
    return str_describe_header


def gen_describe_auth_header(url,realm,nonce):
    global config_dict
    public_method = \'DESCRIBE\'
    str_describe_auth_header = \'DESCRIBE rtsp://\' + config_dict[\'server_ip\'] + \':\' + str(config_dict[\'server_port\']) + config_dict[\'server_path\'] + \' RTSP/1.0\\r\\n\'
    str_describe_auth_header += \'CSeq: \' + str(config_dict[\'cseq\'] + 2) + \'\\r\\n\'
    if config_dict[\'auth_method\'] == \'Basic\':
        auth_64 = base64.b64encode((config_dict[\'server_username\'] + ":" + config_dict[\'server_password\']).encode("utf-8")).decode()
        str_describe_auth_header += \'Authorization: Basic \'+auth_64 + \' \\r\\n\'
    else:
        response_value = gen_response_value(url, public_method, realm, nonce)
        str_describe_auth_header += \'Authorization: Digest username="\'+config_dict[\'server_username\']+\'", realm="\'+realm+\'", nonce="\'+nonce+\'", uri="\'+url+\'", response="\'+response_value+\'"\\r\\n\'
    str_describe_auth_header += \'User-Agent: \' + config_dict[\'user_agent\'] + \'\\r\\n\'
    str_describe_auth_header += \'Accept: application/sdp\\r\\n\'
    str_describe_auth_header += \'\\r\\n\'
    return str_describe_auth_header


def gen_setup_header(url, realm, nonce):
    global config_dict
    public_method = \'SETUP\'
    str_setup_header  = \'SETUP rtsp://\' + config_dict[\'server_ip\'] + \':\' + str(config_dict[\'server_port\']) + config_dict[\'server_path\'] + \'trackID=0 RTSP/1.0\\r\\n\'
    str_setup_header += \'CSeq: \' + str(config_dict[\'cseq\'] + 3) + \'\\r\\n\'
    if config_dict[\'auth_method\'] == \'Basic\':
        auth_64 = base64.b64encode((config_dict[\'server_username\'] + ":" + config_dict[\'server_password\']).encode("utf-8")).decode()
        str_setup_header += \'Authorization: Basic \'+auth_64 + \' \\r\\n\'
    else:
        response_value = gen_response_value(url, public_method, realm, nonce)
        str_setup_header += \'Authorization: Digest username="\'+config_dict[\'server_username\']+\'", realm="\'+realm+\'", nonce="\'+nonce+\'", uri="\'+url+\'", response="\'+response_value+\'"\\r\\n\'
    str_setup_header += \'User-Agent: \' + config_dict[\'user_agent\'] + \'\\r\\n\'
    str_setup_header += \'Transport: RTP/AVP;unicast;client_port=50166-50167\\r\\n\'
    str_setup_header += \'\\r\\n\'
    return str_setup_header


def gen_setup_session_header(url, realm, nonce,session):
    global config_dict
    public_method = \'SETUP\'
    str_setup_session_header = \'SETUP rtsp://\' + config_dict[\'server_ip\'] + \':\' + str(config_dict[\'server_port\']) + config_dict[
        \'server_path\'] + \'/trackID=1 RTSP/1.0\\r\\n\'
    str_setup_session_header += \'CSeq: \' + str(config_dict[\'cseq\'] + 4) + \'\\r\\n\'
    if config_dict[\'auth_method\'] == \'Basic\':
        auth_64 = base64.b64encode((config_dict[\'server_username\'] + ":" + config_dict[\'server_password\']).encode("utf-8")).decode()
        str_setup_session_header += \'Authorization: Basic \'+auth_64 + \' \\r\\n\'
    else:
        response_value = gen_response_value(url, public_method, realm, nonce)
        str_setup_session_header += \'Authorization: Digest username="\'+config_dict[\'server_username\']+\'", realm="\'+realm+\'", nonce="\'+nonce+\'", uri="\'+url+\'", response="\'+response_value+\'"\\r\\n\'
    str_setup_session_header += \'User-Agent: \' + config_dict[\'user_agent\'] + \'\\r\\n\'
    str_setup_session_header += \'Transport: RTP/AVP;unicast;client_port=50168-50169\\r\\n\'
    str_setup_session_header += \'Session: \'+session+\'\\r\\n\'
    str_setup_session_header += \'\\r\\n\'
    return str_setup_session_header


def gen_play_header(url, realm, nonce,session):
    global config_dict
    public_method = \'PLAY\'
    str_play_header = \'PLAY rtsp://\' + config_dict[\'server_ip\'] + \':\' + str(config_dict[\'server_port\']) + config_dict[
        \'server_path\'] + \' RTSP/1.0\\r\\n\'
    str_play_header += \'CSeq: \' + str(config_dict[\'cseq\'] + 5) + \'\\r\\n\'
    if config_dict[\'auth_method\'] == \'Basic\':
        auth_64 = base64.b64encode((config_dict[\'server_username\'] + ":" + config_dict[\'server_password\']).encode("utf-8")).decode()
        str_play_header += \'Authorization: Basic \'+auth_64 + \' \\r\\n\'
    else:
        response_value = gen_response_value(url, public_method, realm, nonce)
        str_play_header += \'Authorization: Digest username="\'+config_dict[\'server_username\']+\'", realm="\'+realm+\'", nonce="\'+nonce+\'", uri="\'+url+\'", response="\'+response_value+\'"\\r\\n\'
    str_play_header += \'User-Agent: \' + config_dict[\'user_agent\'] + \'\\r\\n\'
    str_play_header += \'Session: \'+session+\'\\r\\n\'
    str_play_header += \'Range: npt=0.000-\\r\\n\'
    str_play_header += \'\\r\\n\'
    return str_play_header


def gen_get_parameter_header(url, realm, nonce, session,count):
    global config_dict
    public_method = \'GET_PARAMETER\'
    str_get_parameter_header = \'GET_PARAMETER rtsp://\' + config_dict[\'server_ip\'] + \':\' + str(config_dict[\'server_port\']) + config_dict[
        \'server_path\'] + \' RTSP/1.0\\r\\n\'
    str_get_parameter_header += \'CSeq: \' + str(config_dict[\'cseq\'] + 6+int(count)) + \'\\r\\n\'
    if config_dict[\'auth_method\'] == \'Basic\':
        auth_64 = base64.b64encode((config_dict[\'server_username\'] + ":" + config_dict[\'server_password\']).encode("utf-8")).decode()
        str_get_parameter_header += \'Authorization: Basic \'+auth_64 + \' \\r\\n\'
    else:
        response_value = gen_response_value(url, public_method, realm, nonce)
        str_get_parameter_header += \'Authorization: Digest username="\' + config_dict[
        \'server_username\'] + \'", realm="\' + realm + \'", nonce="\' + nonce + \'", uri="\' + url + \'", response="\' + response_value + \'"\\r\\n\'
    str_get_parameter_header += \'User-Agent: \' + config_dict[\'user_agent\'] + \'\\r\\n\'
    str_get_parameter_header += \'Session: \' + session + \'\\r\\n\'
    str_get_parameter_header += \'\\r\\n\'
    return str_get_parameter_header


def gen_teardown_header(url, realm, nonce, session):
    global config_dict
    public_method = \'TEARDOWN\'
    str_teardown_header = \'TEARDOWN rtsp://\' + config_dict[\'server_ip\'] + \':\' + str(config_dict[\'server_port\']) + config_dict[\'server_path\'] + \' RTSP/1.0\\r\\n\'
    str_teardown_header += \'CSeq: \' + str(config_dict[\'cseq\'] + 11) + \'\\r\\n\'
    if config_dict[\'auth_method\'] == \'Basic\':
        auth_64 = base64.b64encode((config_dict[\'server_username\'] + ":" + config_dict[\'server_password\']).encode("utf-8")).decode()
        str_teardown_header += \'Authorization: Basic \'+auth_64 + \' \\r\\n\'
    else:
        response_value = gen_response_value(url, public_method, realm, nonce)
        str_teardown_header += \'Authorization: Digest username="\' + config_dict[
        \'server_username\'] + \'", realm="\' + realm + \'", nonce="\' + nonce + \'", uri="\' + url + \'", response="\' + response_value + \'"\\r\\n\'
    str_teardown_header += \'User-Agent: \' + config_dict[\'user_agent\'] + \'\\r\\n\'
    str_teardown_header += \'Session: \' + session + \'\\r\\n\'
    str_teardown_header += \'\\r\\n\'
    return str_teardown_header


def add_header_according_to_protocol(str_header):
    str_header = str_header[0:len(str_header)-2]
    str_header += \'Accept: application/rtsl, application/sdp;level=2\'
    str_header += \'Accept-Encoding: \\r\\n\'
    str_header += \'Accept-Language: \\r\\n\'
    str_header += \'Authorization: \\r\\n\'
    str_header += \'Bandwidth: 1*DIGIT \\r\\n\'
    str_header += \'Blocksize: \\r\\n\'
    str_header += \'Cache-Control: no-cache \\r\\n\'
    str_header += \'Conference: 199702170042.SAA08642@obiwan.arl.wustl.edu%20Starr \\r\\n\'
    str_header += \'Connection: \\r\\n\'
    str_header += \'Content-Base: \\r\\n\'
    str_header += \'Content-Encoding: \\r\\n\'
    str_header += \'Content-Language: \\r\\n\'
    str_header += \'Content-Length: 20 \\r\\n\'
    str_header += \'Content-Location: \\r\\n\'
    str_header += \'Content-Type: \\r\\n\'
    str_header += \'Date: \\r\\n\'
    str_header += \'Expires: Thu, 01 Dec 1994 16:00:00 GMT \\r\\n\'
    str_header += \'From: \\r\\n\'
    str_header += \'If-Modified-Since: Sat, 29 Oct 1994 19:43:31 GMT \\r\\n\'
    str_header += \'Last-Modified: \\r\\n\'
    str_header += \'Proxy-Require: \\r\\n\'
    str_header += \'Referer: \\r\\n\'
    str_header += \'Require: funky-feature \\r\\n\'
    str_header += \'Scale: -3.5 \\r\\n\'
    str_header += \'Speed: 2.5 \\r\\n\'
    str_header += \'Transport: RTP/AVP;unicast;client_port=3456-3457;mode="PLAY" \\r\\n\'
    str_header += \'User-Agent: \\r\\n\'
    str_header += \'Via: \\r\\n\'
    str_header += \'Range: npt=2\\r\\n\'
    str_header += \'\\r\\n\'
    return str_header


def exec_full_request():
    socket_send = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    socket_send.settimeout(5)
    socket_send.connect((config_dict[\'server_ip\'], config_dict[\'server_port\']))
    
    url = \'rtsp://\' + config_dict[\'server_ip\'] + \':\' + str(config_dict[\'server_port\']) + config_dict[\'server_path\']
    
    print(\'now start to check options operation\')
    str_options_header = gen_options_header()
    if config_dict[\'header_modify_allow\'] & config_dict[\'options_header_modify\']:
        str_options_header = add_header_according_to_protocol(str_options_header)
    socket_send.send(str_options_header.encode())
    msg_recv = socket_send.recv(config_dict[\'buffer_len\']).decode()
    if \'200 OK\' in msg_recv:
        print(\'OPTIONS request is OK\')
    else:
        print(\'OPTIONS request is BAD\')
    str_describe_header = gen_describe_header()
    if config_dict[\'header_modify_allow\'] & config_dict[\'describe_header_modify\']:
        str_describe_header = add_header_according_to_protocol(str_describe_header)
    socket_send.send(str_describe_header.encode())
    msg_recv = socket_send.recv(config_dict[\'buffer_len\']).decode()
    if msg_recv.find(\'401 Unauthorized\') == -1 & False:
        msg_recv_dict = msg_recv.split(\'\\r\\n\')
        print(\'first DESCRIBE request occur error: \')
        print(msg_recv_dict[0])
    else:
        print(\'一个基于JRTPLIB的轻量级RTSP客户端(myRTSPClient)——实现篇:用户接口层之RTSP命令

代码片段:Shell脚本实现重复执行和多进程

一个基于JRTPLIB的轻量级RTSP客户端(myRTSPClient)——实现篇:用户接口层之提取媒体流数据

javascript中的RTP RTSP实现

RTSP实时音视频(H264/H265/AAC)开发实战项目

流媒体协议之RTSP客户端的实现20171014