python实战——m3u8高速下载器

Posted _雪菜肉丝面_

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python实战——m3u8高速下载器相关的知识,希望对你有一定的参考价值。

文章目录

1,一些理论

ffmpeg能够下载m3u8,但很慢。
它是下一段拼接一段,不能多线程。

python调用ffmpeg:

import os
# 下载地址,文件名
url = "https://iqiyi.sd-play.com/20211007/KGTJvkvQ/index.m3u8"
name = "a1"
# 调用cmd
os.system(f"ffmpeg -i url -c copy -bsf:a aac_adtstoasc ./name.mp4")

获取视频的m3u8地址

打开开发者工具,切换到网络面板,搜索m3u8。
然后刷新页面。就可以看到m3u8的地址了。

第一个m3u8

作用是找到第二个m3u8。


第三行就是第二个m3u8的地址。

第二个m3u8

有两个比较重要的部分:

  1. 解密方式,密钥。
  2. ts文件列表。

#EXT-X-KEY那一行存储了加密相关的信息,比如方式是aes128,密钥在那个链接里面。

ts文件是一个短视频,解密完毕后,可以直接播放。

后续步骤

合并,转码成mp4。用ffmpeg就可以做到。

2,获取第一个m3u8

发请求就行。

import requests

url1 = "https://iqiyi.sd-play.com"
url2 = "/20211007/KGTJvkvQ/index.m3u8"

# 获取第一个m3u8
text = requests.get(url1 + url2).text

print(text)

效果:

3,获取第二个m3u8

因为m3u8有两种,不含ts列表的和含有ts列表的。
我们无法确定之前获取的是哪一个,所以要对内容进行判断。

如果是第一个m3u8,发请求获取第二个。
如果是第二个m3u8,那么不动。

import requests

def step1(str0):
	# 一行一行判断
	list0 = str0.split("\\n")
	for index, value in enumerate(list0):
		# 区别
		if value.startswith("#EXT-X-STREAM-INF"):
			# 发请求
			return requests.get(url1 + list0[index + 1]).text
	# 保持原样
	return str0

url1 = "https://iqiyi.sd-play.com"
url2 = "/20211007/KGTJvkvQ/index.m3u8"

# 获取第一个m3u8
text = requests.get(url1 + url2).text
# 获取第二个m3u8
text = step1(text)

print(text)

效果:

4,m3u8解析

获取两部分内容:

  • 解密方式,密钥。
  • ts列表。
import requests

def step1(str0):
	# 一行一行判断
	list0 = str0.split("\\n")
	for index, value in enumerate(list0):
		# 区别
		if value.startswith("#EXT-X-STREAM-INF"):
			# 发请求
			return requests.get(url1 + list0[index + 1]).text
	# 保持原样
	return str0
def step2(str0):
	method0 = None
	key0 = None
	list0 = []
	# 获取加密信息
	if str0.__contains__("#EXT-X-KEY"):
		for item in str0.split("\\n"):
			if item.startswith("#EXT-X-KEY"):
				method0 = item[item.index("METHOD=") + 7:item.index(",URI")]
				key0 = requests.get(item[item.index("\\"") + 1: item.rindex("\\"")]).text
				break
	# 获取ts文件列表
	for item in str0.split("\\n"):
		if item.startswith("http"):
			list0.append(item)
	return 
		"method": method0,
		"key": key0,
		"list": list0
	
url1 = "https://iqiyi.sd-play.com"
url2 = "/20211007/KGTJvkvQ/index.m3u8"

# 获取第一个m3u8
text = requests.get(url1 + url2).text
# 获取第二个m3u8
text = step1(text)
# 解析m3u8
info = step2(text)

print(info)

效果:

5,清空缓存文件夹

删除缓存文件夹,然后重新创建。

def rebuildTemp():
	if os.path.exists("temp0"):
		shutil.rmtree("temp0")
	os.mkdir("temp0")

# 删除,创建缓存文件夹
rebuildTemp()

6,下载ts文件

多线程模板:

import time
from concurrent.futures import ThreadPoolExecutor

# 一个任务
def action(amount):
	print(amount)
	time.sleep(1)  # 耗时操作
	print(-amount)

