From adbc4ec4bbfbe57842049cf9194384480f534859 Mon Sep 17 00:00:00 2001 From: The Hatsune Daishi Date: Mon, 20 Dec 2021 15:06:46 +0900 Subject: [PATCH] [dash,youtube] Download live from start to end (#888) * Add option `--live-from-start` to enable downloading live videos from start * Add key `is_from_start` in formats to identify formats (of live videos) that downloads from start * [dash] Create protocol `http_dash_segments_generator` that allows a function to be passed instead of fragments * [fragment] Allow multiple live dash formats to download simultaneously * [youtube] Implement fragment re-fetching for the live dash formats * [youtube] Re-extract dash manifest every 5 hours (manifest expires in 6hrs) * [postprocessor/ffmpeg] Add `FFmpegFixupDuplicateMoovPP` to fixup duplicated moov atoms Known issue: Ctrl+C doesn't work on Windows when downloading multiple formats Closes #1521 Authored by: nao20010128nao, pukkandan --- README.md | 7 +- yt_dlp/YoutubeDL.py | 77 ++++++++---- yt_dlp/__init__.py | 1 + yt_dlp/downloader/__init__.py | 12 +- yt_dlp/downloader/dash.py | 68 +++++++---- yt_dlp/downloader/f4m.py | 2 +- yt_dlp/downloader/fragment.py | 47 ++++++-- yt_dlp/extractor/common.py | 7 +- yt_dlp/extractor/youtube.py | 201 +++++++++++++++++++++++++++---- yt_dlp/minicurses.py | 1 + yt_dlp/options.py | 8 ++ yt_dlp/postprocessor/__init__.py | 1 + yt_dlp/postprocessor/common.py | 3 +- yt_dlp/postprocessor/ffmpeg.py | 14 ++- yt_dlp/utils.py | 6 - 15 files changed, 355 insertions(+), 100 deletions(-) diff --git a/README.md b/README.md index ef83b8e3b..6311157df 100644 --- a/README.md +++ b/README.md @@ -88,6 +88,7 @@ # NEW FEATURES * Redirect channel's home URL automatically to `/video` to preserve the old behaviour * `255kbps` audio is extracted (if available) from youtube music when premium cookies are given * Youtube music Albums, channels etc can be downloaded ([except self-uploaded music](https://github.com/yt-dlp/yt-dlp/issues/723)) + * Download livestreams from the start using `--live-from-start` * **Cookies from browser**: Cookies can be automatically extracted from all major web browsers using `--cookies-from-browser BROWSER[:PROFILE]` @@ -340,6 +341,10 @@ ## General Options: --flat-playlist Do not extract the videos of a playlist, only list them --no-flat-playlist Extract the videos of a playlist + --live-from-start Download livestreams from the start. + Currently only supported for YouTube + --no-live-from-start Download livestreams from the current + time (default) --wait-for-video MIN[-MAX] Wait for scheduled streams to become available. Pass the minimum number of seconds (or range) to wait between retries @@ -1585,7 +1590,7 @@ #### youtube * `skip`: `hls` or `dash` (or both) to skip download of the respective manifests * `player_client`: Clients to extract video data from. The main clients are `web`, `android`, `ios`, `mweb`. These also have `_music`, `_embedded`, `_agegate`, and `_creator` variants (Eg: `web_embedded`) (`mweb` has only `_agegate`). By default, `android,web` is used, but the agegate and creator variants are added as required for age-gated videos. Similarly the music variants are added for `music.youtube.com` urls. You can also use `all` to use all the clients, and `default` for the default clients. * `player_skip`: Skip some network requests that are generally needed for robust extraction. One or more of `configs` (skip client configs), `webpage` (skip initial webpage), `js` (skip js player). While these options can help reduce the number of requests needed or avoid some rate-limiting, they could cause some issues. See [#860](https://github.com/yt-dlp/yt-dlp/pull/860) for more details -* `include_live_dash`: Include live dash formats (These formats don't download properly) +* `include_live_dash`: Include live dash formats even without `--live-from-start` (These formats don't download properly) * `comment_sort`: `top` or `new` (default) - choose comment sorting mode (on YouTube's side) * `max_comments`: Limit the amount of comments to gather. Comma-separated list of integers representing `max-comments,max-parents,max-replies,max-replies-per-thread`. Default is `all,all,all,all`. * E.g. `all,all,1000,10` will get a maximum of 1000 replies total, with up to 10 replies per thread. `1000,all,100` will get a maximum of 1000 comments, with a maximum of 100 replies total. diff --git a/yt_dlp/YoutubeDL.py b/yt_dlp/YoutubeDL.py index 80d779bee..b5d438096 100644 --- a/yt_dlp/YoutubeDL.py +++ b/yt_dlp/YoutubeDL.py @@ -5,7 +5,6 @@ import collections import contextlib -import copy import datetime import errno import fileinput @@ -144,6 +143,7 @@ from .postprocessor import ( get_postprocessor, EmbedThumbnailPP, + FFmpegFixupDuplicateMoovPP, FFmpegFixupDurationPP, FFmpegFixupM3u8PP, FFmpegFixupM4aPP, @@ -1107,7 +1107,7 @@ def get_value(mdict): def _dumpjson_default(obj): if isinstance(obj, (set, LazyList)): return list(obj) - raise TypeError(f'Object of type {type(obj).__name__} is not JSON serializable') + return repr(obj) def create_key(outer_mobj): if not outer_mobj.group('has_key'): @@ -2071,8 +2071,7 @@ def selector_function(ctx): selector_1, selector_2 = map(_build_selector_function, selector.selector) def selector_function(ctx): - for pair in itertools.product( - selector_1(copy.deepcopy(ctx)), selector_2(copy.deepcopy(ctx))): + for pair in itertools.product(selector_1(ctx), selector_2(ctx)): yield _merge(pair) elif selector.type == SINGLE: # atom @@ -2142,7 +2141,7 @@ def selector_function(ctx): filters = [self._build_format_filter(f) for f in selector.filters] def final_selector(ctx): - ctx_copy = copy.deepcopy(ctx) + ctx_copy = dict(ctx) for _filter in filters: ctx_copy['formats'] = list(filter(_filter, ctx_copy['formats'])) return selector_function(ctx_copy) @@ -2354,6 +2353,10 @@ def sanitize_numeric_fields(info): if not self.params.get('allow_unplayable_formats'): formats = [f for f in formats if not f.get('has_drm')] + if info_dict.get('is_live'): + get_from_start = bool(self.params.get('live_from_start')) + formats = [f for f in formats if bool(f.get('is_from_start')) == get_from_start] + if not formats: self.raise_no_formats(info_dict) @@ -2660,7 +2663,9 @@ def dl(self, name, info, subtitle=False, test=False): urls = '", "'.join([f['url'] for f in info.get('requested_formats', [])] or [info['url']]) self.write_debug('Invoking downloader on "%s"' % urls) - new_info = copy.deepcopy(self._copy_infodict(info)) + # Note: Ideally info should be a deep-copied so that hooks cannot modify it. + # But it may contain objects that are not deep-copyable + new_info = self._copy_infodict(info) if new_info.get('http_headers') is None: new_info['http_headers'] = self._calc_headers(new_info) return fd.download(name, new_info, subtitle) @@ -2675,7 +2680,7 @@ def process_info(self, info_dict): if self._num_downloads >= int(max_downloads): raise MaxDownloadsReached() - if info_dict.get('is_live'): + if info_dict.get('is_live') and not self.params.get('live_from_start'): info_dict['title'] += ' ' + datetime.datetime.now().strftime('%Y-%m-%d %H:%M') # TODO: backward compatibility, to be removed @@ -2889,15 +2894,22 @@ def correct_ext(filename, ext=new_ext): dl_filename = existing_file(full_filename, temp_filename) info_dict['__real_download'] = False + downloaded = [] + merger = FFmpegMergerPP(self) + + fd = get_suitable_downloader(info_dict, self.params, to_stdout=temp_filename == '-') if dl_filename is not None: self.report_file_already_downloaded(dl_filename) - elif get_suitable_downloader(info_dict, self.params, to_stdout=temp_filename == '-'): + elif fd: + for f in requested_formats if fd != FFmpegFD else []: + f['filepath'] = fname = prepend_extension( + correct_ext(temp_filename, info_dict['ext']), + 'f%s' % f['format_id'], info_dict['ext']) + downloaded.append(fname) info_dict['url'] = '\n'.join(f['url'] for f in requested_formats) success, real_download = self.dl(temp_filename, info_dict) info_dict['__real_download'] = real_download else: - downloaded = [] - merger = FFmpegMergerPP(self) if self.params.get('allow_unplayable_formats'): self.report_warning( 'You have requested merging of multiple formats ' @@ -2909,7 +2921,7 @@ def correct_ext(filename, ext=new_ext): 'The formats won\'t be merged.') if temp_filename == '-': - reason = ('using a downloader other than ffmpeg' if FFmpegFD.can_merge_formats(info_dict) + reason = ('using a downloader other than ffmpeg' if FFmpegFD.can_merge_formats(info_dict, self.params) else 'but the formats are incompatible for simultaneous download' if merger.available else 'but ffmpeg is not installed') self.report_warning( @@ -2931,14 +2943,15 @@ def correct_ext(filename, ext=new_ext): partial_success, real_download = self.dl(fname, new_info) info_dict['__real_download'] = info_dict['__real_download'] or real_download success = success and partial_success - if merger.available and not self.params.get('allow_unplayable_formats'): - info_dict['__postprocessors'].append(merger) - info_dict['__files_to_merge'] = downloaded - # Even if there were no downloads, it is being merged only now - info_dict['__real_download'] = True - else: - for file in downloaded: - files_to_move[file] = None + + if downloaded and merger.available and not self.params.get('allow_unplayable_formats'): + info_dict['__postprocessors'].append(merger) + info_dict['__files_to_merge'] = downloaded + # Even if there were no downloads, it is being merged only now + info_dict['__real_download'] = True + else: + for file in downloaded: + files_to_move[file] = None else: # Just a single file dl_filename = existing_file(full_filename, temp_filename) @@ -3005,9 +3018,14 @@ def ffmpeg_fixup(cndn, msg, cls): downloader = get_suitable_downloader(info_dict, self.params) if 'protocol' in info_dict else None downloader = downloader.__name__ if downloader else None - ffmpeg_fixup(info_dict.get('requested_formats') is None and downloader == 'HlsFD', - 'Possible MPEG-TS in MP4 container or malformed AAC timestamps', - FFmpegFixupM3u8PP) + + if info_dict.get('requested_formats') is None: # Not necessary if doing merger + ffmpeg_fixup(downloader == 'HlsFD', + 'Possible MPEG-TS in MP4 container or malformed AAC timestamps', + FFmpegFixupM3u8PP) + ffmpeg_fixup(info_dict.get('is_live') and downloader == 'DashSegmentsFD', + 'Possible duplicate MOOV atoms', FFmpegFixupDuplicateMoovPP) + ffmpeg_fixup(downloader == 'WebSocketFragmentFD', 'Malformed timestamps detected', FFmpegFixupTimestampPP) ffmpeg_fixup(downloader == 'WebSocketFragmentFD', 'Malformed duration detected', FFmpegFixupDurationPP) @@ -3104,10 +3122,17 @@ def sanitize_info(info_dict, remove_private_keys=False): k.startswith('_') or k in remove_keys or v in empty_values) else: reject = lambda k, v: k in remove_keys - filter_fn = lambda obj: ( - list(map(filter_fn, obj)) if isinstance(obj, (LazyList, list, tuple, set)) - else obj if not isinstance(obj, dict) - else dict((k, filter_fn(v)) for k, v in obj.items() if not reject(k, v))) + + def filter_fn(obj): + if isinstance(obj, dict): + return {k: filter_fn(v) for k, v in obj.items() if not reject(k, v)} + elif isinstance(obj, (list, tuple, set, LazyList)): + return list(map(filter_fn, obj)) + elif obj is None or isinstance(obj, (str, int, float, bool)): + return obj + else: + return repr(obj) + return filter_fn(info_dict) @staticmethod diff --git a/yt_dlp/__init__.py b/yt_dlp/__init__.py index 3dccdb186..ab68f26c0 100644 --- a/yt_dlp/__init__.py +++ b/yt_dlp/__init__.py @@ -745,6 +745,7 @@ def report_deprecation(val, old, new=None): 'youtube_include_hls_manifest': opts.youtube_include_hls_manifest, 'encoding': opts.encoding, 'extract_flat': opts.extract_flat, + 'live_from_start': opts.live_from_start, 'wait_for_video': opts.wait_for_video, 'mark_watched': opts.mark_watched, 'merge_output_format': opts.merge_output_format, diff --git a/yt_dlp/downloader/__init__.py b/yt_dlp/downloader/__init__.py index 5270e8081..acc19f43a 100644 --- a/yt_dlp/downloader/__init__.py +++ b/yt_dlp/downloader/__init__.py @@ -12,10 +12,15 @@ def get_suitable_downloader(info_dict, params={}, default=NO_DEFAULT, protocol=N info_copy = info_dict.copy() info_copy['to_stdout'] = to_stdout - downloaders = [_get_suitable_downloader(info_copy, proto, params, default) - for proto in (protocol or info_copy['protocol']).split('+')] + protocols = (protocol or info_copy['protocol']).split('+') + downloaders = [_get_suitable_downloader(info_copy, proto, params, default) for proto in protocols] + if set(downloaders) == {FFmpegFD} and FFmpegFD.can_merge_formats(info_copy, params): return FFmpegFD + elif (set(downloaders) == {DashSegmentsFD} + and not (to_stdout and len(protocols) > 1) + and set(protocols) == {'http_dash_segments_generator'}): + return DashSegmentsFD elif len(downloaders) == 1: return downloaders[0] return None @@ -49,6 +54,7 @@ def get_suitable_downloader(info_dict, params={}, default=NO_DEFAULT, protocol=N 'rtsp': RtspFD, 'f4m': F4mFD, 'http_dash_segments': DashSegmentsFD, + 'http_dash_segments_generator': DashSegmentsFD, 'ism': IsmFD, 'mhtml': MhtmlFD, 'niconico_dmc': NiconicoDmcFD, @@ -63,6 +69,7 @@ def shorten_protocol_name(proto, simplify=False): 'm3u8_native': 'm3u8_n', 'rtmp_ffmpeg': 'rtmp_f', 'http_dash_segments': 'dash', + 'http_dash_segments_generator': 'dash_g', 'niconico_dmc': 'dmc', 'websocket_frag': 'WSfrag', } @@ -71,6 +78,7 @@ def shorten_protocol_name(proto, simplify=False): 'https': 'http', 'ftps': 'ftp', 'm3u8_native': 'm3u8', + 'http_dash_segments_generator': 'dash', 'rtmp_ffmpeg': 'rtmp', 'm3u8_frag_urls': 'm3u8', 'dash_frag_urls': 'dash', diff --git a/yt_dlp/downloader/dash.py b/yt_dlp/downloader/dash.py index 6444ad692..8dd43f4fa 100644 --- a/yt_dlp/downloader/dash.py +++ b/yt_dlp/downloader/dash.py @@ -1,4 +1,5 @@ from __future__ import unicode_literals +import time from ..downloader import get_suitable_downloader from .fragment import FragmentFD @@ -15,27 +16,53 @@ class DashSegmentsFD(FragmentFD): FD_NAME = 'dashsegments' def real_download(self, filename, info_dict): - if info_dict.get('is_live'): + if info_dict.get('is_live') and set(info_dict['protocol'].split('+')) != {'http_dash_segments_generator'}: self.report_error('Live DASH videos are not supported') - fragment_base_url = info_dict.get('fragment_base_url') - fragments = info_dict['fragments'][:1] if self.params.get( - 'test', False) else info_dict['fragments'] - + real_start = time.time() real_downloader = get_suitable_downloader( info_dict, self.params, None, protocol='dash_frag_urls', to_stdout=(filename == '-')) - ctx = { - 'filename': filename, - 'total_frags': len(fragments), - } + requested_formats = [{**info_dict, **fmt} for fmt in info_dict.get('requested_formats', [])] + args = [] + for fmt in requested_formats or [info_dict]: + try: + fragment_count = 1 if self.params.get('test') else len(fmt['fragments']) + except TypeError: + fragment_count = None + ctx = { + 'filename': fmt.get('filepath') or filename, + 'live': 'is_from_start' if fmt.get('is_from_start') else fmt.get('is_live'), + 'total_frags': fragment_count, + } - if real_downloader: - self._prepare_external_frag_download(ctx) - else: - self._prepare_and_start_frag_download(ctx, info_dict) + if real_downloader: + self._prepare_external_frag_download(ctx) + else: + self._prepare_and_start_frag_download(ctx, fmt) + ctx['start'] = real_start + + fragments_to_download = self._get_fragments(fmt, ctx) + + if real_downloader: + self.to_screen( + '[%s] Fragment downloads will be delegated to %s' % (self.FD_NAME, real_downloader.get_basename())) + info_dict['fragments'] = fragments_to_download + fd = real_downloader(self.ydl, self.params) + return fd.real_download(filename, info_dict) + + args.append([ctx, fragments_to_download, fmt]) + + return self.download_and_append_fragments_multiple(*args) + + def _resolve_fragments(self, fragments, ctx): + fragments = fragments(ctx) if callable(fragments) else fragments + return [next(fragments)] if self.params.get('test') else fragments + + def _get_fragments(self, fmt, ctx): + fragment_base_url = fmt.get('fragment_base_url') + fragments = self._resolve_fragments(fmt['fragments'], ctx) - fragments_to_download = [] frag_index = 0 for i, fragment in enumerate(fragments): frag_index += 1 @@ -46,17 +73,8 @@ def real_download(self, filename, info_dict): assert fragment_base_url fragment_url = urljoin(fragment_base_url, fragment['path']) - fragments_to_download.append({ + yield { 'frag_index': frag_index, 'index': i, 'url': fragment_url, - }) - - if real_downloader: - self.to_screen( - '[%s] Fragment downloads will be delegated to %s' % (self.FD_NAME, real_downloader.get_basename())) - info_dict['fragments'] = fragments_to_download - fd = real_downloader(self.ydl, self.params) - return fd.real_download(filename, info_dict) - - return self.download_and_append_fragments(ctx, fragments_to_download, info_dict) + } diff --git a/yt_dlp/downloader/f4m.py b/yt_dlp/downloader/f4m.py index 9da2776d9..0008b7c28 100644 --- a/yt_dlp/downloader/f4m.py +++ b/yt_dlp/downloader/f4m.py @@ -366,7 +366,7 @@ def real_download(self, filename, info_dict): ctx = { 'filename': filename, 'total_frags': total_frags, - 'live': live, + 'live': bool(live), } self._prepare_frag_download(ctx) diff --git a/yt_dlp/downloader/fragment.py b/yt_dlp/downloader/fragment.py index 04b0f68c0..79c6561c7 100644 --- a/yt_dlp/downloader/fragment.py +++ b/yt_dlp/downloader/fragment.py @@ -1,9 +1,10 @@ from __future__ import division, unicode_literals +import http.client +import json +import math import os import time -import json -from math import ceil try: import concurrent.futures @@ -15,6 +16,7 @@ from .http import HttpFD from ..aes import aes_cbc_decrypt_bytes from ..compat import ( + compat_os_name, compat_urllib_error, compat_struct_pack, ) @@ -90,7 +92,7 @@ def _prepare_and_start_frag_download(self, ctx, info_dict): self._start_frag_download(ctx, info_dict) def __do_ytdl_file(self, ctx): - return not ctx['live'] and not ctx['tmpfilename'] == '-' and not self.params.get('_no_ytdl_file') + return ctx['live'] is not True and ctx['tmpfilename'] != '-' and not self.params.get('_no_ytdl_file') def _read_ytdl_file(self, ctx): assert 'ytdl_corrupt' not in ctx @@ -375,17 +377,20 @@ def download_and_append_fragments_multiple(self, *args, pack_func=None, finish_f @params (ctx1, fragments1, info_dict1), (ctx2, fragments2, info_dict2), ... all args must be either tuple or list ''' + interrupt_trigger = [True] max_progress = len(args) if max_progress == 1: return self.download_and_append_fragments(*args[0], pack_func=pack_func, finish_func=finish_func) - max_workers = self.params.get('concurrent_fragment_downloads', max_progress) + max_workers = self.params.get('concurrent_fragment_downloads', 1) if max_progress > 1: self._prepare_multiline_status(max_progress) def thread_func(idx, ctx, fragments, info_dict, tpe): ctx['max_progress'] = max_progress ctx['progress_idx'] = idx - return self.download_and_append_fragments(ctx, fragments, info_dict, pack_func=pack_func, finish_func=finish_func, tpe=tpe) + return self.download_and_append_fragments( + ctx, fragments, info_dict, pack_func=pack_func, finish_func=finish_func, + tpe=tpe, interrupt_trigger=interrupt_trigger) class FTPE(concurrent.futures.ThreadPoolExecutor): # has to stop this or it's going to wait on the worker thread itself @@ -393,8 +398,11 @@ def __exit__(self, exc_type, exc_val, exc_tb): pass spins = [] + if compat_os_name == 'nt': + self.report_warning('Ctrl+C does not work on Windows when used with parallel threads. ' + 'This is a known issue and patches are welcome') for idx, (ctx, fragments, info_dict) in enumerate(args): - tpe = FTPE(ceil(max_workers / max_progress)) + tpe = FTPE(math.ceil(max_workers / max_progress)) job = tpe.submit(thread_func, idx, ctx, fragments, info_dict, tpe) spins.append((tpe, job)) @@ -402,18 +410,32 @@ def __exit__(self, exc_type, exc_val, exc_tb): for tpe, job in spins: try: result = result and job.result() + except KeyboardInterrupt: + interrupt_trigger[0] = False finally: tpe.shutdown(wait=True) + if not interrupt_trigger[0]: + raise KeyboardInterrupt() return result - def download_and_append_fragments(self, ctx, fragments, info_dict, *, pack_func=None, finish_func=None, tpe=None): + def download_and_append_fragments( + self, ctx, fragments, info_dict, *, pack_func=None, finish_func=None, + tpe=None, interrupt_trigger=None): + if not interrupt_trigger: + interrupt_trigger = (True, ) + fragment_retries = self.params.get('fragment_retries', 0) - is_fatal = (lambda idx: idx == 0) if self.params.get('skip_unavailable_fragments', True) else (lambda _: True) + is_fatal = ( + ((lambda _: False) if info_dict.get('is_live') else (lambda idx: idx == 0)) + if self.params.get('skip_unavailable_fragments', True) else (lambda _: True)) + if not pack_func: pack_func = lambda frag_content, _: frag_content def download_fragment(fragment, ctx): frag_index = ctx['fragment_index'] = fragment['frag_index'] + if not interrupt_trigger[0]: + return False, frag_index headers = info_dict.get('http_headers', {}).copy() byte_range = fragment.get('byte_range') if byte_range: @@ -428,7 +450,7 @@ def download_fragment(fragment, ctx): if not success: return False, frag_index break - except compat_urllib_error.HTTPError as err: + except (compat_urllib_error.HTTPError, http.client.IncompleteRead) as err: # Unavailable (possibly temporary) fragments may be served. # First we try to retry then either skip or abort. # See https://github.com/ytdl-org/youtube-dl/issues/10165, @@ -466,7 +488,8 @@ def append_fragment(frag_content, frag_index, ctx): decrypt_fragment = self.decrypter(info_dict) - max_workers = self.params.get('concurrent_fragment_downloads', 1) + max_workers = math.ceil( + self.params.get('concurrent_fragment_downloads', 1) / ctx.get('max_progress', 1)) if can_threaded_download and max_workers > 1: def _download_fragment(fragment): @@ -477,6 +500,8 @@ def _download_fragment(fragment): self.report_warning('The download speed shown is only of one thread. This is a known issue and patches are welcome') with tpe or concurrent.futures.ThreadPoolExecutor(max_workers) as pool: for fragment, frag_content, frag_index, frag_filename in pool.map(_download_fragment, fragments): + if not interrupt_trigger[0]: + break ctx['fragment_filename_sanitized'] = frag_filename ctx['fragment_index'] = frag_index result = append_fragment(decrypt_fragment(fragment, frag_content), frag_index, ctx) @@ -484,6 +509,8 @@ def _download_fragment(fragment): return False else: for fragment in fragments: + if not interrupt_trigger[0]: + break frag_content, frag_index = download_fragment(fragment, ctx) result = append_fragment(decrypt_fragment(fragment, frag_content), frag_index, ctx) if not result: diff --git a/yt_dlp/extractor/common.py b/yt_dlp/extractor/common.py index 52099b4b4..9abbaf04f 100644 --- a/yt_dlp/extractor/common.py +++ b/yt_dlp/extractor/common.py @@ -163,9 +163,8 @@ class InfoExtractor(object): * filesize_approx An estimate for the number of bytes * player_url SWF Player URL (used for rtmpdump). * protocol The protocol that will be used for the actual - download, lower-case. - "http", "https", "rtsp", "rtmp", "rtmp_ffmpeg", "rtmpe", - "m3u8", "m3u8_native" or "http_dash_segments". + download, lower-case. One of "http", "https" or + one of the protocols defined in downloader.PROTOCOL_MAP * fragment_base_url Base URL for fragments. Each fragment's path value (if present) will be relative to @@ -181,6 +180,8 @@ class InfoExtractor(object): fragment_base_url * "duration" (optional, int or float) * "filesize" (optional, int) + * is_from_start Is a live format that can be downloaded + from the start. Boolean * preference Order number of this format. If this field is present and not None, the formats get sorted by this field, regardless of all other values. diff --git a/yt_dlp/extractor/youtube.py b/yt_dlp/extractor/youtube.py index 5a3b98bb5..1f5009399 100644 --- a/yt_dlp/extractor/youtube.py +++ b/yt_dlp/extractor/youtube.py @@ -5,6 +5,7 @@ import calendar import copy import datetime +import functools import hashlib import itertools import json @@ -15,6 +16,7 @@ import sys import time import traceback +import threading from .common import InfoExtractor, SearchInfoExtractor from ..compat import ( @@ -1747,6 +1749,142 @@ def __init__(self, *args, **kwargs): self._code_cache = {} self._player_cache = {} + def _prepare_live_from_start_formats(self, formats, video_id, live_start_time, url, webpage_url, smuggled_data): + EXPIRATION_DURATION = 18_000 + lock = threading.Lock() + + is_live = True + expiration_time = time.time() + EXPIRATION_DURATION + formats = [f for f in formats if f.get('is_from_start')] + + def refetch_manifest(format_id): + nonlocal formats, expiration_time, is_live + if time.time() <= expiration_time: + return + + _, _, prs, player_url = self._download_player_responses(url, smuggled_data, video_id, webpage_url) + video_details = traverse_obj( + prs, (..., 'videoDetails'), expected_type=dict, default=[]) + microformats = traverse_obj( + prs, (..., 'microformat', 'playerMicroformatRenderer'), + expected_type=dict, default=[]) + _, is_live, _, formats = self._list_formats(video_id, microformats, video_details, prs, player_url) + expiration_time = time.time() + EXPIRATION_DURATION + + def mpd_feed(format_id): + """ + @returns (manifest_url, manifest_stream_number, is_live) or None + """ + with lock: + refetch_manifest(format_id) + + f = next((f for f in formats if f['format_id'] == format_id), None) + if not f: + self.report_warning( + f'Cannot find refreshed manifest for format {format_id}{bug_reports_message()}') + return None + return f['manifest_url'], f['manifest_stream_number'], is_live + + for f in formats: + f['protocol'] = 'http_dash_segments_generator' + f['fragments'] = functools.partial( + self._live_dash_fragments, f['format_id'], live_start_time, mpd_feed) + + def _live_dash_fragments(self, format_id, live_start_time, mpd_feed, ctx): + FETCH_SPAN, MAX_DURATION = 5, 432000 + + mpd_url, stream_number, is_live = None, None, True + + begin_index = 0 + download_start_time = ctx.get('start') or time.time() + + lack_early_segments = download_start_time - (live_start_time or download_start_time) > MAX_DURATION + if lack_early_segments: + self.report_warning(bug_reports_message( + 'Starting download from the last 120 hours of the live stream since ' + 'YouTube does not have data before that. If you think this is wrong,'), only_once=True) + lack_early_segments = True + + known_idx, no_fragment_score, last_segment_url = begin_index, 0, None + fragments, fragment_base_url = None, None + + def _extract_sequence_from_mpd(refresh_sequence): + nonlocal mpd_url, stream_number, is_live, no_fragment_score, fragments, fragment_base_url + # Obtain from MPD's maximum seq value + old_mpd_url = mpd_url + mpd_url, stream_number, is_live = mpd_feed(format_id) or (mpd_url, stream_number, False) + if old_mpd_url == mpd_url and not refresh_sequence: + return True, last_seq + try: + fmts, _ = self._extract_mpd_formats_and_subtitles( + mpd_url, None, note=False, errnote=False, fatal=False) + except ExtractorError: + fmts = None + if not fmts: + no_fragment_score += 1 + return False, last_seq + fmt_info = next(x for x in fmts if x['manifest_stream_number'] == stream_number) + fragments = fmt_info['fragments'] + fragment_base_url = fmt_info['fragment_base_url'] + assert fragment_base_url + + _last_seq = int(re.search(r'(?:/|^)sq/(\d+)', fragments[-1]['path']).group(1)) + return True, _last_seq + + while is_live: + fetch_time = time.time() + if no_fragment_score > 30: + return + if last_segment_url: + # Obtain from "X-Head-Seqnum" header value from each segment + try: + urlh = self._request_webpage( + last_segment_url, None, note=False, errnote=False, fatal=False) + except ExtractorError: + urlh = None + last_seq = try_get(urlh, lambda x: int_or_none(x.headers['X-Head-Seqnum'])) + if last_seq is None: + no_fragment_score += 1 + last_segment_url = None + continue + else: + should_retry, last_seq = _extract_sequence_from_mpd(True) + if not should_retry: + continue + + if known_idx > last_seq: + last_segment_url = None + continue + + last_seq += 1 + + if begin_index < 0 and known_idx < 0: + # skip from the start when it's negative value + known_idx = last_seq + begin_index + if lack_early_segments: + known_idx = max(known_idx, last_seq - int(MAX_DURATION // fragments[-1]['duration'])) + try: + for idx in range(known_idx, last_seq): + # do not update sequence here or you'll get skipped some part of it + should_retry, _ = _extract_sequence_from_mpd(False) + if not should_retry: + # retry when it gets weird state + known_idx = idx - 1 + raise ExtractorError('breaking out of outer loop') + last_segment_url = urljoin(fragment_base_url, 'sq/%d' % idx) + yield { + 'url': last_segment_url, + } + if known_idx == last_seq: + no_fragment_score += 5 + else: + no_fragment_score = 0 + known_idx = last_seq + except ExtractorError: + continue + + time.sleep(max(0, FETCH_SPAN + fetch_time - time.time())) + def _extract_player_url(self, *ytcfgs, webpage=None): player_url = traverse_obj( ytcfgs, (..., 'PLAYER_JS_URL'), (..., 'WEB_PLAYER_CONTEXT_CONFIGS', ..., 'jsUrl'), @@ -2548,11 +2686,13 @@ def _extract_formats(self, streaming_data, video_id, player_url, is_live): dct['container'] = dct['ext'] + '_dash' yield dct + live_from_start = is_live and self.get_param('live_from_start') skip_manifests = self._configuration_arg('skip') - get_dash = ( - (not is_live or self._configuration_arg('include_live_dash')) - and 'dash' not in skip_manifests and self.get_param('youtube_include_dash_manifest', True)) - get_hls = 'hls' not in skip_manifests and self.get_param('youtube_include_hls_manifest', True) + if not self.get_param('youtube_include_hls_manifest', True): + skip_manifests.append('hls') + get_dash = 'dash' not in skip_manifests and ( + not is_live or live_from_start or self._configuration_arg('include_live_dash')) + get_hls = not live_from_start and 'hls' not in skip_manifests def process_manifest_format(f, proto, itag): if itag in itags: @@ -2583,6 +2723,9 @@ def process_manifest_format(f, proto, itag): if process_manifest_format(f, 'dash', f['format_id']): f['filesize'] = int_or_none(self._search_regex( r'/clen/(\d+)', f.get('fragment_base_url') or f['url'], 'file size', default=None)) + if live_from_start: + f['is_from_start'] = True + yield f def _extract_storyboard(self, player_responses, duration): @@ -2620,12 +2763,7 @@ def _extract_storyboard(self, player_responses, duration): } for j in range(math.ceil(fragment_count))], } - def _real_extract(self, url): - url, smuggled_data = unsmuggle_url(url, {}) - video_id = self._match_id(url) - - base_url = self.http_scheme() + '//www.youtube.com/' - webpage_url = base_url + 'watch?v=' + video_id + def _download_player_responses(self, url, smuggled_data, video_id, webpage_url): webpage = None if 'webpage' not in self._configuration_arg('player_skip'): webpage = self._download_webpage( @@ -2637,6 +2775,28 @@ def _real_extract(self, url): self._get_requested_clients(url, smuggled_data), video_id, webpage, master_ytcfg) + return webpage, master_ytcfg, player_responses, player_url + + def _list_formats(self, video_id, microformats, video_details, player_responses, player_url): + live_broadcast_details = traverse_obj(microformats, (..., 'liveBroadcastDetails')) + is_live = get_first(video_details, 'isLive') + if is_live is None: + is_live = get_first(live_broadcast_details, 'isLiveNow') + + streaming_data = traverse_obj(player_responses, (..., 'streamingData'), default=[]) + formats = list(self._extract_formats(streaming_data, video_id, player_url, is_live)) + + return live_broadcast_details, is_live, streaming_data, formats + + def _real_extract(self, url): + url, smuggled_data = unsmuggle_url(url, {}) + video_id = self._match_id(url) + + base_url = self.http_scheme() + '//www.youtube.com/' + webpage_url = base_url + 'watch?v=' + video_id + + webpage, master_ytcfg, player_responses, player_url = self._download_player_responses(url, smuggled_data, video_id, webpage_url) + playability_statuses = traverse_obj( player_responses, (..., 'playabilityStatus'), expected_type=dict, default=[]) @@ -2705,13 +2865,7 @@ def feed_entry(name): return self.playlist_result( entries, video_id, video_title, video_description) - live_broadcast_details = traverse_obj(microformats, (..., 'liveBroadcastDetails')) - is_live = get_first(video_details, 'isLive') - if is_live is None: - is_live = get_first(live_broadcast_details, 'isLiveNow') - - streaming_data = traverse_obj(player_responses, (..., 'streamingData'), default=[]) - formats = list(self._extract_formats(streaming_data, video_id, player_url, is_live)) + live_broadcast_details, is_live, streaming_data, formats = self._list_formats(video_id, microformats, video_details, player_responses, player_url) if not formats: if not self.get_param('allow_unplayable_formats') and traverse_obj(streaming_data, (..., 'licenseInfos')): @@ -2814,10 +2968,13 @@ def feed_entry(name): is_live = False if is_upcoming is None and (live_content or is_live): is_upcoming = False - live_starttime = parse_iso8601(get_first(live_broadcast_details, 'startTimestamp')) - live_endtime = parse_iso8601(get_first(live_broadcast_details, 'endTimestamp')) - if not duration and live_endtime and live_starttime: - duration = live_endtime - live_starttime + live_start_time = parse_iso8601(get_first(live_broadcast_details, 'startTimestamp')) + live_end_time = parse_iso8601(get_first(live_broadcast_details, 'endTimestamp')) + if not duration and live_end_time and live_start_time: + duration = live_end_time - live_start_time + + if is_live and self.get_param('live_from_start'): + self._prepare_live_from_start_formats(formats, video_id, live_start_time, url, webpage_url, smuggled_data) formats.extend(self._extract_storyboard(player_responses, duration)) @@ -2860,7 +3017,7 @@ def feed_entry(name): else None if is_live is None or is_upcoming is None else live_content), 'live_status': 'is_upcoming' if is_upcoming else None, # rest will be set by YoutubeDL - 'release_timestamp': live_starttime, + 'release_timestamp': live_start_time, } pctr = traverse_obj(player_responses, (..., 'captions', 'playerCaptionsTracklistRenderer'), expected_type=dict) diff --git a/yt_dlp/minicurses.py b/yt_dlp/minicurses.py index c81153c1e..f9f99e390 100644 --- a/yt_dlp/minicurses.py +++ b/yt_dlp/minicurses.py @@ -147,6 +147,7 @@ def _move_cursor(self, dest): def print_at_line(self, text, pos): if self._HAVE_FULLCAP: self.write(*self._move_cursor(pos), CONTROL_SEQUENCES['ERASE_LINE'], text) + return text = self._add_line_number(text, pos) textlen = len(text) diff --git a/yt_dlp/options.py b/yt_dlp/options.py index f4293e688..e3d753adf 100644 --- a/yt_dlp/options.py +++ b/yt_dlp/options.py @@ -258,6 +258,14 @@ def _dict_from_options_callback( '--no-flat-playlist', action='store_false', dest='extract_flat', help='Extract the videos of a playlist') + general.add_option( + '--live-from-start', + action='store_true', dest='live_from_start', + help='Download livestreams from the start. Currently only supported for YouTube') + general.add_option( + '--no-live-from-start', + action='store_false', dest='live_from_start', + help='Download livestreams from the current time (default)') general.add_option( '--wait-for-video', dest='wait_for_video', metavar='MIN[-MAX]', default=None, diff --git a/yt_dlp/postprocessor/__init__.py b/yt_dlp/postprocessor/__init__.py index 4ae230d2f..7f8adb368 100644 --- a/yt_dlp/postprocessor/__init__.py +++ b/yt_dlp/postprocessor/__init__.py @@ -9,6 +9,7 @@ FFmpegPostProcessor, FFmpegEmbedSubtitlePP, FFmpegExtractAudioPP, + FFmpegFixupDuplicateMoovPP, FFmpegFixupDurationPP, FFmpegFixupStretchedPP, FFmpegFixupTimestampPP, diff --git a/yt_dlp/postprocessor/common.py b/yt_dlp/postprocessor/common.py index ab9eb6acf..f2467c542 100644 --- a/yt_dlp/postprocessor/common.py +++ b/yt_dlp/postprocessor/common.py @@ -1,6 +1,5 @@ from __future__ import unicode_literals -import copy import functools import os @@ -18,7 +17,7 @@ class PostProcessorMetaClass(type): def run_wrapper(func): @functools.wraps(func) def run(self, info, *args, **kwargs): - info_copy = copy.deepcopy(self._copy_infodict(info)) + info_copy = self._copy_infodict(info) self._hook_progress({'status': 'started'}, info_copy) ret = func(self, info, *args, **kwargs) if ret is not None: diff --git a/yt_dlp/postprocessor/ffmpeg.py b/yt_dlp/postprocessor/ffmpeg.py index 26af55a9b..594762974 100644 --- a/yt_dlp/postprocessor/ffmpeg.py +++ b/yt_dlp/postprocessor/ffmpeg.py @@ -908,13 +908,23 @@ def run(self, info): return [], info -class FFmpegFixupDurationPP(FFmpegFixupPostProcessor): +class FFmpegCopyStreamPostProcessor(FFmpegFixupPostProcessor): + MESSAGE = 'Copying stream' + @PostProcessor._restrict_to(images=False) def run(self, info): - self._fixup('Fixing video duration', info['filepath'], ['-c', 'copy', '-map', '0', '-dn']) + self._fixup(self.MESSAGE, info['filepath'], ['-c', 'copy', '-map', '0', '-dn']) return [], info +class FFmpegFixupDurationPP(FFmpegCopyStreamPostProcessor): + MESSAGE = 'Fixing video duration' + + +class FFmpegFixupDuplicateMoovPP(FFmpegCopyStreamPostProcessor): + MESSAGE = 'Fixing duplicate MOOV atoms' + + class FFmpegSubtitlesConvertorPP(FFmpegPostProcessor): SUPPORTED_EXTS = ('srt', 'vtt', 'ass', 'lrc') diff --git a/yt_dlp/utils.py b/yt_dlp/utils.py index 81c95f3e9..2919324c6 100644 --- a/yt_dlp/utils.py +++ b/yt_dlp/utils.py @@ -2631,12 +2631,6 @@ def __reversed__(self): def __copy__(self): return type(self)(self.__iterable, reverse=self.__reversed, _cache=self.__cache) - def __deepcopy__(self, memo): - # FIXME: This is actually just a shallow copy - id_ = id(self) - memo[id_] = self.__copy__() - return memo[id_] - def __repr__(self): # repr and str should mimic a list. So we exhaust the iterable return repr(self.exhaust())