多线程下载主要用到http请求中的header
- Content-Length:资源长度,用于确认资源的总长度,从而便于规划每个线程的任务量
- Range:bytes=beg1-end1;beg2-end2,用来控制下载的资源的某一部分,需要注意,这里的beg、end是前闭后闭区间。
当下载的片段较小时,很容易出错,需要重试,可以使用retry模块通过注解方式实现重试,这个模块非常好用。
Python的多线程没有体现出优势来。链条的强度取决于最薄弱的一环,木桶的容量取决于最短的木板,系统的的并发量取决于并发量最小的模块。多线程体现不出优势,可能是因为网速。如果单线程就能够将网速充分利用起来,那么多线程就没有用了。
import os
import threading
import time
import requests
import retry
url = 'http://mp4.vjshi.com/2017-12-18/422ded2944a95d6ca09752e04f687dd6.mp4'
def one_thread():
# 37.86秒
begTime = time.time()
resp = requests.get(url)
with open("haha.mp4", "wb") as f:
f.write(resp.content)
endTime = time.time()
print(endTime - begTime)
def multi_thread():
PER_THREAD_MIN = 2000 # 每个线程至少下载量
MAX_THREAD_COUNT = 50 # 最多线程数
TEMP_FOLDER = "dow" # 临时文件夹
TARGET_FILE_NAME = "mul.mp4" # 存储目标
if not os.path.exists(TEMP_FOLDER):
os.mkdir(TEMP_FOLDER)
begTime = time.time()
resp = requests.get(url, stream=True)
sz = int(resp.headers['Content-Length'])
block_sz = max(sz // MAX_THREAD_COUNT, PER_THREAD_MIN)
task = []
cnt = 0
for i in range(0, sz, block_sz):
now_sz = sz - i if sz - i - block_sz < PER_THREAD_MIN else block_sz
it = {
'beg': i,
'end': i + now_sz,
'path': os.path.join(TEMP_FOLDER, str(cnt)),
'last': i + now_sz == sz
}
task.append(it)
cnt += 1
if it['last']:
break
lock = threading.Lock()
def merge():
with open(TARGET_FILE_NAME, "wb") as f:
for j, i in enumerate(task):
with open(i['path'], 'rb') as ff:
f.write(ff.read(i['end'] - i['beg']))
endTime = time.time()
print(endTime - begTime)
@retry.retry(tries=100)
def go(it):
nonlocal cnt
print(it)
resp = requests.get(url, headers={
'Range': "bytes=%d-%d" % (it['beg'], it['end'] - 1)
})
if resp.status_code not in [200, 206]:
print(it, resp.status_code, '爬虫失败')
raise Exception("爬虫失败")
if len(resp.content) != it['end'] - it['beg']:
print("长度不对")
raise Exception("长度不对")
with open(it['path'], 'wb') as f:
f.write(resp.content)
print(it, it['end'] - it['beg'], len(resp.content), 'over', resp.status_code)
lock.acquire(timeout=0)
cnt -= 1
if cnt == 0:
merge()
lock.release()
def start_threading():
for i in task:
threading.Thread(target=go, args=(i,)).start()
start_threading()
# one_thread()
multi_thread()