# 创建一个包含2条线程的线程池
pool = ThreadPoolExecutor(max_workers=2)
# 向线程池提交两个任务
future1 = pool.submit(action, 1)
future2 = pool.submit(action, 2)
# 等全部结束
pool.shutdown(wait=True)
print("Q")

下载一个:

def downOne(index, url):
	data = requests.get(url).content
	file = open(f"./temp0/index.ts", "wb")
	file.write(data)
	file.close()
	print(index, end="-")
downOne(0, "https://iqiyi.shanshanku.com/20211007/KGTJvkvQ/1000kb/hls/0QITO9qF.ts")

多线程下载:

def downAll(list0):
	pool = ThreadPoolExecutor(max_workers=64)
	for index, value in enumerate(list0):
		pool.submit(downOne, index, value)
	pool.shutdown(wait=True)

效果:

目前全部代码

import os
import shutil
from concurrent.futures import ThreadPoolExecutor

import requests

def step1(str0):
	list0 = str0.split("\\n")
	for index, value in enumerate(list0):
		if value.startswith("#EXT-X-STREAM-INF"):
			return requests.get(url1 + list0[index + 1]).text
	return str0
def step2(str0):
	method0 = None
	key0 = None
	list0 = []
	if str0.__contains__("#EXT-X-KEY"):
		for item in str0.split("\\n"):
			if item.startswith("#EXT-X-KEY"):
				method0 = item[item.index("METHOD=") + 7:item.index(",URI")]
				key0 = requests.get(item[item.index("\\"") + 1: item.rindex("\\"")]).text
				break
	for item in str0.split("\\n"):
		if item.startswith("http"):
			list0.append(item)
	return 
		"method": method0,
		"key": key0,
		"list": list0
	
def rebuildTemp():
	if os.path.exists("temp0"):
		shutil.rmtree("temp0")
	os.mkdir("temp0")
def downOne(index, url):
	data = requests.get(url).content
	file = open(f"./temp0/index.ts", "wb")
	file.write(data)
	file.close()
	print(index, end="-")
def downAll(list0):
	pool = ThreadPoolExecutor(max_workers=64)
	for index, value in enumerate(list0):
		pool.submit(downOne, index, value)
	pool.shutdown(wait=True)
# 初始数据
url1 = "https://iqiyi.sd-play.com"
url2 = "/20211007/KGTJvkvQ/index.m3u8"

# 获取第一个m3u8
text = requests.get(url1 + url2).text
print("下载第一个m3u8成功!")
# 获取第二个m3u8
text = step1(text)
print("下载第二个m3u8成功!")
# 解析m3u8
info = step2(text)
print("m3u8解析成功!")
# 删除,创建缓存文件夹
rebuildTemp()
print("目录temp0删除,创建成功!")
# 批量下载
downAll(info.get('list'))
print("下载成功!")
# 结束
print("完成!")

效果:

8,ts文件解密

库名叫:pycryptodome。

这部分比较费脑子,而且加密方式千奇百怪。
我百度找了一段,当前案例能用。

虽然还没看懂。。。

import requests
from Crypto.Cipher import AES

def downOne(index, url, key0):
	data = requests.get(url).iter_content(chunk_size=1024)
	file = open(f"index.ts", "wb")
	key1 = bytes(key0, 'utf8')
	cryptor = AES.new(key1, AES.MODE_CBC, key1)
	for chunk in data:
		if chunk:
			file.write(cryptor.decrypt(chunk))
	file.close()
	print(index, end="-")

fileName = 0
fileUrl = "https://iqiyi.shanshanku.com/20211007/KGTJvkvQ/1000kb/hls/0QITO9qF.ts"
key = 'dd38a8dcedfb8fbf'
downOne(fileName, fileUrl, key)

成功的标志,是播放器可以打开ts,而且正常播放。


对之前的代码进行调整,把key传入。

def downOne(index, url, key):
	data = requests.get(url).iter_content(chunk_size=1024)
	key0 = bytes(key, 'utf8')
	file = open(f"./temp0/index.ts", "wb")
	cryptor = AES.new(key0, AES.MODE_CBC, key0)
	for chunk in data:
		if chunk:
			file.write(cryptor.decrypt(chunk))
	file.close()
	print(index, end="-")

