[extractor/instagram] Fix post/story extractors (#4074)

Closes #4343, #3077, #2736, #3002
Authored by: pritam20ps05, pukkandan
This commit is contained in:
Pritam Das 2022-07-15 22:14:43 +05:30 committed by GitHub
parent 88f60feb32
commit e3e606de12
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 99 additions and 86 deletions

View File

@ -1,17 +1,17 @@
import itertools
import hashlib
import itertools
import json
import re
import time
import urllib.error
from .common import InfoExtractor
from ..compat import (
compat_HTTPError,
)
from ..utils import (
ExtractorError,
format_field,
decode_base_n,
encode_base_n,
float_or_none,
format_field,
get_element_by_attribute,
int_or_none,
lowercase_escape,
@ -22,6 +22,18 @@ from ..utils import (
urlencode_postdata,
)
_ENCODING_CHARS = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_'
def _pk_to_id(id):
"""Source: https://stackoverflow.com/questions/24437823/getting-instagram-post-url-from-media-id"""
return encode_base_n(int(id.split('_')[0]), table=_ENCODING_CHARS)
def _id_to_pk(shortcode):
"""Covert a shortcode to a numeric value"""
return decode_base_n(shortcode[:11], table=_ENCODING_CHARS)
class InstagramBaseIE(InfoExtractor):
_NETRC_MACHINE = 'instagram'
@ -156,6 +168,15 @@ class InstagramBaseIE(InfoExtractor):
if isinstance(product_info, list):
product_info = product_info[0]
comment_data = traverse_obj(product_info, ('edge_media_to_parent_comment', 'edges'))
comments = [{
'author': traverse_obj(comment_dict, ('node', 'owner', 'username')),
'author_id': traverse_obj(comment_dict, ('node', 'owner', 'id')),
'id': traverse_obj(comment_dict, ('node', 'id')),
'text': traverse_obj(comment_dict, ('node', 'text')),
'timestamp': traverse_obj(comment_dict, ('node', 'created_at'), expected_type=int_or_none),
} for comment_dict in comment_data] if comment_data else None
user_info = product_info.get('user') or {}
info_dict = {
'id': product_info.get('code') or product_info.get('id'),
@ -168,6 +189,7 @@ class InstagramBaseIE(InfoExtractor):
'view_count': int_or_none(product_info.get('view_count')),
'like_count': int_or_none(product_info.get('like_count')),
'comment_count': int_or_none(product_info.get('comment_count')),
'comments': comments,
'http_headers': {
'Referer': 'https://www.instagram.com/',
}
@ -214,23 +236,9 @@ class InstagramIOSIE(InfoExtractor):
'add_ie': ['Instagram']
}]
def _get_id(self, id):
"""Source: https://stackoverflow.com/questions/24437823/getting-instagram-post-url-from-media-id"""
chrs = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_'
media_id = int(id.split('_')[0])
shortened_id = ''
while media_id > 0:
r = media_id % 64
media_id = (media_id - r) // 64
shortened_id = chrs[r] + shortened_id
return shortened_id
def _real_extract(self, url):
return {
'_type': 'url_transparent',
'url': f'http://instagram.com/tv/{self._get_id(self._match_id(url))}/',
'ie_key': 'Instagram',
}
video_id = _pk_to_id(self._match_id(url))
return self.url_result(f'http://instagram.com/tv/{video_id}', InstagramIE, video_id)
class InstagramIE(InstagramBaseIE):
@ -358,39 +366,49 @@ class InstagramIE(InstagramBaseIE):
def _real_extract(self, url):
video_id, url = self._match_valid_url(url).group('id', 'url')
webpage, urlh = self._download_webpage_handle(url, video_id)
if 'www.instagram.com/accounts/login' in urlh.geturl():
self.report_warning('Main webpage is locked behind the login page. '
'Retrying with embed webpage (Note that some metadata might be missing)')
webpage = self._download_webpage(
'https://www.instagram.com/p/%s/embed/' % video_id, video_id, note='Downloading embed webpage')
shared_data = self._parse_json(
self._search_regex(
r'window\._sharedData\s*=\s*({.+?});',
webpage, 'shared data', default='{}'),
video_id, fatal=False)
media = traverse_obj(
shared_data,
('entry_data', 'PostPage', 0, 'graphql', 'shortcode_media'),
('entry_data', 'PostPage', 0, 'media'),
expected_type=dict)
# _sharedData.entry_data.PostPage is empty when authenticated (see
# https://github.com/ytdl-org/youtube-dl/pull/22880)
general_info = self._download_json(
f'https://www.instagram.com/graphql/query/?query_hash=9f8827793ef34641b2fb195d4d41151c'
f'&variables=%7B"shortcode":"{video_id}",'
'"parent_comment_count":10,"has_threaded_comments":true}', video_id, fatal=False, errnote=False,
headers={
'Accept': '*',
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36',
'Authority': 'www.instagram.com',
'Referer': 'https://www.instagram.com',
'x-ig-app-id': '936619743392459',
})
media = traverse_obj(general_info, ('data', 'shortcode_media')) or {}
if not media:
additional_data = self._parse_json(
self._search_regex(
r'window\.__additionalDataLoaded\s*\(\s*[^,]+,\s*({.+?})\s*\);',
webpage, 'additional data', default='{}'),
video_id, fatal=False)
product_item = traverse_obj(additional_data, ('items', 0), expected_type=dict)
if product_item:
return self._extract_product(product_item)
media = traverse_obj(additional_data, ('graphql', 'shortcode_media'), 'shortcode_media', expected_type=dict) or {}
self.report_warning('General metadata extraction failed', video_id)
if not media and 'www.instagram.com/accounts/login' in urlh.geturl():
self.raise_login_required('You need to log in to access this content')
info = self._download_json(
f'https://i.instagram.com/api/v1/media/{_id_to_pk(video_id)}/info/', video_id,
fatal=False, note='Downloading video info', errnote=False, headers={
'Accept': '*',
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36',
'Authority': 'www.instagram.com',
'Referer': 'https://www.instagram.com',
'x-ig-app-id': '936619743392459',
})
if info:
media.update(info['items'][0])
return self._extract_product(media)
webpage = self._download_webpage(
f'https://www.instagram.com/p/{video_id}/embed/', video_id,
note='Downloading embed webpage', fatal=False)
if not webpage:
self.raise_login_required('Requested content was not found, the content might be private')
additional_data = self._search_json(
r'window\.__additionalDataLoaded\s*\(\s*[^,]+,\s*', webpage, 'additional data', video_id, fatal=False)
product_item = traverse_obj(additional_data, ('items', 0), expected_type=dict)
if product_item:
media.update(product_item)
return self._extract_product(media)
media.update(traverse_obj(
additional_data, ('graphql', 'shortcode_media'), 'shortcode_media', expected_type=dict) or {})
username = traverse_obj(media, ('owner', 'username')) or self._search_regex(
r'"owner"\s*:\s*{\s*"username"\s*:\s*"(.+?)"', webpage, 'username', fatal=False)
@ -519,7 +537,7 @@ class InstagramPlaylistBaseIE(InstagramBaseIE):
except ExtractorError as e:
# if it's an error caused by a bad query, and there are
# more GIS templates to try, ignore it and keep trying
if isinstance(e.cause, compat_HTTPError) and e.cause.code == 403:
if isinstance(e.cause, urllib.error.HTTPError) and e.cause.code == 403:
if gis_tmpl != gis_tmpls[-1]:
continue
raise
@ -629,41 +647,36 @@ class InstagramStoryIE(InstagramBaseIE):
def _real_extract(self, url):
username, story_id = self._match_valid_url(url).groups()
story_info_url = f'{username}/{story_id}/?__a=1' if username == 'highlights' else f'{username}/?__a=1'
story_info = self._download_json(f'https://www.instagram.com/stories/{story_info_url}', story_id, headers={
'X-IG-App-ID': 936619743392459,
'X-ASBD-ID': 198387,
'X-IG-WWW-Claim': 0,
'X-Requested-With': 'XMLHttpRequest',
'Referer': url,
})
user_id = story_info['user']['id']
highlight_title = traverse_obj(story_info, ('highlight', 'title'))
story_info = self._download_webpage(url, story_id)
user_info = self._search_json(r'"user":', story_info, 'user info', story_id, fatal=False)
if not user_info:
self.raise_login_required('This content is unreachable')
user_id = user_info.get('id')
story_info_url = user_id if username != 'highlights' else f'highlight:{story_id}'
videos = self._download_json(f'https://i.instagram.com/api/v1/feed/reels_media/?reel_ids={story_info_url}', story_id, headers={
'X-IG-App-ID': 936619743392459,
'X-ASBD-ID': 198387,
'X-IG-WWW-Claim': 0,
})['reels']
videos = traverse_obj(self._download_json(
f'https://i.instagram.com/api/v1/feed/reels_media/?reel_ids={story_info_url}',
story_id, errnote=False, fatal=False, headers={
'X-IG-App-ID': 936619743392459,
'X-ASBD-ID': 198387,
'X-IG-WWW-Claim': 0,
}), 'reels')
if not videos:
self.raise_login_required('You need to log in to access this content')
full_name = traverse_obj(videos, ('user', 'full_name'))
user_info = {}
if not (username and username != 'highlights' and full_name):
user_info = self._download_json(
f'https://i.instagram.com/api/v1/users/{user_id}/info/', story_id, headers={
'User-Agent': 'Mozilla/5.0 (Linux; Android 11; SM-A505F Build/RP1A.200720.012; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/96.0.4664.45 Mobile Safari/537.36 Instagram 214.1.0.29.120 Android (30/11; 450dpi; 1080x2122; samsung; SM-A505F; a50; exynos9610; en_US; 333717274)',
}, note='Downloading user info')
username = traverse_obj(user_info, ('user', 'username')) or username
full_name = traverse_obj(user_info, ('user', 'full_name')) or full_name
full_name = traverse_obj(videos, (f'highlight:{story_id}', 'user', 'full_name'), (str(user_id), 'user', 'full_name'))
story_title = traverse_obj(videos, (f'highlight:{story_id}', 'title'))
if not story_title:
story_title = f'Story by {username}'
highlights = traverse_obj(videos, (f'highlight:{story_id}', 'items'), (str(user_id), 'items'))
return self.playlist_result([{
**self._extract_product(highlight),
'title': f'Story by {username}',
'uploader': full_name,
'uploader_id': user_id,
} for highlight in highlights], playlist_id=story_id, playlist_title=highlight_title)
info_data = []
for highlight in highlights:
highlight_data = self._extract_product(highlight)
if highlight_data.get('formats'):
info_data.append({
**highlight_data,
'uploader': full_name,
'uploader_id': user_id,
})
return self.playlist_result(info_data, playlist_id=story_id, playlist_title=story_title)