参考这里
1、打开抖音app,搜索想要爬取视频的账号
2、找到主页右上角三个点
3、点击其中的分享主页
4、选择其中的复制链接,此时复制出来的是类似于如下的内容:
长按复制此条消息,打开抖音搜索,查看TA的更多作品。https://v.douyin.com/YNyUYT5
以下分析得到的短链接: https://v.douyin.com/YNyUYT5
在PC浏览器内F12,选择手机模式,访问短链接,发现短链接被还原为原始链接。
暂时我们不知道需要下载视频需要的是什么参数,后期通过分析知道只需要sec_uid这个参数即可。
点击作品和喜欢,可以抓包到请求的链接,这个我们稍后分析
因为现在抖音app有隐藏自己收藏的功能,所以如果用户设置了隐藏,你将无法下载用户收藏的视频哦。
分析请求链接
通过抓包我们知道,用户上传的数据链接为:
https://www.iesdouyin.com/web/api/v2/aweme/post/?sec_uid=MS4wLjABAAAAYSkDWl4l-zUBPSEtsuxKHbgy5uIcwokC-OpwJrzOPmU&count=21&max_cursor=0&aid=1128&_signature=Bth1lgAAWa-w6IbGQlE-1gbYdY&dytk=
用户收藏的数据链接为:
https://www.iesdouyin.com/web/api/v2/aweme/like/?sec_uid=MS4wLjABAAAAYSkDWl4l-zUBPSEtsuxKHbgy5uIcwokC-OpwJrzOPmU&count=21&max_cursor=0&aid=1128&_signature=Bth1lgAAWa-w6IbGQlE-1gbYdY&dytk=
请求结果如下,因抖音的检测机制,你需要刷很多次才有数据,这个我们通过循环判断aweme_list中是否有数据即可
分析返回的数据
看整体数据,将数组折叠先:
status_code: 不用说,肯定是返回状态
aweme_list: 存放视频信息的数组,等会具体分析
max_cursor/min_cursor: 这个盲猜都知道用来分页的指针,如果是多页,且请求的不是第一页,需要传其中的某个值,这个暂时不讨论
has_more: 是否有多页
extra: 额外的信息,当前请求的毫秒级时间戳,以及logid,这个不重要,抖音那边用来日志记录的
分析视频信息:
可以看出每个视频有两个链接,自己访问一下就知道,一个链接是用户上传的原视频,另一个是抖音那边加了水印的视频
其实到这里大家都知道无水印视频如何下载了,不必赘述了
如何下载?
其实通过分析,抖音下载只需要拿到sec_uid这个参数即可
浏览器打开分享的短连接,就能看到地址栏这个参数
# -*- coding:utf-8 -*-
'''
@author: user
'''
import os, sys, requests, random
import json, re, time
from retrying import retry
from contextlib import closing
import logging
from importlib import reload
reload(sys)
# sys.setdefaultencoding('utf8')
PY_GEN_PATH = "D:/data/priv".replace('/', os.sep)
PY_GEN_DIR = PY_GEN_PATH + "/dy"
logger = logging.getLogger('dy')
LOG_FILE = 'dy.log'
LOG_FORMATTER = '%(message)s'
def config_logger():
logger.setLevel(logging.DEBUG)
if not os.path.exists(PY_GEN_PATH):
logger.info("文件夹不存在,已自行创建")
os.makedirs(PY_GEN_PATH, 777)
handler = logging.FileHandler(os.path.join(PY_GEN_PATH, LOG_FILE))
handler.setLevel(logging.DEBUG)
fmter = logging.Formatter(LOG_FORMATTER)
handler.setFormatter(fmter)
logger.addHandler(handler)
class Douyin:
def __init__(self):
UA_LST = ['Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 Mobile/15E148 Safari/604.1',
'Mozilla/5.0 (Linux; Android 8.0.0; SM-G955U Build/R16NW) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.141 Mobile Safari/537.36',
'Mozilla/5.0 (Linux; Android 11; Pixel 5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.91 Mobile Safari/537.36',
]
ACCEPT = "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9"
ACCEPT_ENCODING = "gzip, deflate, br" #br这个务必要去掉
ACCEPT_LANGUAGE = "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7,ru;q=0.6,ja;q=0.5,ko;q=0.4"
header = {
"User-Agent":UA_LST[random.randrange(0, len(UA_LST))],
"Accept":ACCEPT,
"Accept-Encoding":ACCEPT_ENCODING,
"Accept-Language":ACCEPT_LANGUAGE,
"upgrade-insecure-requests":"1",
"pragma":"no-cache",
"cache-control":"no-cache"
}
self.headers = header
def hello(self):
print("*"*50)
print(" "*15 + " 抖音下载小助手")
print(" "*15 + " 无水印 | 有水印")
print(" "*12 + " 输入用户的sec_uid即可")
print(" "*2 + " 用浏览器打开用户的分享链接,复制参数中的sec_uid")
print("*"*50)
return self
@retry(stop_max_attempt_number=5)
def get_requests(self, url, params=None):
if params is None:
params = {}
response = requests.get(url, params=params, headers=self.headers, timeout=10)
assert response.status_code == 200
return response
@retry(stop_max_attempt_number=5)
def post_requests(self, url, data=None):
if data is None:
data = {}
response = requests.post(url, data=data, headers=self.headers, timeout=10)
assert response.status_code == 200
return response
# 获得用户的vod链接
# type_flag:视频类型
# 返回nickname和video_list
def get_video_urls(self, sec_uid, type_flag='p'):
user_url_prefix = 'https://www.iesdouyin.com/web/api/v2/aweme/post' if type_flag == 'p' else 'https://www.iesdouyin.com/web/api/v2/aweme/like'
logger.info('--解析视频链接中--\r')
i = 0
result = []
while result == []:
i = i + 1
logger.info("--正在第{}次尝试--\r".format(str(i)))
user_url = user_url_prefix + "/?sec_uid=%s&count=2000" % (sec_uid)#一次去取2000个,有点儿过分了。
response = self.get_requests(user_url)
html = json.loads(response.content.decode())
if html['aweme_list'] != []:
result = html['aweme_list']
nickname = None
video_list = []
for item in result:
if nickname is None:
nickname = item['author']['nickname'] if re.sub(r'[\/:*?"<>|]', '', item['author']['nickname']) else None
video_list.append({
'desc':re.sub(r'[\/:*?"<>|]', '', item['desc']) if item['desc'] else '无标题' + str(int(time.time())),
'url':item['video']['play_addr']['url_list'][0]
}
)
return nickname , video_list
def get_download_url(self, video_url, water_flag):
if water_flag == True:
download_url = video_url.replace('api.amemv.com', 'aweme.snssdk.com')
else:
download_url = video_url.replace('aweme.snssdk.com', 'api.amemv.com')
return download_url
def video_downloader(self, video_url, video_name, water_flag=False):
size = 0
video_url = self.get_download_url(video_url, water_flag)
with closing(requests.get(video_url, headers=self.headers, stream=True)) as response:
chunk_size = 1024
content_size = int(response.headers['content-length'])
if response.status_code == 200:
logger.info('----[文件大小]:%0.2f MB' % (content_size / chunk_size / 1024))
with open(video_name + ".mp4", 'wb') as file:
for data in response.iter_content(chunk_size=chunk_size):
file.write(data)
size += len(data)
file.flush()
logger.info('----[下载进度]:%0.2f%%' % float(size / content_size * 100) + '\r')
# sys.stdout.flush()
def run(self):
default_sec_uid = 'MS4wLjABAAAAH-=' #请替换
sec_uid = input('请输入用户的sec_uid:')
sec_uid = sec_uid if sec_uid else default_sec_uid
water_flag = input('是否下载带有水印的视频(0-否(默认) ,1-是):')
water_flag = bool(int(water_flag)) if water_flag else 0
type_flag = input('p-上传的 (默认),l-收藏的')
type_flag = type_flag if type_flag else 'p'
save_dir = input('保存位置')
save_dir = save_dir if save_dir else PY_GEN_DIR
nickname , video_list = self.get_video_urls(sec_uid, type_flag)
nickname_dir = os.path.join(save_dir, nickname)
if not os.path.exists(nickname_dir):
os.makedirs(nickname_dir)
if type_flag == 'f':
if 'favorite' not in os.listdir(nickname_dir):
os.mkdir(os.path.join(nickname_dir, 'favorite'))
logger.info('----视频下载中:共有{}个作品---'.format(str(len(video_list))))
for num in range(len(video_list)):
vod_desc = video_list[num]['desc'].replace("\r", '').replace("\n", '').replace('\r\n', '')
logger.info("---正在解析第%d个视频链接[%s] , 请稍后---" % (num + 1, vod_desc))
video_path = os.path.join(nickname_dir, vod_desc) if type_flag != 'f' else os.path.join(nickname_dir, 'favorite', vod_desc)
if os.path.isfile(video_path):
logger.info("---视频已存在---")
else:
self.video_downloader(video_list[num]['url'], video_path, water_flag)
logger.info('\n')
logger.info("下载完成")
if __name__ == '__main__':
config_logger()
Douyin().hello().run()
其实应该可以循环往下获取vod_url的,但是上面那个仅仅获取了一部分vod而已,并不是全部
tips:
2022-08-31
今天发现get_video_urls报错了,response.content.decode无法decode,仔细看了下response.headers['Content-Encoding'],发现已经变成br了,
这就需要安装一个包brotli了,看这里。但之前运行的时候,都是ok的啊,怎么突然爆出个br。
又仔细看了下request头部,发现我的accept-encoding写的居然是:gzip, deflate,br.
这不是给自己挖坑么?都写了支持br了,人家给你返回一个conten-encoding=br也没错啊,现在把br去掉,一切就正常了。