def downAll(list0, key):
	pool = ThreadPoolExecutor(max_workers=64)
	for index, value in enumerate(list0):
		pool.submit(downOne, index, value, key)
	pool.shutdown(wait=True)

downAll(info.get('list'),info.get('key'))

现在,所有ts都是可以播放的了。

9,ts文件合并转码

因为文件很多,要把所有ts写在一个txt里,然后交给ffmpeg来处理合并。

def step3(size):
	file = open("temp0/info.txt", "w")
	index = 0
	while index < size:
		file.write(f"file 'index.ts'\\n")
		index = index + 1
	file.close()
	
step3(len(info.get('list')))

txt是这样的:

然后就可以交给ffmpeg进行合并了。

import os

os.system("cd temp0")
os.system("ffmpeg -f concat -safe 0 -i info.txt -c copy out.mp4")

最终产物:一个可以播放的mp4文件。

全部代码

"""
内置库
"""
import os
import shutil
import urllib.parse
from concurrent.futures import ThreadPoolExecutor

"""
三方库:
	requests
	pycryptodome
"""
import requests
from Crypto.Cipher import AES

# 获取第二个m3u8
def step0(str0, url):
	url_info = urllib.parse.urlsplit(url)
	url = url_info.scheme + "://" + url_info.netloc
	list0 = str0.split("\\n")
	for index, value in enumerate(list0):
		if value.startswith("#EXT-X-STREAM-INF"):
			return requests.get(url + list0[index + 1]).text
	return str0
# 解析m3u8
def step1(str0):
	method0 = None
	key0 = None
	list0 = []
	# 加密信息
	if str0.__contains__("#EXT-X-KEY"):
		for item in str0.split("\\n"):
			if item.startswith("#EXT-X-KEY"):
				method0 = item[item.index("METHOD=") + 7:item.index(",URI")]
				key0 = requests.get(item[item.index("\\"") + 1: item.rindex("\\"")]).text
				break
	# ts文件列表
	for item in str0.split("\\n"):
		if item.startswith("http"):
			list0.append(item)
	return 
		"method": method0,
		"key": key0,
		"list": list0
	
# 创建缓存文件夹
def rebuildTemp(str0):
	if os.path.exists(str0):
		shutil.rmtree(str0)
	os.mkdir(str0)

# 一个下载任务
def downOne(index, url, key, temp):
	if key is None:
		data = requests.get(url).content
		file = open(f"./temp/index.ts", "wb")
		file.write(data)
		file.close()
		print(f'[index]', end="")
	else:
		data = requests.get(url).iter_content(chunk_size=1024)
		key0 = bytes(key, 'utf8')
		file = open(f"./temp/index.ts", "wb")
		cryptor = AES.new(key0, AES.MODE_CBC, key0)
		for chunk in data:
			if chunk:
				file.write(cryptor.decrypt(chunk))
		file.close()
		print(f'[index]', end="")
# 线程池批量下载
def downAll(list0, key, temp, size):
	pool = ThreadPoolExecutor(max_workers=size)
	for index, value in enumerate(list0):
		pool.submit(downOne, index, value, key, temp)
	pool.shutdown(wait=True)
# 准备合并
def step2(size, temp):
	file = open(f"temp/info.txt", "w")
	index = 0
	while index < size:
		file.write(f"file 'index.ts'\\n")
		index = index + 1
	file.close()
# 进行合并
def step3(temp, filename):
	os.system(f"ffmpeg -f concat -safe 0 -i temp/info.txt -c copy filename.mp4")
# 删除缓存文件夹
def removeTemp(str0):
	if os.path.exists(str0):
		shutil.rmtree(str0)
class m3u8:
	def __init__(self, kv):
		self.url = kv.get('url')
		self.temp = kv.get('temp')
		self.size =<

以上是关于python实战——m3u8高速下载器的主要内容,如果未能解决你的问题,请参考以下文章

一个简单易用的m3u8下载器,支持下载m3u8链接或文件为mp4或ts格式

学Python爬虫,不看看m3u8文件如何加密?i春秋 m3u8 文件加密解析

学Python爬虫,不看看m3u8文件如何加密?i春秋 m3u8 文件加密解析

Python 视频下载,下载加密的m3u8 文件到本地

Python 实现 m3u8 视频下载

Python3 根据m3u8下载视频,批量下载ts文件并且合并