with aiofiles.open(m3u8_encrypt_file, mode='rb') as f: f = f.read() content_video_part = AES.new(key, AES.MODE_CBC, iv).decrypt(f) with aiofiles.open(dest_decrypt_file, mode='wb') as f: f.write(content_video_part)
asyncdefdecrypt_m3u8_all(): ifnot os.path.exists(class_video_name + '/decryption'): os.makedirs(class_video_name + '/decryption') key = await requests.get(playlist.keys[0].uri) key = key.content iv = bytes(playlist.keys[0].iv, 'UTF-8')[:16] decrypt_m3u8_list = [asyncio.create_task(decrypt_m3u8_video(f'{class_video_name}/downloads/{uri}', key, iv)) for uri in os.listdir(f'{class_video_name}/downloads') if uri[0] != '.'] # 忽略隐藏文件 await asyncio.wait(decrypt_m3u8_list) print(f'[{class_video_name}]——视频全部解密完成')
defmerge_m3u8_all(): download_decrypt_list = [uri for uri in os.listdir(f'{class_video_name}/decryption') if uri[0] != '.'] download_encrypt_list = [uri for uri in os.listdir(f'{class_video_name}/downloads') if uri[0] != '.'] iflen(download_decrypt_list) != len(download_encrypt_list): # 判断是否有漏下的分段视频没有下载 print('解密分段视频出现问题,可能是受限于类Unix系统文件句柄数量限制导致脚本不能获取足够的文件句柄。\n ' '如果你是 Linux 或 Macos 请尝试在运行本脚本的终端内执行 "ulimit -n 5120" 命令,以解除255(Macos)/1024(Linux)数量限制') return withopen(f'{class_video_name}/{class_video_name}.mp4', 'ab') as final_file: print(f'[{class_video_name}]——开始拼接解密后的分段视频') temp_file_uri_list = os.listdir(f'{class_video_name}/decryption') temp_file_uri_list.sort(key=lambda x: int(x[:-6])) for uri in temp_file_uri_list: if uri[0] == '.': continue# 忽略隐藏文件 withopen(f'{class_video_name}/decryption/{uri}', 'rb') as temp_file: final_file.write(temp_file.read()) # 将ts格式分段视频追加到完整视频文件中 print(f'[{class_video_name}]——合成视频成功')
if __name__ == '__main__': playlist = m3u8.load(m3u8_file_uri, verify_ssl=False) del playlist.files[0] # 第一个文件为视频密钥,忽略这个文件。 asyncio.run(download_m3u8_all()) asyncio.run(decrypt_m3u8_all()) merge_m3u8_all() print(f'[{class_video_name}]——视频文件:{os.getcwd()}/{class_video_name}/{class_video_name}.mp4')