python-douyin-vod

参考这里
1、打开抖音app,搜索想要爬取视频的账号
2、找到主页右上角三个点
3、点击其中的分享主页
4、选择其中的复制链接,此时复制出来的是类似于如下的内容:
长按复制此条消息,打开抖音搜索,查看TA的更多作品。https://v.douyin.com/YNyUYT5

以下分析得到的短链接: https://v.douyin.com/YNyUYT5
在PC浏览器内F12,选择手机模式,访问短链接,发现短链接被还原为原始链接。
暂时我们不知道需要下载视频需要的是什么参数,后期通过分析知道只需要sec_uid这个参数即可。

点击作品和喜欢,可以抓包到请求的链接,这个我们稍后分析
因为现在抖音app有隐藏自己收藏的功能,所以如果用户设置了隐藏,你将无法下载用户收藏的视频哦。

分析请求链接

通过抓包我们知道,用户上传的数据链接为
https://www.iesdouyin.com/web/api/v2/aweme/post/?sec_uid=MS4wLjABAAAAYSkDWl4l-zUBPSEtsuxKHbgy5uIcwokC-OpwJrzOPmU&count=21&max_cursor=0&aid=1128&_signature=Bth1lgAAWa-w6IbGQlE-1gbYdY&dytk=

用户收藏的数据链接为
https://www.iesdouyin.com/web/api/v2/aweme/like/?sec_uid=MS4wLjABAAAAYSkDWl4l-zUBPSEtsuxKHbgy5uIcwokC-OpwJrzOPmU&count=21&max_cursor=0&aid=1128&_signature=Bth1lgAAWa-w6IbGQlE-1gbYdY&dytk=

请求结果如下,因抖音的检测机制,你需要刷很多次才有数据,这个我们通过循环判断aweme_list中是否有数据即可


分析返回的数据

看整体数据,将数组折叠先:
status_code: 不用说,肯定是返回状态
aweme_list: 存放视频信息的数组,等会具体分析
max_cursor/min_cursor: 这个盲猜都知道用来分页的指针,如果是多页,且请求的不是第一页,需要传其中的某个值,这个暂时不讨论
has_more: 是否有多页
extra: 额外的信息,当前请求的毫秒级时间戳,以及logid,这个不重要,抖音那边用来日志记录的

分析视频信息
可以看出每个视频有两个链接,自己访问一下就知道,一个链接是用户上传的原视频,另一个是抖音那边加了水印的视频
其实到这里大家都知道无水印视频如何下载了,不必赘述了


如何下载?
其实通过分析,抖音下载只需要拿到sec_uid这个参数即可
浏览器打开分享的短连接,就能看到地址栏这个参数

# -*- coding:utf-8 -*-
'''

@author: user
'''
import os, sys, requests, random
import json, re, time
from retrying import retry
from contextlib import closing
import logging
from importlib import reload

reload(sys) 
# sys.setdefaultencoding('utf8')

PY_GEN_PATH = "D:/data/priv".replace('/', os.sep)
PY_GEN_DIR = PY_GEN_PATH + "/dy"
logger = logging.getLogger('dy')
LOG_FILE = 'dy.log'
LOG_FORMATTER = '%(message)s'


def config_logger():
    logger.setLevel(logging.DEBUG)
    if not os.path.exists(PY_GEN_PATH):
        logger.info("文件夹不存在,已自行创建")
        os.makedirs(PY_GEN_PATH, 777)
    handler = logging.FileHandler(os.path.join(PY_GEN_PATH, LOG_FILE))
    handler.setLevel(logging.DEBUG)
    fmter = logging.Formatter(LOG_FORMATTER)
    handler.setFormatter(fmter)
    logger.addHandler(handler)


