From 16974726a4716da850036b9ae12104534050a8a8 Mon Sep 17 00:00:00 2001 From: Mozi <29089388+pzhlkj6612@users.noreply.github.com> Date: Sun, 10 Mar 2024 02:17:57 +0000 Subject: [PATCH] [ie/niconico] Directly download live timeshift videos; WebSocket fixes Major changes: - Make a downloader for live timeshift videos. Time-based download rate limit applies. RetryManager-based error recovery applies. - Fix the incorrect url for WebSocket reconnection. - Correctly close the WebSocket connection. - [!] Apply "FFmpegFixupM3u8PP" for both non-timeshift and timeshift MPEG-TS files by adding "m3u8_*" prefixes and inheriting from "HlsFD". - [!] Change the protocol from "hls+fmp4" to "hls" in "startWatching" WebSocket requests because I didn't see it in my test. Minor changes: - Support metadata extraction when no formats. - Set "live_status" instead of "is_live". - Clean up "info_dict": Change WebSocket configs to private to hide them from users; extract common fields and remove unused ones. - Update a download test. --- yt_dlp/downloader/__init__.py | 5 +- yt_dlp/downloader/niconico.py | 202 +++++++++++++++++++++++++++------- yt_dlp/extractor/niconico.py | 101 ++++++++++------- 3 files changed, 229 insertions(+), 79 deletions(-) diff --git a/yt_dlp/downloader/__init__.py b/yt_dlp/downloader/__init__.py index 51a9f28f0..444c34857 100644 --- a/yt_dlp/downloader/__init__.py +++ b/yt_dlp/downloader/__init__.py @@ -30,7 +30,7 @@ from .hls import HlsFD from .http import HttpFD from .ism import IsmFD from .mhtml import MhtmlFD -from .niconico import NiconicoDmcFD, NiconicoLiveFD +from .niconico import NiconicoDmcFD, NiconicoLiveFD, NiconicoLiveTimeshiftFD from .rtmp import RtmpFD from .rtsp import RtspFD from .websocket import WebSocketFragmentFD @@ -50,7 +50,8 @@ PROTOCOL_MAP = { 'ism': IsmFD, 'mhtml': MhtmlFD, 'niconico_dmc': NiconicoDmcFD, - 'niconico_live': NiconicoLiveFD, + 'm3u8_niconico_live': NiconicoLiveFD, + 'm3u8_niconico_live_timeshift': NiconicoLiveTimeshiftFD, 'fc2_live': FC2LiveFD, 'websocket_frag': WebSocketFragmentFD, 'youtube_live_chat': YoutubeLiveChatFD, diff --git a/yt_dlp/downloader/niconico.py b/yt_dlp/downloader/niconico.py index fef8bff73..4c51bb166 100644 --- a/yt_dlp/downloader/niconico.py +++ b/yt_dlp/downloader/niconico.py @@ -1,12 +1,23 @@ +import contextlib import json +import math import threading import time from . import get_suitable_downloader from .common import FileDownloader from .external import FFmpegFD +from ..downloader.hls import HlsFD from ..networking import Request -from ..utils import DownloadError, str_or_none, try_get +from ..networking.exceptions import RequestError +from ..utils import ( + DownloadError, + RetryManager, + str_or_none, + traverse_obj, + try_get, + urljoin, +) class NiconicoDmcFD(FileDownloader): @@ -56,34 +67,33 @@ class NiconicoDmcFD(FileDownloader): return success -class NiconicoLiveFD(FileDownloader): - """ Downloads niconico live without being stopped """ +class NiconicoLiveBaseFD(FileDownloader): + _WEBSOCKET_RECONNECT_DELAY = 10 - def real_download(self, filename, info_dict): - video_id = info_dict['video_id'] - ws_url = info_dict['url'] - ws_extractor = info_dict['ws'] - ws_origin_host = info_dict['origin'] - live_quality = info_dict.get('live_quality', 'high') - live_latency = info_dict.get('live_latency', 'high') - dl = FFmpegFD(self.ydl, self.params or {}) + @contextlib.contextmanager + def _ws_context(self, info_dict): + """ Hold a WebSocket object and release it when leaving """ - new_info_dict = info_dict.copy() - new_info_dict.update({ - 'protocol': 'm3u8', - }) + video_id = info_dict['id'] + live_latency = info_dict['live_latency'] + self.ws = info_dict['__ws'] + + self.m3u8_lock = threading.Event() + self.m3u8_url = info_dict['manifest_url'] + self.m3u8_lock.set() def communicate_ws(reconnect): if reconnect: - ws = self.ydl.urlopen(Request(ws_url, headers={'Origin': f'https://{ws_origin_host}'})) + self.ws = self.ydl.urlopen(Request( + self.ws.url, headers={'Origin': self.ws.wsw.request.headers['Origin']})) if self.ydl.params.get('verbose', False): self.to_screen('[debug] Sending startWatching request') - ws.send(json.dumps({ + self.ws.send(json.dumps({ 'type': 'startWatching', 'data': { 'stream': { - 'quality': live_quality, - 'protocol': 'hls+fmp4', + 'quality': 'abr', + 'protocol': 'hls', 'latency': live_latency, 'chasePlay': False }, @@ -94,11 +104,9 @@ class NiconicoLiveFD(FileDownloader): 'reconnect': True, } })) - else: - ws = ws_extractor - with ws: + with self.ws: while True: - recv = ws.recv() + recv = self.ws.recv() if not recv: continue data = json.loads(recv) @@ -106,35 +114,155 @@ class NiconicoLiveFD(FileDownloader): continue if data.get('type') == 'ping': # pong back - ws.send(r'{"type":"pong"}') - ws.send(r'{"type":"keepSeat"}') + self.ws.send(r'{"type":"pong"}') + self.ws.send(r'{"type":"keepSeat"}') + elif data.get('type') == 'stream': + self.m3u8_url = data['data']['uri'] + self.m3u8_lock.set() elif data.get('type') == 'disconnect': self.write_debug(data) - return True + return elif data.get('type') == 'error': self.write_debug(data) message = try_get(data, lambda x: x['body']['code'], str) or recv - return DownloadError(message) + raise DownloadError(message) elif self.ydl.params.get('verbose', False): if len(recv) > 100: recv = recv[:100] + '...' self.to_screen('[debug] Server said: %s' % recv) + stopped = threading.Event() + def ws_main(): reconnect = False - while True: + while not stopped.is_set(): try: - ret = communicate_ws(reconnect) - if ret is True: - return - except BaseException as e: - self.to_screen('[%s] %s: Connection error occured, reconnecting after 10 seconds: %s' % ('niconico:live', video_id, str_or_none(e))) - time.sleep(10) - continue - finally: + communicate_ws(reconnect) + break # Disconnected + except BaseException as e: # Including TransportError + if stopped.is_set(): + break + + self.m3u8_lock.clear() # m3u8 url may be changed + + self.to_screen('[%s] %s: Connection error occured, reconnecting after %d seconds: %s' % ('niconico:live', video_id, self._WEBSOCKET_RECONNECT_DELAY, str_or_none(e))) + time.sleep(self._WEBSOCKET_RECONNECT_DELAY) + reconnect = True + self.m3u8_lock.set() # Release possible locks + thread = threading.Thread(target=ws_main, daemon=True) thread.start() - return dl.download(filename, new_info_dict) + try: + yield self + finally: + stopped.set() + self.ws.close() + thread.join() + + def _master_m3u8_url(self): + """ Get the refreshed manifest url after WebSocket reconnection to prevent HTTP 403 """ + + self.m3u8_lock.wait() + return self.m3u8_url + + +class NiconicoLiveFD(NiconicoLiveBaseFD): + """ Downloads niconico live without being stopped """ + + def real_download(self, filename, info_dict): + with self._ws_context(info_dict): + new_info_dict = info_dict.copy() + new_info_dict.update({ + 'protocol': 'm3u8', + }) + + return FFmpegFD(self.ydl, self.params or {}).download(filename, new_info_dict) + + +class NiconicoLiveTimeshiftFD(NiconicoLiveBaseFD, HlsFD): + """ Downloads niconico live timeshift VOD """ + + _PER_FRAGMENT_DOWNLOAD_RATIO = 0.1 + + def real_download(self, filename, info_dict): + with self._ws_context(info_dict) as ws_context: + from ..extractor.niconico import NiconicoIE + ie = NiconicoIE(self.ydl) + + video_id = info_dict['id'] + + # Get format index + for format_index, fmt in enumerate(info_dict['formats']): + if fmt['format_id'] == info_dict['format_id']: + break + + # Get video info + total_duration = 0 + fragment_duration = 0 + for line in ie._download_webpage(info_dict['url'], video_id, note='Downloading m3u8').splitlines(): + if '#STREAM-DURATION' in line: + total_duration = int(float(line.split(':')[1])) + if '#EXT-X-TARGETDURATION' in line: + fragment_duration = int(line.split(':')[1]) + if not all({total_duration, fragment_duration}): + raise DownloadError('Unable to get required video info') + + ctx = { + 'filename': filename, + 'total_frags': math.ceil(total_duration / fragment_duration), + } + + self._prepare_and_start_frag_download(ctx, info_dict) + + downloaded_duration = ctx['fragment_index'] * fragment_duration + while True: + if downloaded_duration > total_duration: + break + + retry_manager = RetryManager(self.params.get('fragment_retries'), self.report_retry) + for retry in retry_manager: + try: + # Refresh master m3u8 (if possible) and get the url of the previously-chose format + master_m3u8_url = ws_context._master_m3u8_url() + formats = ie._extract_m3u8_formats( + master_m3u8_url, video_id, query={"start": downloaded_duration}, live=False, note=False, fatal=False) + media_m3u8_url = traverse_obj(formats, (format_index, {dict}, 'url'), get_all=False) + if not media_m3u8_url: + raise DownloadError('Unable to get playlist') + + # Get all fragments + media_m3u8 = ie._download_webpage(media_m3u8_url, video_id, note=False) + fragment_urls = traverse_obj(media_m3u8.splitlines(), ( + lambda _, v: not v.startswith('#'), {lambda url: urljoin(media_m3u8_url, url)})) + + with self.DurationLimiter(len(fragment_urls) * fragment_duration * self._PER_FRAGMENT_DOWNLOAD_RATIO): + for fragment_url in fragment_urls: + success = self._download_fragment(ctx, fragment_url, info_dict) + if not success: + return False + self._append_fragment(ctx, self._read_fragment(ctx)) + downloaded_duration += fragment_duration + + except (DownloadError, RequestError) as err: # Including HTTPError and TransportError + retry.error = err + continue + + if retry_manager.error: + return False + + return self._finish_frag_download(ctx, info_dict) + + class DurationLimiter(): + def __init__(self, target): + self.target = target + + def __enter__(self): + self.start = time.time() + + def __exit__(self, *exc): + remaining = self.target - (time.time() - self.start) + if remaining > 0: + time.sleep(remaining) diff --git a/yt_dlp/extractor/niconico.py b/yt_dlp/extractor/niconico.py index 6a4624602..5fa84a34b 100644 --- a/yt_dlp/extractor/niconico.py +++ b/yt_dlp/extractor/niconico.py @@ -919,17 +919,30 @@ class NiconicoLiveIE(InfoExtractor): 'info_dict': { 'id': 'lv339533123', 'title': '激辛ペヤング食べます‪( ;ᯅ; )‬(歌枠オーディション参加中)', - 'view_count': 1526, - 'comment_count': 1772, + 'view_count': int, + 'comment_count': int, 'description': '初めましてもかって言います❕\nのんびり自由に適当に暮らしてます', 'uploader': 'もか', 'channel': 'ゲストさんのコミュニティ', 'channel_id': 'co5776900', 'channel_url': 'https://com.nicovideo.jp/community/co5776900', 'timestamp': 1670677328, - 'is_live': True, + 'ext': None, + 'live_latency': 'high', + 'live_status': 'was_live', + 'thumbnail': r're:^https://[\w.-]+/\w+/\w+', + 'thumbnails': list, + 'upload_date': '20221210', }, - 'skip': 'livestream', + 'params': { + 'skip_download': True, + 'ignore_no_formats_error': True, + }, + 'expected_warnings': [ + 'The live hasn\'t started yet or already ended.', + 'No video formats found!', + 'Requested format is not available', + ], }, { 'url': 'https://live2.nicovideo.jp/watch/lv339533123', 'only_matching': True, @@ -943,36 +956,14 @@ class NiconicoLiveIE(InfoExtractor): _KNOWN_LATENCY = ('high', 'low') - def _real_extract(self, url): - video_id = self._match_id(url) - webpage, urlh = self._download_webpage_handle(f'https://live.nicovideo.jp/watch/{video_id}', video_id) - - embedded_data = self._parse_json(unescapeHTML(self._search_regex( - r'