import itertools
import json
import random
import string

from .common import InfoExtractor
from ..utils import (
    ExtractorError,
    format_field,
    int_or_none,
    str_or_none,
    try_get,
)


class TrovoBaseIE(InfoExtractor):
    """Shared helpers for all trovo.live extractors."""

    _VALID_URL_BASE = r'https?://(?:www\.)?trovo\.live/'
    # Sent along with every media URL via the formats' 'http_headers'.
    _HEADERS = {'Origin': 'https://trovo.live'}

    def _call_api(self, video_id, data):
        """Send one GraphQL operation and return its payload.

        The operation is wrapped in a one-element JSON list; the first
        element of the response is unwrapped and the value stored under
        ``data[<operationName>]`` is returned.

        Raises ExtractorError with the first error message when the
        response contains an ``errors`` key.
        """
        # Persisted queries (identified by a hash) go to a different endpoint
        # than plain operations.
        if 'persistedQuery' in data.get('extensions', {}):
            url = 'https://gql.trovo.live'
        else:
            url = 'https://api-web.trovo.live/graphql'

        resp = self._download_json(
            url, video_id, data=json.dumps([data]).encode(),
            headers={'Accept': 'application/json'},
            query={
                # Random 10-character upper-case/digit request id.
                'qid': ''.join(random.choices(string.ascii_uppercase + string.digits, k=10)),
            })[0]
        if 'errors' in resp:
            raise ExtractorError(f'Trovo said: {resp["errors"][0]["message"]}')
        return resp['data'][data['operationName']]

    def _extract_streamer_info(self, data):
        """Build the uploader-related info fields from ``data['streamerInfo']``.

        All fields are optional; a missing or empty ``streamerInfo`` yields
        None values.
        """
        streamer_info = data.get('streamerInfo') or {}
        username = streamer_info.get('userName')
        return {
            'uploader': streamer_info.get('nickName'),
            'uploader_id': str_or_none(streamer_info.get('uid')),
            'uploader_url': format_field(username, None, 'https://trovo.live/%s'),
        }


class TrovoIE(TrovoBaseIE):
    """Extractor for live streams (any channel URL that is not a clip/video)."""

    # The named group must be called "id": _match_id() reads that group.
    _VALID_URL = TrovoBaseIE._VALID_URL_BASE + r'(?!(?:clip|video)/)(?P<id>[^/?&#]+)'

    def _real_extract(self, url):
        username = self._match_id(url)
        live_info = self._call_api(username, data={
            'operationName': 'live_LiveReaderService_GetLiveInfo',
            'variables': {
                'params': {
                    'userName': username,
                },
            },
        })
        if live_info.get('isLive') == 0:
            raise ExtractorError(f'{username} is offline', expected=True)
        program_info = live_info['programInfo']
        program_id = program_info['id']
        title = program_info['title']

        formats = []
        for stream_info in (program_info.get('streamInfo') or []):
            play_url = stream_info.get('playUrl')
            if not play_url:
                continue
            format_id = stream_info.get('desc')
            formats.append({
                'format_id': format_id,
                # Presumably 'desc' looks like "720p"; the last character is
                # dropped before the integer conversion.
                'height': int_or_none(format_id[:-1]) if format_id else None,
                'url': play_url,
                'http_headers': self._HEADERS,
            })
        self._sort_formats(formats)

        info = {
            'id': program_id,
            'title': title,
            'formats': formats,
            'thumbnail': program_info.get('coverUrl'),
            'is_live': True,
        }
        info.update(self._extract_streamer_info(live_info))
        return info