class Douyin:

    def __init__(self):
        
        UA_LST = ['Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 Mobile/15E148 Safari/604.1',
                  'Mozilla/5.0 (Linux; Android 8.0.0; SM-G955U Build/R16NW) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.141 Mobile Safari/537.36',
                  'Mozilla/5.0 (Linux; Android 11; Pixel 5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.91 Mobile Safari/537.36',
                  ]
        ACCEPT = "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9"
        ACCEPT_ENCODING = "gzip, deflate, br" #br这个务必要去掉
        ACCEPT_LANGUAGE = "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7,ru;q=0.6,ja;q=0.5,ko;q=0.4"

        header = {
                  "User-Agent":UA_LST[random.randrange(0, len(UA_LST))],
                  "Accept":ACCEPT,
                  "Accept-Encoding":ACCEPT_ENCODING,
                  "Accept-Language":ACCEPT_LANGUAGE,
                  "upgrade-insecure-requests":"1",
                  "pragma":"no-cache",
                  "cache-control":"no-cache"
        }
        
        self.headers = header
    
    def hello(self):
        print("*"*50)
        print(" "*15 + " 抖音下载小助手")
        print(" "*15 + " 无水印 | 有水印")
        print(" "*12 + " 输入用户的sec_uid即可")
        print(" "*2 + " 用浏览器打开用户的分享链接,复制参数中的sec_uid")
        print("*"*50)
        return self
    
    @retry(stop_max_attempt_number=5)
    def get_requests(self, url, params=None):
        if params is None:
            params = {}
        response = requests.get(url, params=params, headers=self.headers, timeout=10)
        assert response.status_code == 200
        return response
    
    @retry(stop_max_attempt_number=5)
    def post_requests(self, url, data=None):
        if data is None:
            data = {}
        response = requests.post(url, data=data, headers=self.headers, timeout=10)
        assert response.status_code == 200
        return response
    
    # 获得用户的vod链接
    # type_flag:视频类型
    # 返回nickname和video_list
    def get_video_urls(self, sec_uid, type_flag='p'):
        user_url_prefix = 'https://www.iesdouyin.com/web/api/v2/aweme/post' if type_flag == 'p' else 'https://www.iesdouyin.com/web/api/v2/aweme/like'
        logger.info('--解析视频链接中--\r')
        i = 0
        result = []
        while result == []:
            i = i + 1
            logger.info("--正在第{}次尝试--\r".format(str(i)))
            user_url = user_url_prefix + "/?sec_uid=%s&count=2000" % (sec_uid)#一次去取2000个,有点儿过分了。
            response = self.get_requests(user_url)
            html = json.loads(response.content.decode())
            if html['aweme_list'] != []:
                result = html['aweme_list']
        
        nickname = None
        video_list = []
        
        for item in result:
            if nickname is None:
                nickname = item['author']['nickname'] if re.sub(r'[\/:*?"<>|]', '', item['author']['nickname']) else None
            
            video_list.append({
                'desc':re.sub(r'[\/:*?"<>|]', '', item['desc']) if item['desc'] else '无标题' + str(int(time.time())),
                'url':item['video']['play_addr']['url_list'][0]
                }
            )
            
        return nickname , video_list
    
    def get_download_url(self, video_url, water_flag):
        if water_flag == True:
            download_url = video_url.replace('api.amemv.com', 'aweme.snssdk.com')
        else:
            download_url = video_url.replace('aweme.snssdk.com', 'api.amemv.com')
        return download_url
    
    def video_downloader(self, video_url, video_name, water_flag=False):
        size = 0
        video_url = self.get_download_url(video_url, water_flag)
        with closing(requests.get(video_url, headers=self.headers, stream=True)) as response:
            chunk_size = 1024
            content_size = int(response.headers['content-length'])
            if response.status_code == 200:
                logger.info('----[文件大小]:%0.2f MB' % (content_size / chunk_size / 1024))
                with open(video_name + ".mp4", 'wb') as file:
                    for data in response.iter_content(chunk_size=chunk_size):
                        file.write(data)
                        size += len(data)
                        file.flush()
                        logger.info('----[下载进度]:%0.2f%%' % float(size / content_size * 100) + '\r')
                        # sys.stdout.flush()
    
    def run(self):
        default_sec_uid = 'MS4wLjABAAAAH-=' #请替换
        sec_uid = input('请输入用户的sec_uid:')
        sec_uid = sec_uid if sec_uid else default_sec_uid
        
        water_flag = input('是否下载带有水印的视频(0-否(默认) ,1-是):')
        water_flag = bool(int(water_flag)) if water_flag else 0
        
        type_flag = input('p-上传的 (默认),l-收藏的')
        type_flag = type_flag if type_flag else 'p'
        
        save_dir = input('保存位置')
        save_dir = save_dir if save_dir else PY_GEN_DIR
        
        nickname , video_list = self.get_video_urls(sec_uid, type_flag)
        nickname_dir = os.path.join(save_dir, nickname)
        
        if not os.path.exists(nickname_dir):
            os.makedirs(nickname_dir)
        if type_flag == 'f':
            if 'favorite' not in os.listdir(nickname_dir):
                os.mkdir(os.path.join(nickname_dir, 'favorite'))
        logger.info('----视频下载中:共有{}个作品---'.format(str(len(video_list))))
        
        for num in range(len(video_list)):
            vod_desc = video_list[num]['desc'].replace("\r", '').replace("\n", '').replace('\r\n', '')
            logger.info("---正在解析第%d个视频链接[%s] , 请稍后---" % (num + 1, vod_desc))
            
            video_path = os.path.join(nickname_dir, vod_desc) if type_flag != 'f' else os.path.join(nickname_dir, 'favorite', vod_desc)
            
            if os.path.isfile(video_path):
                logger.info("---视频已存在---")
            else:
                self.video_downloader(video_list[num]['url'], video_path, water_flag)
            logger.info('\n')
        logger.info("下载完成")


if __name__ == '__main__':
    config_logger()
    Douyin().hello().run()


其实应该可以循环往下获取vod_url的,但是上面那个仅仅获取了一部分vod而已,并不是全部

tips:
2022-08-31
今天发现get_video_urls报错了,response.content.decode无法decode,仔细看了下response.headers['Content-Encoding'],发现已经变成br了,
这就需要安装一个包brotli了,看这里。但之前运行的时候,都是ok的啊,怎么突然爆出个br。
又仔细看了下request头部,发现我的accept-encoding写的居然是:gzip, deflate,br.

这不是给自己挖坑么?都写了支持br了,人家给你返回一个conten-encoding=br也没错啊,现在把br去掉,一切就正常了。