From a6213a49250129f25e8f435ff3fadf4a3237f6e1 Mon Sep 17 00:00:00 2001 From: pukkandan Date: Wed, 24 Nov 2021 08:31:52 +0530 Subject: [PATCH] [cleanup,youtube] Reorganize Tab and Search extractor inheritances --- yt_dlp/extractor/youtube.py | 1395 ++++++++++++++++++----------------- 1 file changed, 698 insertions(+), 697 deletions(-) diff --git a/yt_dlp/extractor/youtube.py b/yt_dlp/extractor/youtube.py index 632129bc6..a8d515f5c 100644 --- a/yt_dlp/extractor/youtube.py +++ b/yt_dlp/extractor/youtube.py @@ -44,6 +44,7 @@ from ..utils import ( join_nonempty, mimetype2ext, network_exceptions, + NO_DEFAULT, orderedSet, parse_codecs, parse_count, @@ -3116,26 +3117,699 @@ class YoutubeIE(YoutubeBaseInfoExtractor): return info +class YoutubeTabBaseInfoExtractor(YoutubeBaseInfoExtractor): -class YoutubeTabIE(YoutubeBaseInfoExtractor): + def _extract_channel_id(self, webpage): + channel_id = self._html_search_meta( + 'channelId', webpage, 'channel id', default=None) + if channel_id: + return channel_id + channel_url = self._html_search_meta( + ('og:url', 'al:ios:url', 'al:android:url', 'al:web:url', + 'twitter:url', 'twitter:app:url:iphone', 'twitter:app:url:ipad', + 'twitter:app:url:googleplay'), webpage, 'channel url') + return self._search_regex( + r'https?://(?:www\.)?youtube\.com/channel/([^/?#&])+', + channel_url, 'channel id') + + @staticmethod + def _extract_basic_item_renderer(item): + # Modified from _extract_grid_item_renderer + known_basic_renderers = ( + 'playlistRenderer', 'videoRenderer', 'channelRenderer', 'showRenderer' + ) + for key, renderer in item.items(): + if not isinstance(renderer, dict): + continue + elif key in known_basic_renderers: + return renderer + elif key.startswith('grid') and key.endswith('Renderer'): + return renderer + + def _grid_entries(self, grid_renderer): + for item in grid_renderer['items']: + if not isinstance(item, dict): + continue + renderer = self._extract_basic_item_renderer(item) + if not isinstance(renderer, dict): + continue + title = self._get_text(renderer, 'title') + + # playlist + playlist_id = renderer.get('playlistId') + if playlist_id: + yield self.url_result( + 'https://www.youtube.com/playlist?list=%s' % playlist_id, + ie=YoutubeTabIE.ie_key(), video_id=playlist_id, + video_title=title) + continue + # video + video_id = renderer.get('videoId') + if video_id: + yield self._extract_video(renderer) + continue + # channel + channel_id = renderer.get('channelId') + if channel_id: + yield self.url_result( + 'https://www.youtube.com/channel/%s' % channel_id, + ie=YoutubeTabIE.ie_key(), video_title=title) + continue + # generic endpoint URL support + ep_url = urljoin('https://www.youtube.com/', try_get( + renderer, lambda x: x['navigationEndpoint']['commandMetadata']['webCommandMetadata']['url'], + compat_str)) + if ep_url: + for ie in (YoutubeTabIE, YoutubePlaylistIE, YoutubeIE): + if ie.suitable(ep_url): + yield self.url_result( + ep_url, ie=ie.ie_key(), video_id=ie._match_id(ep_url), video_title=title) + break + + def _shelf_entries_from_content(self, shelf_renderer): + content = shelf_renderer.get('content') + if not isinstance(content, dict): + return + renderer = content.get('gridRenderer') or content.get('expandedShelfContentsRenderer') + if renderer: + # TODO: add support for nested playlists so each shelf is processed + # as separate playlist + # TODO: this includes only first N items + for entry in self._grid_entries(renderer): + yield entry + renderer = content.get('horizontalListRenderer') + if renderer: + # TODO + pass + + def _shelf_entries(self, shelf_renderer, skip_channels=False): + ep = try_get( + shelf_renderer, lambda x: x['endpoint']['commandMetadata']['webCommandMetadata']['url'], + compat_str) + shelf_url = urljoin('https://www.youtube.com', ep) + if shelf_url: + # Skipping links to another channels, note that checking for + # endpoint.commandMetadata.webCommandMetadata.webPageTypwebPageType == WEB_PAGE_TYPE_CHANNEL + # will not work + if skip_channels and '/channels?' in shelf_url: + return + title = self._get_text(shelf_renderer, 'title') + yield self.url_result(shelf_url, video_title=title) + # Shelf may not contain shelf URL, fallback to extraction from content + for entry in self._shelf_entries_from_content(shelf_renderer): + yield entry + + def _playlist_entries(self, video_list_renderer): + for content in video_list_renderer['contents']: + if not isinstance(content, dict): + continue + renderer = content.get('playlistVideoRenderer') or content.get('playlistPanelVideoRenderer') + if not isinstance(renderer, dict): + continue + video_id = renderer.get('videoId') + if not video_id: + continue + yield self._extract_video(renderer) + + def _rich_entries(self, rich_grid_renderer): + renderer = try_get( + rich_grid_renderer, lambda x: x['content']['videoRenderer'], dict) or {} + video_id = renderer.get('videoId') + if not video_id: + return + yield self._extract_video(renderer) + + def _video_entry(self, video_renderer): + video_id = video_renderer.get('videoId') + if video_id: + return self._extract_video(video_renderer) + + def _post_thread_entries(self, post_thread_renderer): + post_renderer = try_get( + post_thread_renderer, lambda x: x['post']['backstagePostRenderer'], dict) + if not post_renderer: + return + # video attachment + video_renderer = try_get( + post_renderer, lambda x: x['backstageAttachment']['videoRenderer'], dict) or {} + video_id = video_renderer.get('videoId') + if video_id: + entry = self._extract_video(video_renderer) + if entry: + yield entry + # playlist attachment + playlist_id = try_get( + post_renderer, lambda x: x['backstageAttachment']['playlistRenderer']['playlistId'], compat_str) + if playlist_id: + yield self.url_result( + 'https://www.youtube.com/playlist?list=%s' % playlist_id, + ie=YoutubeTabIE.ie_key(), video_id=playlist_id) + # inline video links + runs = try_get(post_renderer, lambda x: x['contentText']['runs'], list) or [] + for run in runs: + if not isinstance(run, dict): + continue + ep_url = try_get( + run, lambda x: x['navigationEndpoint']['urlEndpoint']['url'], compat_str) + if not ep_url: + continue + if not YoutubeIE.suitable(ep_url): + continue + ep_video_id = YoutubeIE._match_id(ep_url) + if video_id == ep_video_id: + continue + yield self.url_result(ep_url, ie=YoutubeIE.ie_key(), video_id=ep_video_id) + + def _post_thread_continuation_entries(self, post_thread_continuation): + contents = post_thread_continuation.get('contents') + if not isinstance(contents, list): + return + for content in contents: + renderer = content.get('backstagePostThreadRenderer') + if not isinstance(renderer, dict): + continue + for entry in self._post_thread_entries(renderer): + yield entry + + r''' # unused + def _rich_grid_entries(self, contents): + for content in contents: + video_renderer = try_get(content, lambda x: x['richItemRenderer']['content']['videoRenderer'], dict) + if video_renderer: + entry = self._video_entry(video_renderer) + if entry: + yield entry + ''' + def _extract_entries(self, parent_renderer, continuation_list): + # continuation_list is modified in-place with continuation_list = [continuation_token] + continuation_list[:] = [None] + contents = try_get(parent_renderer, lambda x: x['contents'], list) or [] + for content in contents: + if not isinstance(content, dict): + continue + is_renderer = try_get(content, lambda x: x['itemSectionRenderer'], dict) + if not is_renderer: + renderer = content.get('richItemRenderer') + if renderer: + for entry in self._rich_entries(renderer): + yield entry + continuation_list[0] = self._extract_continuation(parent_renderer) + continue + isr_contents = try_get(is_renderer, lambda x: x['contents'], list) or [] + for isr_content in isr_contents: + if not isinstance(isr_content, dict): + continue + + known_renderers = { + 'playlistVideoListRenderer': self._playlist_entries, + 'gridRenderer': self._grid_entries, + 'shelfRenderer': lambda x: self._shelf_entries(x), + 'backstagePostThreadRenderer': self._post_thread_entries, + 'videoRenderer': lambda x: [self._video_entry(x)], + } + for key, renderer in isr_content.items(): + if key not in known_renderers: + continue + for entry in known_renderers[key](renderer): + if entry: + yield entry + continuation_list[0] = self._extract_continuation(renderer) + break + + if not continuation_list[0]: + continuation_list[0] = self._extract_continuation(is_renderer) + + if not continuation_list[0]: + continuation_list[0] = self._extract_continuation(parent_renderer) + + def _entries(self, tab, item_id, ytcfg, account_syncid, visitor_data): + continuation_list = [None] + extract_entries = lambda x: self._extract_entries(x, continuation_list) + tab_content = try_get(tab, lambda x: x['content'], dict) + if not tab_content: + return + parent_renderer = ( + try_get(tab_content, lambda x: x['sectionListRenderer'], dict) + or try_get(tab_content, lambda x: x['richGridRenderer'], dict) or {}) + for entry in extract_entries(parent_renderer): + yield entry + continuation = continuation_list[0] + + for page_num in itertools.count(1): + if not continuation: + break + headers = self.generate_api_headers( + ytcfg=ytcfg, account_syncid=account_syncid, visitor_data=visitor_data) + response = self._extract_response( + item_id='%s page %s' % (item_id, page_num), + query=continuation, headers=headers, ytcfg=ytcfg, + check_get_keys=('continuationContents', 'onResponseReceivedActions', 'onResponseReceivedEndpoints')) + + if not response: + break + # Extracting updated visitor data is required to prevent an infinite extraction loop in some cases + # See: https://github.com/ytdl-org/youtube-dl/issues/28702 + visitor_data = self._extract_visitor_data(response) or visitor_data + + known_continuation_renderers = { + 'playlistVideoListContinuation': self._playlist_entries, + 'gridContinuation': self._grid_entries, + 'itemSectionContinuation': self._post_thread_continuation_entries, + 'sectionListContinuation': extract_entries, # for feeds + } + continuation_contents = try_get( + response, lambda x: x['continuationContents'], dict) or {} + continuation_renderer = None + for key, value in continuation_contents.items(): + if key not in known_continuation_renderers: + continue + continuation_renderer = value + continuation_list = [None] + for entry in known_continuation_renderers[key](continuation_renderer): + yield entry + continuation = continuation_list[0] or self._extract_continuation(continuation_renderer) + break + if continuation_renderer: + continue + + known_renderers = { + 'gridPlaylistRenderer': (self._grid_entries, 'items'), + 'gridVideoRenderer': (self._grid_entries, 'items'), + 'gridChannelRenderer': (self._grid_entries, 'items'), + 'playlistVideoRenderer': (self._playlist_entries, 'contents'), + 'itemSectionRenderer': (extract_entries, 'contents'), # for feeds + 'richItemRenderer': (extract_entries, 'contents'), # for hashtag + 'backstagePostThreadRenderer': (self._post_thread_continuation_entries, 'contents') + } + on_response_received = dict_get(response, ('onResponseReceivedActions', 'onResponseReceivedEndpoints')) + continuation_items = try_get( + on_response_received, lambda x: x[0]['appendContinuationItemsAction']['continuationItems'], list) + continuation_item = try_get(continuation_items, lambda x: x[0], dict) or {} + video_items_renderer = None + for key, value in continuation_item.items(): + if key not in known_renderers: + continue + video_items_renderer = {known_renderers[key][1]: continuation_items} + continuation_list = [None] + for entry in known_renderers[key][0](video_items_renderer): + yield entry + continuation = continuation_list[0] or self._extract_continuation(video_items_renderer) + break + if video_items_renderer: + continue + break + + @staticmethod + def _extract_selected_tab(tabs): + for tab in tabs: + renderer = dict_get(tab, ('tabRenderer', 'expandableTabRenderer')) or {} + if renderer.get('selected') is True: + return renderer + else: + raise ExtractorError('Unable to find selected tab') + + @classmethod + def _extract_uploader(cls, data): + uploader = {} + renderer = cls._extract_sidebar_info_renderer(data, 'playlistSidebarSecondaryInfoRenderer') or {} + owner = try_get( + renderer, lambda x: x['videoOwner']['videoOwnerRenderer']['title']['runs'][0], dict) + if owner: + uploader['uploader'] = owner.get('text') + uploader['uploader_id'] = try_get( + owner, lambda x: x['navigationEndpoint']['browseEndpoint']['browseId'], compat_str) + uploader['uploader_url'] = urljoin( + 'https://www.youtube.com/', + try_get(owner, lambda x: x['navigationEndpoint']['browseEndpoint']['canonicalBaseUrl'], compat_str)) + return {k: v for k, v in uploader.items() if v is not None} + + def _extract_from_tabs(self, item_id, ytcfg, data, tabs): + playlist_id = title = description = channel_url = channel_name = channel_id = None + thumbnails_list = [] + tags = [] + + selected_tab = self._extract_selected_tab(tabs) + renderer = try_get( + data, lambda x: x['metadata']['channelMetadataRenderer'], dict) + if renderer: + channel_name = renderer.get('title') + channel_url = renderer.get('channelUrl') + channel_id = renderer.get('externalId') + else: + renderer = try_get( + data, lambda x: x['metadata']['playlistMetadataRenderer'], dict) + + if renderer: + title = renderer.get('title') + description = renderer.get('description', '') + playlist_id = channel_id + tags = renderer.get('keywords', '').split() + thumbnails_list = ( + try_get(renderer, lambda x: x['avatar']['thumbnails'], list) + or try_get( + self._extract_sidebar_info_renderer(data, 'playlistSidebarPrimaryInfoRenderer'), + lambda x: x['thumbnailRenderer']['playlistVideoThumbnailRenderer']['thumbnail']['thumbnails'], + list) + or []) + + thumbnails = [] + for t in thumbnails_list: + if not isinstance(t, dict): + continue + thumbnail_url = url_or_none(t.get('url')) + if not thumbnail_url: + continue + thumbnails.append({ + 'url': thumbnail_url, + 'width': int_or_none(t.get('width')), + 'height': int_or_none(t.get('height')), + }) + if playlist_id is None: + playlist_id = item_id + if title is None: + title = ( + try_get(data, lambda x: x['header']['hashtagHeaderRenderer']['hashtag']['simpleText']) + or playlist_id) + title += format_field(selected_tab, 'title', ' - %s') + title += format_field(selected_tab, 'expandedText', ' - %s') + metadata = { + 'playlist_id': playlist_id, + 'playlist_title': title, + 'playlist_description': description, + 'uploader': channel_name, + 'uploader_id': channel_id, + 'uploader_url': channel_url, + 'thumbnails': thumbnails, + 'tags': tags, + } + availability = self._extract_availability(data) + if availability: + metadata['availability'] = availability + if not channel_id: + metadata.update(self._extract_uploader(data)) + metadata.update({ + 'channel': metadata['uploader'], + 'channel_id': metadata['uploader_id'], + 'channel_url': metadata['uploader_url']}) + return self.playlist_result( + self._entries( + selected_tab, playlist_id, ytcfg, + self._extract_account_syncid(ytcfg, data), + self._extract_visitor_data(data, ytcfg)), + **metadata) + + def _extract_mix_playlist(self, playlist, playlist_id, data, ytcfg): + first_id = last_id = response = None + for page_num in itertools.count(1): + videos = list(self._playlist_entries(playlist)) + if not videos: + return + start = next((i for i, v in enumerate(videos) if v['id'] == last_id), -1) + 1 + if start >= len(videos): + return + for video in videos[start:]: + if video['id'] == first_id: + self.to_screen('First video %s found again; Assuming end of Mix' % first_id) + return + yield video + first_id = first_id or videos[0]['id'] + last_id = videos[-1]['id'] + watch_endpoint = try_get( + playlist, lambda x: x['contents'][-1]['playlistPanelVideoRenderer']['navigationEndpoint']['watchEndpoint']) + headers = self.generate_api_headers( + ytcfg=ytcfg, account_syncid=self._extract_account_syncid(ytcfg, data), + visitor_data=self._extract_visitor_data(response, data, ytcfg)) + query = { + 'playlistId': playlist_id, + 'videoId': watch_endpoint.get('videoId') or last_id, + 'index': watch_endpoint.get('index') or len(videos), + 'params': watch_endpoint.get('params') or 'OAE%3D' + } + response = self._extract_response( + item_id='%s page %d' % (playlist_id, page_num), + query=query, ep='next', headers=headers, ytcfg=ytcfg, + check_get_keys='contents' + ) + playlist = try_get( + response, lambda x: x['contents']['twoColumnWatchNextResults']['playlist']['playlist'], dict) + + def _extract_from_playlist(self, item_id, url, data, playlist, ytcfg): + title = playlist.get('title') or try_get( + data, lambda x: x['titleText']['simpleText'], compat_str) + playlist_id = playlist.get('playlistId') or item_id + + # Delegating everything except mix playlists to regular tab-based playlist URL + playlist_url = urljoin(url, try_get( + playlist, lambda x: x['endpoint']['commandMetadata']['webCommandMetadata']['url'], + compat_str)) + if playlist_url and playlist_url != url: + return self.url_result( + playlist_url, ie=YoutubeTabIE.ie_key(), video_id=playlist_id, + video_title=title) + + return self.playlist_result( + self._extract_mix_playlist(playlist, playlist_id, data, ytcfg), + playlist_id=playlist_id, playlist_title=title) + + def _extract_availability(self, data): + """ + Gets the availability of a given playlist/tab. + Note: Unless YouTube tells us explicitly, we do not assume it is public + @param data: response + """ + is_private = is_unlisted = None + renderer = self._extract_sidebar_info_renderer(data, 'playlistSidebarPrimaryInfoRenderer') or {} + badge_labels = self._extract_badges(renderer) + + # Personal playlists, when authenticated, have a dropdown visibility selector instead of a badge + privacy_dropdown_entries = try_get( + renderer, lambda x: x['privacyForm']['dropdownFormFieldRenderer']['dropdown']['dropdownRenderer']['entries'], list) or [] + for renderer_dict in privacy_dropdown_entries: + is_selected = try_get( + renderer_dict, lambda x: x['privacyDropdownItemRenderer']['isSelected'], bool) or False + if not is_selected: + continue + label = self._get_text(renderer_dict, ('privacyDropdownItemRenderer', 'label')) + if label: + badge_labels.add(label.lower()) + break + + for badge_label in badge_labels: + if badge_label == 'unlisted': + is_unlisted = True + elif badge_label == 'private': + is_private = True + elif badge_label == 'public': + is_unlisted = is_private = False + return self._availability(is_private, False, False, False, is_unlisted) + + @staticmethod + def _extract_sidebar_info_renderer(data, info_renderer, expected_type=dict): + sidebar_renderer = try_get( + data, lambda x: x['sidebar']['playlistSidebarRenderer']['items'], list) or [] + for item in sidebar_renderer: + renderer = try_get(item, lambda x: x[info_renderer], expected_type) + if renderer: + return renderer + + def _reload_with_unavailable_videos(self, item_id, data, ytcfg): + """ + Get playlist with unavailable videos if the 'show unavailable videos' button exists. + """ + browse_id = params = None + renderer = self._extract_sidebar_info_renderer(data, 'playlistSidebarPrimaryInfoRenderer') + if not renderer: + return + menu_renderer = try_get( + renderer, lambda x: x['menu']['menuRenderer']['items'], list) or [] + for menu_item in menu_renderer: + if not isinstance(menu_item, dict): + continue + nav_item_renderer = menu_item.get('menuNavigationItemRenderer') + text = try_get( + nav_item_renderer, lambda x: x['text']['simpleText'], compat_str) + if not text or text.lower() != 'show unavailable videos': + continue + browse_endpoint = try_get( + nav_item_renderer, lambda x: x['navigationEndpoint']['browseEndpoint'], dict) or {} + browse_id = browse_endpoint.get('browseId') + params = browse_endpoint.get('params') + break + + headers = self.generate_api_headers( + ytcfg=ytcfg, account_syncid=self._extract_account_syncid(ytcfg, data), + visitor_data=self._extract_visitor_data(data, ytcfg)) + query = { + 'params': params or 'wgYCCAA=', + 'browseId': browse_id or 'VL%s' % item_id + } + return self._extract_response( + item_id=item_id, headers=headers, query=query, + check_get_keys='contents', fatal=False, ytcfg=ytcfg, + note='Downloading API JSON with unavailable videos') + + def _extract_webpage(self, url, item_id, fatal=True): + retries = self.get_param('extractor_retries', 3) + count = -1 + webpage = data = last_error = None + while count < retries: + count += 1 + # Sometimes youtube returns a webpage with incomplete ytInitialData + # See: https://github.com/yt-dlp/yt-dlp/issues/116 + if last_error: + self.report_warning('%s. Retrying ...' % last_error) + try: + webpage = self._download_webpage( + url, item_id, + note='Downloading webpage%s' % (' (retry #%d)' % count if count else '',)) + data = self.extract_yt_initial_data(item_id, webpage or '', fatal=fatal) or {} + except ExtractorError as e: + if isinstance(e.cause, network_exceptions): + if not isinstance(e.cause, compat_HTTPError) or e.cause.code not in (403, 429): + last_error = error_to_compat_str(e.cause or e.msg) + if count < retries: + continue + if fatal: + raise + self.report_warning(error_to_compat_str(e)) + break + else: + try: + self._extract_and_report_alerts(data) + except ExtractorError as e: + if fatal: + raise + self.report_warning(error_to_compat_str(e)) + break + + if dict_get(data, ('contents', 'currentVideoEndpoint')): + break + + last_error = 'Incomplete yt initial data received' + if count >= retries: + if fatal: + raise ExtractorError(last_error) + self.report_warning(last_error) + break + + return webpage, data + + def _extract_data(self, url, item_id, ytcfg=None, fatal=True, webpage_fatal=False, default_client='web'): + data = None + if 'webpage' not in self._configuration_arg('skip'): + webpage, data = self._extract_webpage(url, item_id, fatal=webpage_fatal) + ytcfg = ytcfg or self.extract_ytcfg(item_id, webpage) + if not data: + if not ytcfg and self.is_authenticated: + msg = 'Playlists that require authentication may not extract correctly without a successful webpage download.' + if 'authcheck' not in self._configuration_arg('skip') and fatal: + raise ExtractorError( + msg + ' If you are not downloading private content, or your cookies are only for the first account and channel,' + ' pass "--extractor-args youtubetab:skip=authcheck" to skip this check', + expected=True) + self.report_warning(msg, only_once=True) + data = self._extract_tab_endpoint(url, item_id, ytcfg, fatal=fatal, default_client=default_client) + return data, ytcfg + + def _extract_tab_endpoint(self, url, item_id, ytcfg=None, fatal=True, default_client='web'): + headers = self.generate_api_headers(ytcfg=ytcfg, default_client=default_client) + resolve_response = self._extract_response( + item_id=item_id, query={'url': url}, check_get_keys='endpoint', headers=headers, ytcfg=ytcfg, fatal=fatal, + ep='navigation/resolve_url', note='Downloading API parameters API JSON', default_client=default_client) + endpoints = {'browseEndpoint': 'browse', 'watchEndpoint': 'next'} + for ep_key, ep in endpoints.items(): + params = try_get(resolve_response, lambda x: x['endpoint'][ep_key], dict) + if params: + return self._extract_response( + item_id=item_id, query=params, ep=ep, headers=headers, + ytcfg=ytcfg, fatal=fatal, default_client=default_client, + check_get_keys=('contents', 'currentVideoEndpoint')) + err_note = 'Failed to resolve url (does the playlist exist?)' + if fatal: + raise ExtractorError(err_note, expected=True) + self.report_warning(err_note, item_id) + + @staticmethod + def _smuggle_data(entries, data): + for entry in entries: + if data: + entry['url'] = smuggle_url(entry['url'], data) + yield entry + + _SEARCH_PARAMS = None + + def _search_results(self, query, params=NO_DEFAULT): + data = {'query': query} + if params is NO_DEFAULT: + params = self._SEARCH_PARAMS + if params: + data['params'] = params + continuation = {} + for page_num in itertools.count(1): + data.update(continuation) + search = self._extract_response( + item_id='query "%s" page %s' % (query, page_num), ep='search', query=data, + check_get_keys=('contents', 'onResponseReceivedCommands') + ) + if not search: + break + slr_contents = try_get( + search, + (lambda x: x['contents']['twoColumnSearchResultsRenderer']['primaryContents']['sectionListRenderer']['contents'], + lambda x: x['onResponseReceivedCommands'][0]['appendContinuationItemsAction']['continuationItems']), + list) + if not slr_contents: + break + + # Youtube sometimes adds promoted content to searches, + # changing the index location of videos and token. + # So we search through all entries till we find them. + continuation = None + for slr_content in slr_contents: + if not continuation: + continuation = self._extract_continuation({'contents': [slr_content]}) + + isr_contents = try_get( + slr_content, + lambda x: x['itemSectionRenderer']['contents'], + list) + if not isr_contents: + continue + for content in isr_contents: + if not isinstance(content, dict): + continue + video = content.get('videoRenderer') + if not isinstance(video, dict): + continue + video_id = video.get('videoId') + if not video_id: + continue + + yield self._extract_video(video) + + if not continuation: + break + + +class YoutubeTabIE(YoutubeTabBaseInfoExtractor): IE_DESC = 'YouTube Tabs' - _VALID_URL = r'''(?x) - https?:// - (?:\w+\.)? - (?: - youtube(?:kids)?\.com| - %(invidious)s - )/ - (?: - (?Pchannel|c|user|browse)/| - (?P - feed/|hashtag/| - (?:playlist|watch)\?.*?\blist= - )| - (?!(?:%(reserved_names)s)\b) # Direct URLs - ) - (?P[^/?\#&]+) - ''' % { + _VALID_URL = r'''(?x: + https?:// + (?:\w+\.)? + (?: + youtube(?:kids)?\.com| + %(invidious)s + )/ + (?: + (?Pchannel|c|user|browse)/| + (?P + feed/|hashtag/| + (?:playlist|watch)\?.*?\blist= + )| + (?!(?:%(reserved_names)s)\b) # Direct URLs + ) + (?P[^/?\#&]+) + )''' % { 'reserved_names': YoutubeBaseInfoExtractor._RESERVED_NAMES, 'invidious': '|'.join(YoutubeBaseInfoExtractor._INVIDIOUS_SITES), } @@ -3606,621 +4280,6 @@ class YoutubeTabIE(YoutubeBaseInfoExtractor): return False if YoutubeIE.suitable(url) else super( YoutubeTabIE, cls).suitable(url) - def _extract_channel_id(self, webpage): - channel_id = self._html_search_meta( - 'channelId', webpage, 'channel id', default=None) - if channel_id: - return channel_id - channel_url = self._html_search_meta( - ('og:url', 'al:ios:url', 'al:android:url', 'al:web:url', - 'twitter:url', 'twitter:app:url:iphone', 'twitter:app:url:ipad', - 'twitter:app:url:googleplay'), webpage, 'channel url') - return self._search_regex( - r'https?://(?:www\.)?youtube\.com/channel/([^/?#&])+', - channel_url, 'channel id') - - @staticmethod - def _extract_basic_item_renderer(item): - # Modified from _extract_grid_item_renderer - known_basic_renderers = ( - 'playlistRenderer', 'videoRenderer', 'channelRenderer', 'showRenderer' - ) - for key, renderer in item.items(): - if not isinstance(renderer, dict): - continue - elif key in known_basic_renderers: - return renderer - elif key.startswith('grid') and key.endswith('Renderer'): - return renderer - - def _grid_entries(self, grid_renderer): - for item in grid_renderer['items']: - if not isinstance(item, dict): - continue - renderer = self._extract_basic_item_renderer(item) - if not isinstance(renderer, dict): - continue - title = self._get_text(renderer, 'title') - - # playlist - playlist_id = renderer.get('playlistId') - if playlist_id: - yield self.url_result( - 'https://www.youtube.com/playlist?list=%s' % playlist_id, - ie=YoutubeTabIE.ie_key(), video_id=playlist_id, - video_title=title) - continue - # video - video_id = renderer.get('videoId') - if video_id: - yield self._extract_video(renderer) - continue - # channel - channel_id = renderer.get('channelId') - if channel_id: - yield self.url_result( - 'https://www.youtube.com/channel/%s' % channel_id, - ie=YoutubeTabIE.ie_key(), video_title=title) - continue - # generic endpoint URL support - ep_url = urljoin('https://www.youtube.com/', try_get( - renderer, lambda x: x['navigationEndpoint']['commandMetadata']['webCommandMetadata']['url'], - compat_str)) - if ep_url: - for ie in (YoutubeTabIE, YoutubePlaylistIE, YoutubeIE): - if ie.suitable(ep_url): - yield self.url_result( - ep_url, ie=ie.ie_key(), video_id=ie._match_id(ep_url), video_title=title) - break - - def _shelf_entries_from_content(self, shelf_renderer): - content = shelf_renderer.get('content') - if not isinstance(content, dict): - return - renderer = content.get('gridRenderer') or content.get('expandedShelfContentsRenderer') - if renderer: - # TODO: add support for nested playlists so each shelf is processed - # as separate playlist - # TODO: this includes only first N items - for entry in self._grid_entries(renderer): - yield entry - renderer = content.get('horizontalListRenderer') - if renderer: - # TODO - pass - - def _shelf_entries(self, shelf_renderer, skip_channels=False): - ep = try_get( - shelf_renderer, lambda x: x['endpoint']['commandMetadata']['webCommandMetadata']['url'], - compat_str) - shelf_url = urljoin('https://www.youtube.com', ep) - if shelf_url: - # Skipping links to another channels, note that checking for - # endpoint.commandMetadata.webCommandMetadata.webPageTypwebPageType == WEB_PAGE_TYPE_CHANNEL - # will not work - if skip_channels and '/channels?' in shelf_url: - return - title = self._get_text(shelf_renderer, 'title') - yield self.url_result(shelf_url, video_title=title) - # Shelf may not contain shelf URL, fallback to extraction from content - for entry in self._shelf_entries_from_content(shelf_renderer): - yield entry - - def _playlist_entries(self, video_list_renderer): - for content in video_list_renderer['contents']: - if not isinstance(content, dict): - continue - renderer = content.get('playlistVideoRenderer') or content.get('playlistPanelVideoRenderer') - if not isinstance(renderer, dict): - continue - video_id = renderer.get('videoId') - if not video_id: - continue - yield self._extract_video(renderer) - - def _rich_entries(self, rich_grid_renderer): - renderer = try_get( - rich_grid_renderer, lambda x: x['content']['videoRenderer'], dict) or {} - video_id = renderer.get('videoId') - if not video_id: - return - yield self._extract_video(renderer) - - def _video_entry(self, video_renderer): - video_id = video_renderer.get('videoId') - if video_id: - return self._extract_video(video_renderer) - - def _post_thread_entries(self, post_thread_renderer): - post_renderer = try_get( - post_thread_renderer, lambda x: x['post']['backstagePostRenderer'], dict) - if not post_renderer: - return - # video attachment - video_renderer = try_get( - post_renderer, lambda x: x['backstageAttachment']['videoRenderer'], dict) or {} - video_id = video_renderer.get('videoId') - if video_id: - entry = self._extract_video(video_renderer) - if entry: - yield entry - # playlist attachment - playlist_id = try_get( - post_renderer, lambda x: x['backstageAttachment']['playlistRenderer']['playlistId'], compat_str) - if playlist_id: - yield self.url_result( - 'https://www.youtube.com/playlist?list=%s' % playlist_id, - ie=YoutubeTabIE.ie_key(), video_id=playlist_id) - # inline video links - runs = try_get(post_renderer, lambda x: x['contentText']['runs'], list) or [] - for run in runs: - if not isinstance(run, dict): - continue - ep_url = try_get( - run, lambda x: x['navigationEndpoint']['urlEndpoint']['url'], compat_str) - if not ep_url: - continue - if not YoutubeIE.suitable(ep_url): - continue - ep_video_id = YoutubeIE._match_id(ep_url) - if video_id == ep_video_id: - continue - yield self.url_result(ep_url, ie=YoutubeIE.ie_key(), video_id=ep_video_id) - - def _post_thread_continuation_entries(self, post_thread_continuation): - contents = post_thread_continuation.get('contents') - if not isinstance(contents, list): - return - for content in contents: - renderer = content.get('backstagePostThreadRenderer') - if not isinstance(renderer, dict): - continue - for entry in self._post_thread_entries(renderer): - yield entry - - r''' # unused - def _rich_grid_entries(self, contents): - for content in contents: - video_renderer = try_get(content, lambda x: x['richItemRenderer']['content']['videoRenderer'], dict) - if video_renderer: - entry = self._video_entry(video_renderer) - if entry: - yield entry - ''' - def _entries(self, tab, item_id, ytcfg, account_syncid, visitor_data): - - def extract_entries(parent_renderer): # this needs to called again for continuation to work with feeds - contents = try_get(parent_renderer, lambda x: x['contents'], list) or [] - for content in contents: - if not isinstance(content, dict): - continue - is_renderer = try_get(content, lambda x: x['itemSectionRenderer'], dict) - if not is_renderer: - renderer = content.get('richItemRenderer') - if renderer: - for entry in self._rich_entries(renderer): - yield entry - continuation_list[0] = self._extract_continuation(parent_renderer) - continue - isr_contents = try_get(is_renderer, lambda x: x['contents'], list) or [] - for isr_content in isr_contents: - if not isinstance(isr_content, dict): - continue - - known_renderers = { - 'playlistVideoListRenderer': self._playlist_entries, - 'gridRenderer': self._grid_entries, - 'shelfRenderer': lambda x: self._shelf_entries(x, tab.get('title') != 'Channels'), - 'backstagePostThreadRenderer': self._post_thread_entries, - 'videoRenderer': lambda x: [self._video_entry(x)], - } - for key, renderer in isr_content.items(): - if key not in known_renderers: - continue - for entry in known_renderers[key](renderer): - if entry: - yield entry - continuation_list[0] = self._extract_continuation(renderer) - break - - if not continuation_list[0]: - continuation_list[0] = self._extract_continuation(is_renderer) - - if not continuation_list[0]: - continuation_list[0] = self._extract_continuation(parent_renderer) - - continuation_list = [None] # Python 2 does not support nonlocal - tab_content = try_get(tab, lambda x: x['content'], dict) - if not tab_content: - return - parent_renderer = ( - try_get(tab_content, lambda x: x['sectionListRenderer'], dict) - or try_get(tab_content, lambda x: x['richGridRenderer'], dict) or {}) - for entry in extract_entries(parent_renderer): - yield entry - continuation = continuation_list[0] - - for page_num in itertools.count(1): - if not continuation: - break - headers = self.generate_api_headers( - ytcfg=ytcfg, account_syncid=account_syncid, visitor_data=visitor_data) - response = self._extract_response( - item_id='%s page %s' % (item_id, page_num), - query=continuation, headers=headers, ytcfg=ytcfg, - check_get_keys=('continuationContents', 'onResponseReceivedActions', 'onResponseReceivedEndpoints')) - - if not response: - break - # Extracting updated visitor data is required to prevent an infinite extraction loop in some cases - # See: https://github.com/ytdl-org/youtube-dl/issues/28702 - visitor_data = self._extract_visitor_data(response) or visitor_data - - known_continuation_renderers = { - 'playlistVideoListContinuation': self._playlist_entries, - 'gridContinuation': self._grid_entries, - 'itemSectionContinuation': self._post_thread_continuation_entries, - 'sectionListContinuation': extract_entries, # for feeds - } - continuation_contents = try_get( - response, lambda x: x['continuationContents'], dict) or {} - continuation_renderer = None - for key, value in continuation_contents.items(): - if key not in known_continuation_renderers: - continue - continuation_renderer = value - continuation_list = [None] - for entry in known_continuation_renderers[key](continuation_renderer): - yield entry - continuation = continuation_list[0] or self._extract_continuation(continuation_renderer) - break - if continuation_renderer: - continue - - known_renderers = { - 'gridPlaylistRenderer': (self._grid_entries, 'items'), - 'gridVideoRenderer': (self._grid_entries, 'items'), - 'gridChannelRenderer': (self._grid_entries, 'items'), - 'playlistVideoRenderer': (self._playlist_entries, 'contents'), - 'itemSectionRenderer': (extract_entries, 'contents'), # for feeds - 'richItemRenderer': (extract_entries, 'contents'), # for hashtag - 'backstagePostThreadRenderer': (self._post_thread_continuation_entries, 'contents') - } - on_response_received = dict_get(response, ('onResponseReceivedActions', 'onResponseReceivedEndpoints')) - continuation_items = try_get( - on_response_received, lambda x: x[0]['appendContinuationItemsAction']['continuationItems'], list) - continuation_item = try_get(continuation_items, lambda x: x[0], dict) or {} - video_items_renderer = None - for key, value in continuation_item.items(): - if key not in known_renderers: - continue - video_items_renderer = {known_renderers[key][1]: continuation_items} - continuation_list = [None] - for entry in known_renderers[key][0](video_items_renderer): - yield entry - continuation = continuation_list[0] or self._extract_continuation(video_items_renderer) - break - if video_items_renderer: - continue - break - - @staticmethod - def _extract_selected_tab(tabs): - for tab in tabs: - renderer = dict_get(tab, ('tabRenderer', 'expandableTabRenderer')) or {} - if renderer.get('selected') is True: - return renderer - else: - raise ExtractorError('Unable to find selected tab') - - @classmethod - def _extract_uploader(cls, data): - uploader = {} - renderer = cls._extract_sidebar_info_renderer(data, 'playlistSidebarSecondaryInfoRenderer') or {} - owner = try_get( - renderer, lambda x: x['videoOwner']['videoOwnerRenderer']['title']['runs'][0], dict) - if owner: - uploader['uploader'] = owner.get('text') - uploader['uploader_id'] = try_get( - owner, lambda x: x['navigationEndpoint']['browseEndpoint']['browseId'], compat_str) - uploader['uploader_url'] = urljoin( - 'https://www.youtube.com/', - try_get(owner, lambda x: x['navigationEndpoint']['browseEndpoint']['canonicalBaseUrl'], compat_str)) - return {k: v for k, v in uploader.items() if v is not None} - - def _extract_from_tabs(self, item_id, ytcfg, data, tabs): - playlist_id = title = description = channel_url = channel_name = channel_id = None - thumbnails_list = [] - tags = [] - - selected_tab = self._extract_selected_tab(tabs) - renderer = try_get( - data, lambda x: x['metadata']['channelMetadataRenderer'], dict) - if renderer: - channel_name = renderer.get('title') - channel_url = renderer.get('channelUrl') - channel_id = renderer.get('externalId') - else: - renderer = try_get( - data, lambda x: x['metadata']['playlistMetadataRenderer'], dict) - - if renderer: - title = renderer.get('title') - description = renderer.get('description', '') - playlist_id = channel_id - tags = renderer.get('keywords', '').split() - thumbnails_list = ( - try_get(renderer, lambda x: x['avatar']['thumbnails'], list) - or try_get( - self._extract_sidebar_info_renderer(data, 'playlistSidebarPrimaryInfoRenderer'), - lambda x: x['thumbnailRenderer']['playlistVideoThumbnailRenderer']['thumbnail']['thumbnails'], - list) - or []) - - thumbnails = [] - for t in thumbnails_list: - if not isinstance(t, dict): - continue - thumbnail_url = url_or_none(t.get('url')) - if not thumbnail_url: - continue - thumbnails.append({ - 'url': thumbnail_url, - 'width': int_or_none(t.get('width')), - 'height': int_or_none(t.get('height')), - }) - if playlist_id is None: - playlist_id = item_id - if title is None: - title = ( - try_get(data, lambda x: x['header']['hashtagHeaderRenderer']['hashtag']['simpleText']) - or playlist_id) - title += format_field(selected_tab, 'title', ' - %s') - title += format_field(selected_tab, 'expandedText', ' - %s') - metadata = { - 'playlist_id': playlist_id, - 'playlist_title': title, - 'playlist_description': description, - 'uploader': channel_name, - 'uploader_id': channel_id, - 'uploader_url': channel_url, - 'thumbnails': thumbnails, - 'tags': tags, - } - availability = self._extract_availability(data) - if availability: - metadata['availability'] = availability - if not channel_id: - metadata.update(self._extract_uploader(data)) - metadata.update({ - 'channel': metadata['uploader'], - 'channel_id': metadata['uploader_id'], - 'channel_url': metadata['uploader_url']}) - return self.playlist_result( - self._entries( - selected_tab, playlist_id, ytcfg, - self._extract_account_syncid(ytcfg, data), - self._extract_visitor_data(data, ytcfg)), - **metadata) - - def _extract_mix_playlist(self, playlist, playlist_id, data, ytcfg): - first_id = last_id = response = None - for page_num in itertools.count(1): - videos = list(self._playlist_entries(playlist)) - if not videos: - return - start = next((i for i, v in enumerate(videos) if v['id'] == last_id), -1) + 1 - if start >= len(videos): - return - for video in videos[start:]: - if video['id'] == first_id: - self.to_screen('First video %s found again; Assuming end of Mix' % first_id) - return - yield video - first_id = first_id or videos[0]['id'] - last_id = videos[-1]['id'] - watch_endpoint = try_get( - playlist, lambda x: x['contents'][-1]['playlistPanelVideoRenderer']['navigationEndpoint']['watchEndpoint']) - headers = self.generate_api_headers( - ytcfg=ytcfg, account_syncid=self._extract_account_syncid(ytcfg, data), - visitor_data=self._extract_visitor_data(response, data, ytcfg)) - query = { - 'playlistId': playlist_id, - 'videoId': watch_endpoint.get('videoId') or last_id, - 'index': watch_endpoint.get('index') or len(videos), - 'params': watch_endpoint.get('params') or 'OAE%3D' - } - response = self._extract_response( - item_id='%s page %d' % (playlist_id, page_num), - query=query, ep='next', headers=headers, ytcfg=ytcfg, - check_get_keys='contents' - ) - playlist = try_get( - response, lambda x: x['contents']['twoColumnWatchNextResults']['playlist']['playlist'], dict) - - def _extract_from_playlist(self, item_id, url, data, playlist, ytcfg): - title = playlist.get('title') or try_get( - data, lambda x: x['titleText']['simpleText'], compat_str) - playlist_id = playlist.get('playlistId') or item_id - - # Delegating everything except mix playlists to regular tab-based playlist URL - playlist_url = urljoin(url, try_get( - playlist, lambda x: x['endpoint']['commandMetadata']['webCommandMetadata']['url'], - compat_str)) - if playlist_url and playlist_url != url: - return self.url_result( - playlist_url, ie=YoutubeTabIE.ie_key(), video_id=playlist_id, - video_title=title) - - return self.playlist_result( - self._extract_mix_playlist(playlist, playlist_id, data, ytcfg), - playlist_id=playlist_id, playlist_title=title) - - def _extract_availability(self, data): - """ - Gets the availability of a given playlist/tab. - Note: Unless YouTube tells us explicitly, we do not assume it is public - @param data: response - """ - is_private = is_unlisted = None - renderer = self._extract_sidebar_info_renderer(data, 'playlistSidebarPrimaryInfoRenderer') or {} - badge_labels = self._extract_badges(renderer) - - # Personal playlists, when authenticated, have a dropdown visibility selector instead of a badge - privacy_dropdown_entries = try_get( - renderer, lambda x: x['privacyForm']['dropdownFormFieldRenderer']['dropdown']['dropdownRenderer']['entries'], list) or [] - for renderer_dict in privacy_dropdown_entries: - is_selected = try_get( - renderer_dict, lambda x: x['privacyDropdownItemRenderer']['isSelected'], bool) or False - if not is_selected: - continue - label = self._get_text(renderer_dict, ('privacyDropdownItemRenderer', 'label')) - if label: - badge_labels.add(label.lower()) - break - - for badge_label in badge_labels: - if badge_label == 'unlisted': - is_unlisted = True - elif badge_label == 'private': - is_private = True - elif badge_label == 'public': - is_unlisted = is_private = False - return self._availability(is_private, False, False, False, is_unlisted) - - @staticmethod - def _extract_sidebar_info_renderer(data, info_renderer, expected_type=dict): - sidebar_renderer = try_get( - data, lambda x: x['sidebar']['playlistSidebarRenderer']['items'], list) or [] - for item in sidebar_renderer: - renderer = try_get(item, lambda x: x[info_renderer], expected_type) - if renderer: - return renderer - - def _reload_with_unavailable_videos(self, item_id, data, ytcfg): - """ - Get playlist with unavailable videos if the 'show unavailable videos' button exists. - """ - browse_id = params = None - renderer = self._extract_sidebar_info_renderer(data, 'playlistSidebarPrimaryInfoRenderer') - if not renderer: - return - menu_renderer = try_get( - renderer, lambda x: x['menu']['menuRenderer']['items'], list) or [] - for menu_item in menu_renderer: - if not isinstance(menu_item, dict): - continue - nav_item_renderer = menu_item.get('menuNavigationItemRenderer') - text = try_get( - nav_item_renderer, lambda x: x['text']['simpleText'], compat_str) - if not text or text.lower() != 'show unavailable videos': - continue - browse_endpoint = try_get( - nav_item_renderer, lambda x: x['navigationEndpoint']['browseEndpoint'], dict) or {} - browse_id = browse_endpoint.get('browseId') - params = browse_endpoint.get('params') - break - - headers = self.generate_api_headers( - ytcfg=ytcfg, account_syncid=self._extract_account_syncid(ytcfg, data), - visitor_data=self._extract_visitor_data(data, ytcfg)) - query = { - 'params': params or 'wgYCCAA=', - 'browseId': browse_id or 'VL%s' % item_id - } - return self._extract_response( - item_id=item_id, headers=headers, query=query, - check_get_keys='contents', fatal=False, ytcfg=ytcfg, - note='Downloading API JSON with unavailable videos') - - def _extract_webpage(self, url, item_id, fatal=True): - retries = self.get_param('extractor_retries', 3) - count = -1 - webpage = data = last_error = None - while count < retries: - count += 1 - # Sometimes youtube returns a webpage with incomplete ytInitialData - # See: https://github.com/yt-dlp/yt-dlp/issues/116 - if last_error: - self.report_warning('%s. Retrying ...' % last_error) - try: - webpage = self._download_webpage( - url, item_id, - note='Downloading webpage%s' % (' (retry #%d)' % count if count else '',)) - data = self.extract_yt_initial_data(item_id, webpage or '', fatal=fatal) or {} - except ExtractorError as e: - if isinstance(e.cause, network_exceptions): - if not isinstance(e.cause, compat_HTTPError) or e.cause.code not in (403, 429): - last_error = error_to_compat_str(e.cause or e.msg) - if count < retries: - continue - if fatal: - raise - self.report_warning(error_to_compat_str(e)) - break - else: - try: - self._extract_and_report_alerts(data) - except ExtractorError as e: - if fatal: - raise - self.report_warning(error_to_compat_str(e)) - break - - if dict_get(data, ('contents', 'currentVideoEndpoint')): - break - - last_error = 'Incomplete yt initial data received' - if count >= retries: - if fatal: - raise ExtractorError(last_error) - self.report_warning(last_error) - break - - return webpage, data - - def _extract_data(self, url, item_id, ytcfg=None, fatal=True, webpage_fatal=False, default_client='web'): - data = None - if 'webpage' not in self._configuration_arg('skip'): - webpage, data = self._extract_webpage(url, item_id, fatal=webpage_fatal) - ytcfg = ytcfg or self.extract_ytcfg(item_id, webpage) - if not data: - if not ytcfg and self.is_authenticated: - msg = 'Playlists that require authentication may not extract correctly without a successful webpage download.' - if 'authcheck' not in self._configuration_arg('skip') and fatal: - raise ExtractorError( - msg + ' If you are not downloading private content, or your cookies are only for the first account and channel,' - ' pass "--extractor-args youtubetab:skip=authcheck" to skip this check', - expected=True) - self.report_warning(msg, only_once=True) - data = self._extract_tab_endpoint(url, item_id, ytcfg, fatal=fatal, default_client=default_client) - return data, ytcfg - - def _extract_tab_endpoint(self, url, item_id, ytcfg=None, fatal=True, default_client='web'): - headers = self.generate_api_headers(ytcfg=ytcfg, default_client=default_client) - resolve_response = self._extract_response( - item_id=item_id, query={'url': url}, check_get_keys='endpoint', headers=headers, ytcfg=ytcfg, fatal=fatal, - ep='navigation/resolve_url', note='Downloading API parameters API JSON', default_client=default_client) - endpoints = {'browseEndpoint': 'browse', 'watchEndpoint': 'next'} - for ep_key, ep in endpoints.items(): - params = try_get(resolve_response, lambda x: x['endpoint'][ep_key], dict) - if params: - return self._extract_response( - item_id=item_id, query=params, ep=ep, headers=headers, - ytcfg=ytcfg, fatal=fatal, default_client=default_client, - check_get_keys=('contents', 'currentVideoEndpoint')) - err_note = 'Failed to resolve url (does the playlist exist?)' - if fatal: - raise ExtractorError(err_note, expected=True) - self.report_warning(err_note, item_id) - - @staticmethod - def _smuggle_data(entries, data): - for entry in entries: - if data: - entry['url'] = smuggle_url(entry['url'], data) - yield entry - def _real_extract(self, url): url, smuggled_data = unsmuggle_url(url, {}) if self.is_music_url(url): @@ -4506,77 +4565,24 @@ class YoutubeFavouritesIE(YoutubeBaseInfoExtractor): ie=YoutubeTabIE.ie_key()) -class YoutubeSearchIE(SearchInfoExtractor, YoutubeTabIE): - IE_DESC = 'YouTube searches' +class YoutubeSearchIE(YoutubeTabBaseInfoExtractor, SearchInfoExtractor): + IE_DESC = 'YouTube search' IE_NAME = 'youtube:search' _SEARCH_KEY = 'ytsearch' _SEARCH_PARAMS = None _TESTS = [] - def _search_results(self, query): - data = {'query': query} - if self._SEARCH_PARAMS: - data['params'] = self._SEARCH_PARAMS - continuation = {} - for page_num in itertools.count(1): - data.update(continuation) - search = self._extract_response( - item_id='query "%s" page %s' % (query, page_num), ep='search', query=data, - check_get_keys=('contents', 'onResponseReceivedCommands') - ) - if not search: - break - slr_contents = try_get( - search, - (lambda x: x['contents']['twoColumnSearchResultsRenderer']['primaryContents']['sectionListRenderer']['contents'], - lambda x: x['onResponseReceivedCommands'][0]['appendContinuationItemsAction']['continuationItems']), - list) - if not slr_contents: - break - - # Youtube sometimes adds promoted content to searches, - # changing the index location of videos and token. - # So we search through all entries till we find them. - continuation = None - for slr_content in slr_contents: - if not continuation: - continuation = self._extract_continuation({'contents': [slr_content]}) - - isr_contents = try_get( - slr_content, - lambda x: x['itemSectionRenderer']['contents'], - list) - if not isr_contents: - continue - for content in isr_contents: - if not isinstance(content, dict): - continue - video = content.get('videoRenderer') - if not isinstance(video, dict): - continue - video_id = video.get('videoId') - if not video_id: - continue - - yield self._extract_video(video) - - if not continuation: - break - - -class YoutubeSearchDateIE(YoutubeSearchIE): +class YoutubeSearchDateIE(SearchInfoExtractor, YoutubeTabBaseInfoExtractor): IE_NAME = YoutubeSearchIE.IE_NAME + ':date' _SEARCH_KEY = 'ytsearchdate' - IE_DESC = 'YouTube searches, newest videos first' + IE_DESC = 'YouTube search, newest videos first' _SEARCH_PARAMS = 'CAI%3D' -class YoutubeSearchURLIE(YoutubeSearchIE): +class YoutubeSearchURLIE(YoutubeTabBaseInfoExtractor): IE_DESC = 'YouTube search URLs with sorting and filter support' IE_NAME = YoutubeSearchIE.IE_NAME + '_url' - _SEARCH_KEY = None _VALID_URL = r'https?://(?:www\.)?youtube\.com/results\?(.*?&)?(?:search_query|q)=(?:[^&]+)(?:[&]|$)' - # _MAX_RESULTS = 100 _TESTS = [{ 'url': 'https://www.youtube.com/results?baz=bar&search_query=youtube-dl+test+video&filters=video&lclk=video', 'playlist_mincount': 5, @@ -4589,15 +4595,10 @@ class YoutubeSearchURLIE(YoutubeSearchIE): 'only_matching': True, }] - @classmethod - def _make_valid_url(cls): - return cls._VALID_URL - def _real_extract(self, url): qs = parse_qs(url) query = (qs.get('search_query') or qs.get('q'))[0] - self._SEARCH_PARAMS = qs.get('sp', ('',))[0] - return self._get_n_results(query, self._MAX_RESULTS) + return self.playlist_result(self._search_results(query, qs.get('sp', (None,))[0]), query, query) class YoutubeFeedsInfoExtractor(YoutubeTabIE):