class TrovoVodIE(TrovoBaseIE):
    """Extractor for single clips and VODs, including their comments."""

    # The named group must be called "id": _match_id() reads that group.
    _VALID_URL = TrovoBaseIE._VALID_URL_BASE + r'(?:clip|video)/(?P<id>[^/?&#]+)'
    _TESTS = [{
        'url': 'https://trovo.live/clip/lc-5285890818705062210?ltab=videos',
        'params': {'getcomments': True},
        'info_dict': {
            'id': 'lc-5285890818705062210',
            'ext': 'mp4',
            'title': 'fatal moaning for a super good🤣🤣',
            'uploader': 'OneTappedYou',
            'timestamp': 1621628019,
            'upload_date': '20210521',
            'uploader_id': '100719456',
            'duration': 31,
            'view_count': int,
            'like_count': int,
            'comment_count': int,
            'comments': 'mincount:1',
            'categories': ['Call of Duty: Mobile'],
            'uploader_url': 'https://trovo.live/OneTappedYou',
            'thumbnail': r're:^https?://.*\.jpg',
        },
    }, {
        'url': 'https://trovo.live/video/ltv-100095501_100095501_1609596043',
        'only_matching': True,
    }]

    def _real_extract(self, url):
        vid = self._match_id(url)

        # NOTE: It is also possible to extract this info from the Nuxt data on the website,
        # however that seems unreliable - sometimes it randomly doesn't return the data,
        # at least when using a non-residential IP.
        resp = self._call_api(vid, data={
            'operationName': 'batchGetVodDetailInfo',
            'variables': {
                'params': {
                    'vids': [vid],
                },
            },
            'extensions': {
                'persistedQuery': {
                    'version': 1,
                    'sha256Hash': 'ceae0355d66476e21a1dd8e8af9f68de95b4019da2cda8b177c9a2255dad31d0',
                },
            },
        })
        vod_detail_info = resp['VodDetailInfos'][vid]
        vod_info = vod_detail_info['vodInfo']
        title = vod_info['title']

        # try_get() yields a falsy value when the nested keys are absent, so
        # videos without playback-rights data are treated as freely playable.
        if try_get(vod_info, lambda x: x['playbackRights']['playbackRights'] != 'Normal'):
            playback_rights_setting = vod_info['playbackRights']['playbackRightsSetting']
            if playback_rights_setting == 'SubscriberOnly':
                raise ExtractorError('This video is only available for subscribers', expected=True)
            raise ExtractorError(f'This video is not available ({playback_rights_setting})', expected=True)

        language = vod_info.get('languageName')
        formats = []
        for play_info in (vod_info.get('playInfos') or []):
            play_url = play_info.get('playUrl')
            if not play_url:
                continue
            format_id = play_info.get('desc')
            formats.append({
                'ext': 'mp4',
                'filesize': int_or_none(play_info.get('fileSize')),
                'format_id': format_id,
                # Presumably 'desc' looks like "720p"; the last character is
                # dropped before the integer conversion.
                'height': int_or_none(format_id[:-1]) if format_id else None,
                'language': language,
                'protocol': 'm3u8_native',
                'tbr': int_or_none(play_info.get('bitrate')),
                'url': play_url,
                'http_headers': self._HEADERS,
            })
        self._sort_formats(formats)

        category = vod_info.get('categoryName')
        # Counters are stored as '<name>Num', e.g. 'watchNum'.
        get_count = lambda x: int_or_none(vod_info.get(x + 'Num'))

        info = {
            'id': vid,
            'title': title,
            'formats': formats,
            'thumbnail': vod_info.get('coverUrl'),
            'timestamp': int_or_none(vod_info.get('publishTs')),
            'duration': int_or_none(vod_info.get('duration')),
            'view_count': get_count('watch'),
            'like_count': get_count('like'),
            'comment_count': get_count('comment'),
            'categories': [category] if category else None,
            '__post_extractor': self.extract_comments(vid),
        }
        info.update(self._extract_streamer_info(vod_detail_info))
        return info

    def _get_comments(self, vid):
        """Yield comment dicts for ``vid``, paging until 'lastPage' is truthy."""
        for page in itertools.count(1):
            comments_json = self._call_api(vid, data={
                'operationName': 'getCommentList',
                'variables': {
                    'params': {
                        'appInfo': {
                            'postID': vid,
                        },
                        'preview': {},
                        'pageSize': 99,
                        'page': page,
                    },
                },
                'extensions': {
                    'persistedQuery': {
                        'version': 1,
                        'sha256Hash': 'be8e5f9522ddac7f7c604c0d284fd22481813263580849926c4c66fb767eed25',
                    },
                },
            })
            # Tolerate a missing or null list instead of failing on iteration.
            for comment in comments_json.get('commentList') or []:
                content = comment.get('content')
                if not content:
                    continue
                author = comment.get('author') or {}
                parent = comment.get('parentID')
                yield {
                    'author': author.get('nickName'),
                    'author_id': str_or_none(author.get('uid')),
                    'id': str_or_none(comment.get('commentID')),
                    'text': content,
                    'timestamp': int_or_none(comment.get('createdAt')),
                    # A parentID of 0 marks a top-level comment.
                    'parent': 'root' if parent == 0 else str_or_none(parent),
                }

            if comments_json['lastPage']:
                break


