从M3_LIST中读取m3u8列表,以"|"分隔,前面是m3u8地址,后面是文件名
import requests
import os
from urllib.parse import urljoin
M3_LIST=[
"https://xx.xyz/index.m3u8|mp4name"
]
def download_ts_files(m3u8_url, output_dir='ts_files'):
# 下载 m3u8 文件并解析 ts 列表
response = requests.get(m3u8_url)
if response.status_code != 200:
raise Exception(f"Failed to download m3u8 file: {response.status_code}")
# 创建保存 ts 文件的目录
os.makedirs(output_dir, exist_ok=True)
ts_urls = []
base_url = m3u8_url.rsplit('/', 1)[0] + '/' # 获取基础 URL
for line in response.text.split('\n'):
line = line.strip()
if line and not line.startswith('#') and line.endswith('.ts'):
# 处理相对路径和绝对路径
if line.startswith('http'):
ts_url = line
else:
ts_url = urljoin(base_url, line)
ts_urls.append(ts_url)
# 下载所有 ts 文件
ts_files = []
for i, ts_url in enumerate(ts_urls):
ts_path = os.path.join(output_dir, f'segment_{i}.ts')
print(f'Downloading {ts_url}...')
with requests.get(ts_url, stream=True) as r:
r.raise_for_status()
with open(ts_path, 'wb') as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
ts_files.append(ts_path)
return ts_files
def merge_ts_files(ts_files, output_file='output.mp4'):
# 使用二进制追加方式合并 ts 文件
with open(output_file, 'wb') as merged:
for ts_file in ts_files:
with open(ts_file, 'rb') as f:
merged.write(f.read())
print(f'Merged {ts_file}')
print(f'Successfully merged to {output_file}')
def clean_up(ts_files):
# 清理临时 ts 文件
for ts_file in ts_files:
try:
os.remove(ts_file)
except OSError:
pass
if __name__ == '__main__':
#m3u8_url = input("Enter the m3u8 URL: ").strip()
for m3u8url in M3_LIST:
m3u8_url=m3u8url.split("|")[0]
filename=m3u8url.split("|")[1]
print(m3u8_url)
print(filename)
try:
# 1. 下载所有 ts 文件
ts_files = download_ts_files(m3u8_url)
# 2. 合并 ts 文件
output_file = '{}.pmp'.format(filename)
merge_ts_files(ts_files, output_file)
# 3. 清理临时文件 (可选)
clean_up(ts_files)
print(f"视频已成功保存为 {output_file}")
except Exception as e:
print(f"发生错误: {str(e)}")