1.說明
m3u8是一種傳輸數據的方式,比如說一集20分鐘的完整視頻被分割成一千多段一兩秒的小視頻,客戶端播放的時候感覺是連續的,但如果你要下載這集視頻,那就要把一千多個小視頻全都下載然後自己拼接成一個完整視頻。拼接的話很簡單,像格式工廠等很多軟件都可以輕鬆完成,但要一個一個下載視頻分段確實麻煩,所以我打算使用Python開啟多線程下載,每個視頻分段使用一個線程,只要你的網速夠快,幾秒鐘下載一集視頻沒什麼問題。
2.實現思路
2.1.m3u8文件。m3u8一般是以m3u8結尾的文件,如果是瀏覽器,可以按一下F12打開DevTools進行抓包獲取m3u8的完整鏈接,下載之後提取所有視頻分段的uri,為了方便操作,我們可以使用m3u8庫。
2.2.加密解密。有些m3u8是加密的,但會在文件裡給出密鑰的url,請求一下即可得到密鑰,密鑰一般是一個數字字母組成的字符串。一般加密算法是AES-128,我們需要借助pycryptodome庫對已加密的視頻進行解密操作。
2.3.視頻合併。Windows系統自帶的copy命令也可以合併,但是經過我測試,發現合併之後的視頻可能會混亂,所以如果視頻比較少,可以借助格式工廠等軟件合併,如果較多,可以使用Python操作FFmpeg。
2.4.限頻問題。因為很多網站都會限頻,也就是說,同時發起的請求個數不能超過一定值,否則服務器不會正常響應數據,所以我們可能需要限制一下並發執行的線程數,使用Python自帶的BoundedSemaphore就行。
2.5.顯示進度條。如果只是簡單的打印當前進度,感覺不夠美觀,我們可以借助tqdm等庫實現進度條的顯示。
3.代碼實現
此腳本需要用到的第三方庫
pip install requests
pip install fake-useragent==0.1.11
pip install m3u8
pip install pycryptodome
pip install tqdm
參考代碼(文章來源:http://www.zghlxwxcb.cn/news/detail-661386.html)
import logging
import os.path
import sys
import time
from datetime import datetime
from threading import Thread, BoundedSemaphore
import requests
from Crypto.Cipher import AES
from fake_useragent import UserAgent
import m3u8
from tqdm import tqdm
# pip install requests
# pip install fake-useragent==0.1.11
# pip install m3u8
# pip install pycryptodome
# pip install tqdm
class M3U8Loader:
    """Minimal line-based reader for an m3u8 playlist.

    It does not parse playlist tags; it only keeps the stripped lines of the
    playlist and tries to guess a base URL for relative segment paths.

    Attributes:
        uri: The URL or local path the playlist was read from.
        base_url: The guessed (or caller-supplied) base URL, possibly None.
        segments: Every line of the playlist, stripped of surrounding
            whitespace (tag lines and blank lines included).
    """

    def __init__(self, uri, base_url, segments):
        self.uri = uri
        self.base_url = base_url
        self.segments = segments

    @classmethod
    def load(cls, uri, base_url=None):
        """Read a playlist from an http(s) URL or a local UTF-8 file.

        Args:
            uri: Playlist location. Anything starting with "http" is fetched
                with ``requests``; everything else is opened as a local file.
            base_url: Optional base URL. When falsy it is derived from the
                first playlist line starting with "http", and failing that
                from ``uri`` itself (when ``uri`` is a URL).

        Returns:
            An instance of ``cls`` (so subclasses get their own type).

        Raises:
            Exception: If the remote playlist does not answer with HTTP 200.
        """
        if uri.startswith("http"):
            # A timeout keeps a stalled server from hanging the caller forever;
            # 10 seconds matches the other playlist/key requests in this file.
            res = requests.get(uri, headers={"User-Agent": get_user_agent()}, timeout=10)
            if res.status_code != 200:
                raise Exception(f"load m3u8 failed when download file, uri: {uri}")
            text = res.text
        else:
            with open(uri, encoding="utf-8") as f:
                text = f.read()

        # Resolve literal backslash escapes (e.g. "\u0026") found in some playlists.
        # NOTE(review): this round-trip decodes the UTF-8 bytes as Latin-1, so
        # non-ASCII characters in the playlist are altered — confirm that is acceptable.
        lines = text.encode().decode('unicode_escape').split("\n")

        if not base_url:
            for line in lines:
                if line.startswith("http"):
                    # Directory part of the first absolute URL, query string dropped.
                    base_url = os.path.split(line.split("?")[0])[0]
                    break
        if not base_url and uri.startswith("http"):
            base_url = str(uri).split("?")[0].rsplit("/", maxsplit=1)[0]

        return cls(uri, base_url, [line.strip() for line in lines])
def decode_video(video_stream, key, iv):
    """Decrypt one AES-CBC encrypted segment.

    Args:
        video_stream: The encrypted segment bytes.
        key: The AES key, either as ``bytes`` or as a ``str`` (encoded as UTF-8).
        iv: The initialization vector as a hexadecimal string prefixed with
            "0x"/"0X", or a falsy / non-hex-prefixed value when none is given.

    Returns:
        The decrypted bytes as produced by ``AES.decrypt`` (no padding is removed).

    Raises:
        ValueError: If ``iv`` has the "0x" prefix but is not valid hexadecimal,
            or if the key / IV length is rejected by ``AES.new``.
    """
    if isinstance(key, (bytes, bytearray)):
        key_bytes = bytes(key)
    else:
        key_bytes = bytes(key, encoding='utf8')

    iv_bytes = None
    iv_text = str(iv) if iv else ""
    if iv_text[:2].lower() == "0x":
        # The IV is written as a hex number; AES-CBC needs its 16 raw bytes,
        # not the bytes of the text. Left-pad so short literals stay 128-bit.
        iv_bytes = bytes.fromhex(iv_text[2:].zfill(32))
    if iv_bytes is None:
        # No usable IV supplied: keep the historical fallback of reusing the key.
        iv_bytes = key_bytes

    aes = AES.new(key_bytes, AES.MODE_CBC, iv_bytes)
    return aes.decrypt(video_stream)
def get_datetime_num():
    """Return the current local time as a ``YYYYmmddHHMMSS`` digit string."""
    now = datetime.now()
    return now.strftime("%Y%m%d%H%M%S")
def get_user_agent():
    """Return a random User-Agent string from the bundled fake_useragent data file."""
    agent_source = UserAgent(path="./utils/fake_useragent_0.1.11.json")
    return agent_source.random
class M3U8Downloader:
    """Download all segments of an m3u8 playlist with threads, then merge them.

    Typical use is ``M3U8Downloader(**params).run()``.
    """

    def __init__(self, m3u8_url, base_url, save_dir, video_folder, headers, if_random_ug, merge_name, ffmpeg_path,
                 sp_count, if_tqdm):
        """Store the configuration, set up logging and resolve the base URL.

        Args:
            m3u8_url: URL or local path of the playlist.
            base_url: Base URL for relative segment paths; guessed when empty.
            save_dir: Output directory; defaults to "m3u8_download" next to this file.
            video_folder: Sub-folder for this download; defaults to a timestamp.
            headers: Extra request headers (non-dict values are ignored).
            if_random_ug: Whether to send a random User-Agent (non-bool means True).
            merge_name: File name of the merged video; defaults to "merge.ts".
            ffmpeg_path: Path of the ffmpeg executable; a relative path is
                anchored at this file's directory, an empty value means "none".
            sp_count: Maximum number of concurrent downloads; falsy means unlimited.
            if_tqdm: Whether to show a tqdm progress bar.

        Raises:
            Exception: If no base URL is given and none can be derived.
        """
        self.tqdm = None
        self.if_tqdm = if_tqdm
        self.m3u8_url = m3u8_url
        self.base_url = base_url if base_url else ""
        self.to_download_url = list()
        self.download_failed_dict = dict()
        self.key_method = None
        self.key_iv = None
        self.key_str = None
        self.current_file_path = os.path.dirname(os.path.abspath(__file__))
        self.save_dir = save_dir if save_dir else os.path.join(self.current_file_path, "m3u8_download")
        self.video_folder = video_folder if video_folder else get_datetime_num()
        # Only anchor a non-empty relative path: joining "" would yield a
        # directory, and None would make os.path.isabs raise.
        if ffmpeg_path and not os.path.isabs(ffmpeg_path):
            ffmpeg_path = os.path.join(self.current_file_path, ffmpeg_path)
        self.headers = headers if isinstance(headers, dict) else dict()
        self.if_random_ug = if_random_ug if isinstance(if_random_ug, bool) else True
        self.ffmpeg_path = ffmpeg_path
        self.merge_name = merge_name if merge_name else "merge.ts"
        self.file_type = ".ts"
        self.semaphore = BoundedSemaphore(sp_count) if sp_count else None
        self.logger = self.get_logger()
        self.normalize_m3u8_file(self.m3u8_url)
        self.normalize_base_url()
        self.logger.info(f"init info m3u8_url: {self.m3u8_url}")
        self.logger.info(f"init info base_url: {self.base_url}")
        self.logger.info(f"init info if_random_ug: {self.if_random_ug}")
        self.logger.info(f"init info headers: {self.headers}")
        self.logger.info(f"init info save_dir: {self.save_dir}")
        self.logger.info(f"init info video_folder: {self.video_folder}")
        self.logger.info(f"init info current_file_path: {self.current_file_path}")
        self.logger.info(f"init info ffmpeg_path: {self.ffmpeg_path}")
        self.logger.info(f"init info merge_name: {self.merge_name}")

    def __del__(self):
        # getattr: the attribute is missing when the object was never initialised.
        progress = getattr(self, "tqdm", None)
        if progress:
            progress.close()

    def get_headers(self):
        """Return a fresh header dict, with a random User-Agent when enabled.

        A copy is returned so that download threads never mutate the shared
        ``self.headers`` configuration.
        """
        headers = dict(self.headers)
        if self.if_random_ug:
            headers.update({"User-Agent": get_user_agent()})
        return headers

    def get_logger(self):
        """Return the "M3U8Downloader" logger, writing to console and a log file."""
        logger = logging.getLogger("M3U8Downloader")
        logger.setLevel(logging.INFO)
        # makedirs also creates missing parent directories.
        os.makedirs(self.save_dir, exist_ok=True)
        if logger.handlers:
            # The logger is shared process-wide; adding handlers again would
            # duplicate every message for each additional downloader instance.
            return logger
        formatter = logging.Formatter("%(asctime)s-%(filename)s-line:%(lineno)d-%(levelname)s-%(process)s: %(message)s")
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)
        console_handler.setFormatter(formatter)
        file_handler = logging.FileHandler(os.path.join(self.save_dir, "m3u8_download.log"), encoding="utf-8")
        file_handler.setLevel(logging.INFO)
        file_handler.setFormatter(formatter)
        logger.addHandler(console_handler)
        logger.addHandler(file_handler)
        return logger

    def get_m3u8_info(self):
        """Parse the playlist: fetch the key (if any) and collect segment URLs.

        Raises:
            Exception: If the playlist declares a key whose method is not AES-128.
        """
        m3u8_obj = m3u8.load(self.m3u8_url, timeout=10, headers=self.get_headers())
        keys = m3u8_obj.keys
        if keys and keys[-1]:
            key_alg = keys[-1].method
            if key_alg != "AES-128":
                raise Exception(f"matched key but algorithm ({key_alg}) is not AES-128")
            self.key_method = key_alg
            self.key_iv = keys[-1].iv
            self.get_key(self.normalize_url(keys[-1].absolute_uri))
        candidates = [self.normalize_url(segment.uri) for segment in m3u8_obj.segments]
        self.to_download_url = [d_url for d_url in candidates if d_url]
        if not self.to_download_url:
            # The m3u8 library found nothing: fall back to the line-based loader.
            loader_obj = M3U8Loader.load(self.m3u8_url, self.base_url)
            candidates = [self.normalize_url(segment) for segment in loader_obj.segments]
            self.to_download_url = [d_url for d_url in candidates if d_url]
        self.logger.info(f"to_download_url: {len(self.to_download_url)} {self.to_download_url[:5]}, ...")
        self.tqdm = tqdm(total=len(self.to_download_url), desc="download progress") if self.if_tqdm else None
        if self.to_download_url:
            # Segment files are saved with the extension of the first segment URL.
            self.file_type = os.path.splitext(self.to_download_url[0].split("?")[0])[1]

    def get_key(self, key_url):
        """Download the decryption key and store its text in ``self.key_str``.

        Raises:
            Exception: If the request fails with an error status or returns nothing.
        """
        self.logger.info(f"key_url: {key_url}")
        res = requests.get(key_url, headers=self.get_headers(), timeout=10)
        # An error page must not be mistaken for key material.
        self.key_str = res.text if res.status_code < 300 else None
        if not self.key_str:
            raise Exception("get key error, key: {}".format(self.key_str))
        self.logger.info(f"get_key key_str: {self.key_str}")

    def test_download(self, d_url):
        """Return True when ``d_url`` answers with a status code below 300."""
        self.logger.info(f"test download url: {d_url}")
        try:
            res = requests.get(d_url, timeout=30, headers=self.get_headers(), stream=True)
            try:
                return res.status_code < 300
            finally:
                # stream=True keeps the connection open until explicitly closed.
                res.close()
        except Exception as e:
            self.logger.error(f"test_download meet error: {e}")
            return False

    def download_video(self, number, url):
        """Download, decrypt and save one segment (thread target).

        Args:
            number: Zero-based segment index; becomes the 8-digit file name.
            url: Absolute URL of the segment.

        Failures are recorded in ``self.download_failed_dict`` instead of raised.
        """
        if self.semaphore:
            # "with" guarantees the slot is released even if saving fails;
            # otherwise the remaining threads would wait forever.
            with self.semaphore:
                self._download_segment(number, url)
        else:
            self._download_segment(number, url)

    def _fetch_segment(self, url):
        """Return the body of ``url``, or None after 10 unsuccessful attempts."""
        headers = self.get_headers()
        for _ in range(10):
            try:
                res = requests.get(url, timeout=10, stream=True, headers=headers)
                if res.status_code == 200:
                    return res.content
            except Exception as e:
                self.logger.error(f"download failed, will try again: url:{url} ,error:{e}")
            time.sleep(1)
        return None

    def _download_segment(self, number, url):
        """Fetch one segment, decrypt it when a key is set, and write it to disk."""
        res_content = self._fetch_segment(url)
        if not res_content:
            self.logger.warning(f"download video failed, number:{number},url:{url}")
            self.download_failed_dict.update({number: url})
            return
        try:
            if self.key_str:
                res_content = decode_video(res_content, self.key_str, self.key_iv)
            path = os.path.join(self.save_dir, self.video_folder, "{0:0>8}".format(number) + str(self.file_type))
            with open(path, "wb+") as f:
                f.write(res_content)
        except Exception as e:
            # Record the failure so run() reports it instead of losing the thread silently.
            self.logger.error(f"save video failed, number:{number},url:{url},error:{e}")
            self.download_failed_dict.update({number: url})
            return
        if self.tqdm:
            self.tqdm.update(1)

    def merge_videos(self):
        """Merge the downloaded segments into ``self.merge_name``.

        Uses ffmpeg's concat demuxer when ``self.ffmpeg_path`` points to an
        existing file, otherwise concatenates the files byte-wise in Python.
        Does nothing (with a warning) on non-Windows systems.
        """
        import subprocess

        if os.name != "nt":
            self.logger.warning(f"current system {os.name} is not Windows, can't merge.")
            return
        self.logger.info("start merge")
        # Same directory download_video writes to, made absolute for ffmpeg.
        path = os.path.abspath(os.path.join(self.save_dir, self.video_folder))
        if not os.path.exists(path):
            self.logger.warning(f"merge_videos canceled, the path({path}) is not exist")
            return
        # Segment files are zero-padded numbers, so a plain sort is playback order.
        all_ts_files = sorted(
            ts for ts in os.listdir(path) if ts.startswith("0") and ts.endswith(self.file_type)
        )
        if not all_ts_files:
            self.logger.warning(f"there is no {self.file_type} file need to merge")
            return
        merge_target = os.path.join(path, self.merge_name)
        self.logger.info(f"ffmpeg path: {self.ffmpeg_path}")
        if self.ffmpeg_path and os.path.isfile(self.ffmpeg_path):
            self.logger.info("use ffmpeg to merge")
            list_file = os.path.join(path, "merge_file_list.txt")
            with open(list_file, "w") as f:
                for file in all_ts_files:
                    f.write("file " + "'" + os.path.join(path, file) + "'" + "\n")
            # Argument list instead of a shell string: paths containing spaces
            # or shell metacharacters are passed through unchanged.
            cmd = [self.ffmpeg_path, "-f", "concat", "-safe", "0", "-i", list_file, "-c", "copy", merge_target]
            self.logger.info(f"merge cmd: {cmd}")
            res = subprocess.run(cmd).returncode
            if res:
                self.logger.error("merge failed")
            else:
                self.logger.info("merge success")
        else:
            self.logger.warning("ffmpeg not exist, will merge by python")
            try:
                with open(merge_target, "wb+") as f:
                    for ts_file in all_ts_files:
                        with open(os.path.join(path, ts_file), "rb") as t:
                            f.write(t.read())
            except Exception as e:
                self.logger.error(f"merge failed: {e}")
            else:
                self.logger.info("merge success")

    def mkdir(self):
        """Create the save directory and the per-download video folder if missing."""
        if not os.path.exists(self.save_dir):
            os.makedirs(self.save_dir, exist_ok=True)
            self.logger.info(f"make save_dir({self.save_dir}) success.")
        video_folder = os.path.join(self.save_dir, self.video_folder)
        if not os.path.exists(video_folder):
            os.makedirs(video_folder, exist_ok=True)
            self.logger.info(f"make video_folder({video_folder}) success.")

    def normalize_url(self, raw_url):
        """Turn a playlist entry into an absolute URL.

        Args:
            raw_url: A playlist line or segment URI (may be None).

        Returns:
            None for None input and for tag/comment lines (starting with "#");
            the stripped input unchanged when it is empty or starts with "http";
            otherwise ``self.base_url`` joined with the relative path. When the
            relative path begins with the tail of ``base_url`` (more than two
            characters), that overlapping prefix is not repeated.
        """
        if raw_url is None:
            return None
        raw_url = raw_url.strip()
        if raw_url.startswith("#"):
            return None
        if not raw_url or raw_url.startswith("http"):
            return raw_url

        # Longest prefix of raw_url that also occurs somewhere in base_url.
        overlap = ""
        for end in range(1, len(raw_url) + 1):
            prefix = raw_url[:end]
            if prefix not in self.base_url:
                break
            overlap = prefix

        remainder = raw_url
        if len(overlap) > 2 and self.base_url.endswith(overlap):
            # Drop only the leading overlap; later occurrences belong to the path.
            remainder = raw_url[len(overlap):]
        needs_sep = not (
            self.base_url.endswith("/") or raw_url.startswith("/") or remainder.startswith("/")
        )
        sep = "/" if needs_sep else ""
        return f"{self.base_url}{sep}{remainder}"

    def normalize_m3u8_file(self, path):
        """Rewrite a local playlist file with escapes resolved and quotes removed.

        Does nothing (returns None) when ``path`` does not exist, e.g. for a URL.

        Returns:
            The unescaped file content, before the quotes were stripped.
        """
        if not os.path.exists(path):
            return
        with open(path, "r", encoding="utf-8") as f:
            contents = f.read()
        contents = contents.encode().decode('unicode_escape')
        with open(path, 'w', encoding='utf-8') as w:
            w.write(contents.replace("'", "").replace('"', ''))
        self.logger.info(f"normalize m3u8 file success, path: {path}")
        return contents

    def normalize_base_url(self):
        """Make sure ``self.base_url`` is an http(s) URL, guessing it if needed.

        Raises:
            Exception: If no base URL can be derived from the playlist.
        """
        if self.base_url and self.base_url.startswith('http'):
            return
        base_url = M3U8Loader.load(self.m3u8_url).base_url
        if base_url:
            self.base_url = base_url
        else:
            raise Exception("automatically identify base_url failed, please fill in manually")

    def run(self):
        """Run the whole job: parse, probe, download every segment, merge.

        Raises:
            Exception: If at least one segment could not be downloaded.
        """
        start_time = time.time()
        try:
            self.get_m3u8_info()
            if not self.to_download_url:
                self.logger.warning("there is no url to download, self.to_download_url is empty, please check url")
                return
            self.logger.info(f"self.key_str: {self.key_str}")
            self.logger.info(f"self.key_method: {self.key_method}")
            if not self.test_download(self.to_download_url[0]):
                self.logger.warning(f"test download failed, pls check whether the url is valid ({self.to_download_url[0]})")
                return
            self.mkdir()
            # One thread per segment; the semaphore (if any) caps how many run at once.
            threads = [Thread(target=self.download_video, args=(idx, url)) for idx, url in enumerate(self.to_download_url)]
            for t in threads:
                t.start()
            for t in threads:
                t.join()
            self.logger.info(f"all download finish, spent time: {time.time() - start_time:.2f} second")
            self.logger.info(f"total video count: {len(self.to_download_url)}")
            self.logger.info(f"download_failed_dict: {self.download_failed_dict}")
            if self.download_failed_dict:
                self.logger.warning(f"{len(self.download_failed_dict)} video file download failed.")
                raise Exception(f"{len(self.download_failed_dict)} video file download failed.")
            # merge_videos picks ffmpeg or the pure-Python fallback on its own.
            self.merge_videos()
        finally:
            # Close the progress bar on every exit path, including errors.
            if self.tqdm:
                self.tqdm.close()
if __name__ == '__main__':
    # Default playlist; a first command-line argument starting with "http" replaces it.
    url = "https://test/index.m3u8"
    cli_args = sys.argv[1:]
    if cli_args and str(cli_args[0]).startswith("http"):
        url = cli_args[0]
    if not url:
        raise Exception("missing download url")

    # Empty strings let the downloader fall back to its own defaults.
    params_dict = dict(
        m3u8_url=url,
        base_url="",
        save_dir="",
        video_folder="",
        # Optional request headers (Host, Cookie, Referer, User-Agent, ...) go here.
        headers={},
        if_random_ug=True,
        ffmpeg_path="./utils/ffmpeg.exe",
        merge_name="",
        sp_count=2,
        if_tqdm=True,
    )
    # NOTE: a local playlist file normally needs an explicit base_url.
    downloader = M3U8Downloader(**params_dict)
    downloader.run()
該代碼已上傳GitHub。
GitHub鏈接:https://github.com/panmeibing/python_downloader
文章來源地址:http://www.zghlxwxcb.cn/news/detail-661386.html
到了這裡,關於【python】多線程下載m3u8分段視頻的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續瀏覽下面的相關文章,希望大家以後多多支持TOY模板網!