class TrovoChannelBaseIE(TrovoBaseIE):
    """Base for channel playlists.

    Subclasses provide ``_TYPE`` (the URL path segment used for each entry)
    and implement ``_get_vod_json``.
    """

    def _get_vod_json(self, page, uid):
        """Return the API payload for page ``page`` of channel ``uid``."""
        raise NotImplementedError('This method must be implemented by subclasses')

    def _entries(self, uid):
        """Yield one url_result per item, paging until 'hasMore' is falsy."""
        for page in itertools.count(1):
            vod_json = self._get_vod_json(page, uid)
            # `or []` also covers an explicit null, which a .get() default
            # would not.
            vods = vod_json.get('vodInfos') or []
            for vod in vods:
                yield self.url_result(
                    f'https://trovo.live/{self._TYPE}/{vod.get("vid")}',
                    ie=TrovoVodIE.ie_key())
            has_more = vod_json['hasMore']
            if not has_more:
                break

    def _real_extract(self, url):
        username = self._match_id(url)
        # The live-info query is used only to resolve the user name to the
        # numeric uid that the listing queries take as channelID.
        live_info = self._call_api(username, data={
            'operationName': 'live_LiveReaderService_GetLiveInfo',
            'variables': {
                'params': {
                    'userName': username,
                },
            },
        })
        uid = str(live_info['streamerInfo']['uid'])
        return self.playlist_result(self._entries(uid), playlist_id=uid)


class TrovoChannelVodIE(TrovoChannelBaseIE):
    # The named group must be called "id": _match_id() reads that group.
    _VALID_URL = r'trovovod:(?P<id>[^\s]+)'
    IE_DESC = 'All VODs of a trovo.live channel; "trovovod:" prefix'

    _TESTS = [{
        'url': 'trovovod:OneTappedYou',
        'playlist_mincount': 24,
        'info_dict': {
            'id': '100719456',
        },
    }]

    _TYPE = 'video'

    def _get_vod_json(self, page, uid):
        """Return one page of the channel's VOD listing."""
        return self._call_api(uid, data={
            'operationName': 'getChannelLtvVideoInfos',
            'variables': {
                'params': {
                    'channelID': int(uid),
                    'pageSize': 99,
                    'currPage': page,
                },
            },
            'extensions': {
                'persistedQuery': {
                    'version': 1,
                    'sha256Hash': '78fe32792005eab7e922cafcdad9c56bed8bbc5f5df3c7cd24fcb84a744f5f78',
                },
            },
        })


class TrovoChannelClipIE(TrovoChannelBaseIE):
    # The named group must be called "id": _match_id() reads that group.
    _VALID_URL = r'trovoclip:(?P<id>[^\s]+)'
    IE_DESC = 'All Clips of a trovo.live channel; "trovoclip:" prefix'

    _TESTS = [{
        'url': 'trovoclip:OneTappedYou',
        'playlist_mincount': 29,
        'info_dict': {
            'id': '100719456',
        },
    }]

    _TYPE = 'clip'

    def _get_vod_json(self, page, uid):
        """Return one page of the channel's clip listing."""
        return self._call_api(uid, data={
            'operationName': 'getChannelClipVideoInfos',
            'variables': {
                'params': {
                    'channelID': int(uid),
                    'pageSize': 99,
                    'currPage': page,
                },
            },
            'extensions': {
                'persistedQuery': {
                    'version': 1,
                    'sha256Hash': 'e7924bfe20059b5c75fc8ff9e7929f43635681a7bdf3befa01072ed22c8eff31',
                },
            },
        })