0
0
Fork 0
mirror of https://github.com/yt-dlp/yt-dlp.git synced 2024-11-21 02:05:12 +00:00

[networking] Add module (#2861)

No actual changes - code is only moved around
This commit is contained in:
pukkandan 2023-07-15 14:30:08 +05:30
parent 1b392f905d
commit c365dba843
No known key found for this signature in database
GPG key ID: 7EEE9E1E817D0A39
13 changed files with 587 additions and 500 deletions

View file

@ -74,7 +74,7 @@ offlinetest: codetest
$(PYTHON) -m pytest -k "not download"
# XXX: This is hard to maintain
CODE_FOLDERS = yt_dlp yt_dlp/downloader yt_dlp/extractor yt_dlp/postprocessor yt_dlp/compat yt_dlp/compat/urllib yt_dlp/utils yt_dlp/dependencies
CODE_FOLDERS = yt_dlp yt_dlp/downloader yt_dlp/extractor yt_dlp/postprocessor yt_dlp/compat yt_dlp/compat/urllib yt_dlp/utils yt_dlp/dependencies yt_dlp/networking
yt-dlp: yt_dlp/*.py yt_dlp/*/*.py
mkdir -p zip
for d in $(CODE_FOLDERS) ; do \

View file

@ -54,6 +54,7 @@ def commit_lookup(cls):
'core',
'dependencies',
'jsinterp',
'networking',
'outtmpl',
'formats',
'plugins',

View file

@ -258,15 +258,6 @@ def test_sanitize_url(self):
self.assertEqual(sanitize_url('https://foo.bar'), 'https://foo.bar')
self.assertEqual(sanitize_url('foo bar'), 'foo bar')
def test_extract_basic_auth(self):
auth_header = lambda url: sanitized_Request(url).get_header('Authorization')
self.assertFalse(auth_header('http://foo.bar'))
self.assertFalse(auth_header('http://:foo.bar'))
self.assertEqual(auth_header('http://@foo.bar'), 'Basic Og==')
self.assertEqual(auth_header('http://:pass@foo.bar'), 'Basic OnBhc3M=')
self.assertEqual(auth_header('http://user:@foo.bar'), 'Basic dXNlcjo=')
self.assertEqual(auth_header('http://user:pass@foo.bar'), 'Basic dXNlcjpwYXNz')
def test_expand_path(self):
def env(var):
return f'%{var}%' if sys.platform == 'win32' else f'${var}'
@ -2324,6 +2315,15 @@ def test_traverse_obj(self):
self.assertEqual(traverse_obj(mobj, lambda k, _: k in (0, 'group')), ['0123', '3'],
msg='function on a `re.Match` should give group name as well')
def test_extract_basic_auth(self):
auth_header = lambda url: sanitized_Request(url).get_header('Authorization')
self.assertFalse(auth_header('http://foo.bar'))
self.assertFalse(auth_header('http://:foo.bar'))
self.assertEqual(auth_header('http://@foo.bar'), 'Basic Og==')
self.assertEqual(auth_header('http://:pass@foo.bar'), 'Basic OnBhc3M=')
self.assertEqual(auth_header('http://user:@foo.bar'), 'Basic dXNlcjo=')
self.assertEqual(auth_header('http://user:pass@foo.bar'), 'Basic dXNlcjpwYXNz')
if __name__ == '__main__':
unittest.main()

View file

@ -151,6 +151,7 @@
write_json_file,
write_string,
)
from .utils.networking import clean_headers
from .version import CHANNEL, RELEASE_GIT_HEAD, VARIANT, __version__
if compat_os_name == 'nt':
@ -672,6 +673,7 @@ def process_color_policy(stream):
raise
self.params['compat_opts'] = set(self.params.get('compat_opts', ()))
self.params['http_headers'] = merge_headers(std_headers, self.params.get('http_headers', {}))
if auto_init and auto_init != 'no_verbose_header':
self.print_debug_header()
@ -745,9 +747,6 @@ def check_deprecated(param, option, suggestion):
else self.params['format'] if callable(self.params['format'])
else self.build_format_selector(self.params['format']))
# Set http_headers defaults according to std_headers
self.params['http_headers'] = merge_headers(std_headers, self.params.get('http_headers', {}))
hooks = {
'post_hooks': self.add_post_hook,
'progress_hooks': self.add_progress_hook,
@ -941,12 +940,14 @@ def __enter__(self):
self.save_console_title()
return self
def __exit__(self, *args):
self.restore_console_title()
def save_cookies(self):
if self.params.get('cookiefile') is not None:
self.cookiejar.save(ignore_discard=True, ignore_expires=True)
def __exit__(self, *args):
self.restore_console_title()
self.save_cookies()
def trouble(self, message=None, tb=None, is_error=True):
"""Determine action to take when a download problem appears.
@ -2468,9 +2469,7 @@ def restore_last_token(self):
def _calc_headers(self, info_dict):
res = merge_headers(self.params['http_headers'], info_dict.get('http_headers') or {})
if 'Youtubedl-No-Compression' in res: # deprecated
res.pop('Youtubedl-No-Compression', None)
res['Accept-Encoding'] = 'identity'
clean_headers(res)
cookies = self.cookiejar.get_cookies_for_url(info_dict['url'])
if cookies:
encoder = LenientSimpleCookie()
@ -3856,12 +3855,6 @@ def list_thumbnails(self, info_dict):
def list_subtitles(self, video_id, subtitles, name='subtitles'):
self.__list_table(video_id, name, self.render_subtitles_table, video_id, subtitles)
def urlopen(self, req):
""" Start an HTTP download """
if isinstance(req, str):
req = sanitized_Request(req)
return self._opener.open(req, timeout=self._socket_timeout)
def print_debug_header(self):
if not self.params.get('verbose'):
return
@ -3989,13 +3982,8 @@ def _setup_opener(self):
return
timeout_val = self.params.get('socket_timeout')
self._socket_timeout = 20 if timeout_val is None else float(timeout_val)
opts_cookiesfrombrowser = self.params.get('cookiesfrombrowser')
opts_cookiefile = self.params.get('cookiefile')
opts_proxy = self.params.get('proxy')
self.cookiejar = load_cookies(opts_cookiefile, opts_cookiesfrombrowser, self)
cookie_processor = YoutubeDLCookieProcessor(self.cookiejar)
if opts_proxy is not None:
if opts_proxy == '':
@ -4037,6 +4025,18 @@ def file_open(*args, **kwargs):
opener.addheaders = []
self._opener = opener
@functools.cached_property
def cookiejar(self):
"""Global cookiejar instance"""
return load_cookies(
self.params.get('cookiefile'), self.params.get('cookiesfrombrowser'), self)
def urlopen(self, req):
""" Start an HTTP download """
if isinstance(req, str):
req = sanitized_Request(req)
return self._opener.open(req, timeout=self._socket_timeout)
def encode(self, s):
if isinstance(s, bytes):
return s # Already encoded

View file

View file

@ -0,0 +1,139 @@
from __future__ import annotations
import contextlib
import ssl
import sys
import urllib.parse
from ..dependencies import certifi
from ..socks import ProxyType
from ..utils import YoutubeDLError
def ssl_load_certs(context: ssl.SSLContext, use_certifi=True):
if certifi and use_certifi:
context.load_verify_locations(cafile=certifi.where())
else:
try:
context.load_default_certs()
# Work around the issue in load_default_certs when there are bad certificates. See:
# https://github.com/yt-dlp/yt-dlp/issues/1060,
# https://bugs.python.org/issue35665, https://bugs.python.org/issue45312
except ssl.SSLError:
# enum_certificates is not present in mingw python. See https://github.com/yt-dlp/yt-dlp/issues/1151
if sys.platform == 'win32' and hasattr(ssl, 'enum_certificates'):
for storename in ('CA', 'ROOT'):
_ssl_load_windows_store_certs(context, storename)
context.set_default_verify_paths()
def _ssl_load_windows_store_certs(ssl_context, storename):
# Code adapted from _load_windows_store_certs in https://github.com/python/cpython/blob/main/Lib/ssl.py
try:
certs = [cert for cert, encoding, trust in ssl.enum_certificates(storename)
if encoding == 'x509_asn' and (
trust is True or ssl.Purpose.SERVER_AUTH.oid in trust)]
except PermissionError:
return
for cert in certs:
with contextlib.suppress(ssl.SSLError):
ssl_context.load_verify_locations(cadata=cert)
def make_socks_proxy_opts(socks_proxy):
url_components = urllib.parse.urlparse(socks_proxy)
if url_components.scheme.lower() == 'socks5':
socks_type = ProxyType.SOCKS5
elif url_components.scheme.lower() in ('socks', 'socks4'):
socks_type = ProxyType.SOCKS4
elif url_components.scheme.lower() == 'socks4a':
socks_type = ProxyType.SOCKS4A
def unquote_if_non_empty(s):
if not s:
return s
return urllib.parse.unquote_plus(s)
return {
'proxytype': socks_type,
'addr': url_components.hostname,
'port': url_components.port or 1080,
'rdns': True,
'username': unquote_if_non_empty(url_components.username),
'password': unquote_if_non_empty(url_components.password),
}
def get_redirect_method(method, status):
"""Unified redirect method handling"""
# A 303 must either use GET or HEAD for subsequent request
# https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.4
if status == 303 and method != 'HEAD':
method = 'GET'
# 301 and 302 redirects are commonly turned into a GET from a POST
# for subsequent requests by browsers, so we'll do the same.
# https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.2
# https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.3
if status in (301, 302) and method == 'POST':
method = 'GET'
return method
def make_ssl_context(
verify=True,
client_certificate=None,
client_certificate_key=None,
client_certificate_password=None,
legacy_support=False,
use_certifi=True,
):
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.check_hostname = verify
context.verify_mode = ssl.CERT_REQUIRED if verify else ssl.CERT_NONE
# Some servers may reject requests if ALPN extension is not sent. See:
# https://github.com/python/cpython/issues/85140
# https://github.com/yt-dlp/yt-dlp/issues/3878
with contextlib.suppress(NotImplementedError):
context.set_alpn_protocols(['http/1.1'])
if verify:
ssl_load_certs(context, use_certifi)
if legacy_support:
context.options |= 4 # SSL_OP_LEGACY_SERVER_CONNECT
context.set_ciphers('DEFAULT') # compat
elif ssl.OPENSSL_VERSION_INFO >= (1, 1, 1) and not ssl.OPENSSL_VERSION.startswith('LibreSSL'):
# Use the default SSL ciphers and minimum TLS version settings from Python 3.10 [1].
# This is to ensure consistent behavior across Python versions and libraries, and help avoid fingerprinting
# in some situations [2][3].
# Python 3.10 only supports OpenSSL 1.1.1+ [4]. Because this change is likely
# untested on older versions, we only apply this to OpenSSL 1.1.1+ to be safe.
# LibreSSL is excluded until further investigation due to cipher support issues [5][6].
# 1. https://github.com/python/cpython/commit/e983252b516edb15d4338b0a47631b59ef1e2536
# 2. https://github.com/yt-dlp/yt-dlp/issues/4627
# 3. https://github.com/yt-dlp/yt-dlp/pull/5294
# 4. https://peps.python.org/pep-0644/
# 5. https://peps.python.org/pep-0644/#libressl-support
# 6. https://github.com/yt-dlp/yt-dlp/commit/5b9f253fa0aee996cf1ed30185d4b502e00609c4#commitcomment-89054368
context.set_ciphers(
'@SECLEVEL=2:ECDH+AESGCM:ECDH+CHACHA20:ECDH+AES:DHE+AES:!aNULL:!eNULL:!aDSS:!SHA1:!AESCCM')
context.minimum_version = ssl.TLSVersion.TLSv1_2
if client_certificate:
try:
context.load_cert_chain(
client_certificate, keyfile=client_certificate_key,
password=client_certificate_password)
except ssl.SSLError:
raise YoutubeDLError('Unable to load client certificate')
return context
def add_accept_encoding_header(headers, supported_encodings):
if supported_encodings and 'Accept-Encoding' not in headers:
headers['Accept-Encoding'] = ', '.join(supported_encodings)
elif 'Accept-Encoding' not in headers:
headers['Accept-Encoding'] = 'identity'

View file

@ -0,0 +1,315 @@
import functools
import gzip
import http.client
import io
import socket
import ssl
import urllib.error
import urllib.parse
import urllib.request
import urllib.response
import zlib
from ._helper import (
add_accept_encoding_header,
get_redirect_method,
make_socks_proxy_opts,
)
from ..dependencies import brotli
from ..socks import sockssocket
from ..utils import escape_url, update_url_query
from ..utils.networking import clean_headers, std_headers
SUPPORTED_ENCODINGS = ['gzip', 'deflate']
if brotli:
SUPPORTED_ENCODINGS.append('br')
def _create_http_connection(ydl_handler, http_class, is_https, *args, **kwargs):
hc = http_class(*args, **kwargs)
source_address = ydl_handler._params.get('source_address')
if source_address is not None:
# This is to workaround _create_connection() from socket where it will try all
# address data from getaddrinfo() including IPv6. This filters the result from
# getaddrinfo() based on the source_address value.
# This is based on the cpython socket.create_connection() function.
# https://github.com/python/cpython/blob/master/Lib/socket.py#L691
def _create_connection(address, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, source_address=None):
host, port = address
err = None
addrs = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)
af = socket.AF_INET if '.' in source_address[0] else socket.AF_INET6
ip_addrs = [addr for addr in addrs if addr[0] == af]
if addrs and not ip_addrs:
ip_version = 'v4' if af == socket.AF_INET else 'v6'
raise OSError(
"No remote IP%s addresses available for connect, can't use '%s' as source address"
% (ip_version, source_address[0]))
for res in ip_addrs:
af, socktype, proto, canonname, sa = res
sock = None
try:
sock = socket.socket(af, socktype, proto)
if timeout is not socket._GLOBAL_DEFAULT_TIMEOUT:
sock.settimeout(timeout)
sock.bind(source_address)
sock.connect(sa)
err = None # Explicitly break reference cycle
return sock
except OSError as _:
err = _
if sock is not None:
sock.close()
if err is not None:
raise err
else:
raise OSError('getaddrinfo returns an empty list')
if hasattr(hc, '_create_connection'):
hc._create_connection = _create_connection
hc.source_address = (source_address, 0)
return hc
class HTTPHandler(urllib.request.HTTPHandler):
"""Handler for HTTP requests and responses.
This class, when installed with an OpenerDirector, automatically adds
the standard headers to every HTTP request and handles gzipped, deflated and
brotli responses from web servers.
Part of this code was copied from:
http://techknack.net/python-urllib2-handlers/
Andrew Rowls, the author of that code, agreed to release it to the
public domain.
"""
def __init__(self, params, *args, **kwargs):
urllib.request.HTTPHandler.__init__(self, *args, **kwargs)
self._params = params
def http_open(self, req):
conn_class = http.client.HTTPConnection
socks_proxy = req.headers.get('Ytdl-socks-proxy')
if socks_proxy:
conn_class = make_socks_conn_class(conn_class, socks_proxy)
del req.headers['Ytdl-socks-proxy']
return self.do_open(functools.partial(
_create_http_connection, self, conn_class, False),
req)
@staticmethod
def deflate(data):
if not data:
return data
try:
return zlib.decompress(data, -zlib.MAX_WBITS)
except zlib.error:
return zlib.decompress(data)
@staticmethod
def brotli(data):
if not data:
return data
return brotli.decompress(data)
@staticmethod
def gz(data):
gz = gzip.GzipFile(fileobj=io.BytesIO(data), mode='rb')
try:
return gz.read()
except OSError as original_oserror:
# There may be junk add the end of the file
# See http://stackoverflow.com/q/4928560/35070 for details
for i in range(1, 1024):
try:
gz = gzip.GzipFile(fileobj=io.BytesIO(data[:-i]), mode='rb')
return gz.read()
except OSError:
continue
else:
raise original_oserror
def http_request(self, req):
# According to RFC 3986, URLs can not contain non-ASCII characters, however this is not
# always respected by websites, some tend to give out URLs with non percent-encoded
# non-ASCII characters (see telemb.py, ard.py [#3412])
# urllib chokes on URLs with non-ASCII characters (see http://bugs.python.org/issue3991)
# To work around aforementioned issue we will replace request's original URL with
# percent-encoded one
# Since redirects are also affected (e.g. http://www.southpark.de/alle-episoden/s18e09)
# the code of this workaround has been moved here from YoutubeDL.urlopen()
url = req.get_full_url()
url_escaped = escape_url(url)
# Substitute URL if any change after escaping
if url != url_escaped:
req = update_Request(req, url=url_escaped)
for h, v in self._params.get('http_headers', std_headers).items():
# Capitalize is needed because of Python bug 2275: http://bugs.python.org/issue2275
# The dict keys are capitalized because of this bug by urllib
if h.capitalize() not in req.headers:
req.add_header(h, v)
clean_headers(req.headers)
add_accept_encoding_header(req.headers, SUPPORTED_ENCODINGS)
return super().do_request_(req)
def http_response(self, req, resp):
old_resp = resp
# Content-Encoding header lists the encodings in order that they were applied [1].
# To decompress, we simply do the reverse.
# [1]: https://datatracker.ietf.org/doc/html/rfc9110#name-content-encoding
decoded_response = None
for encoding in (e.strip() for e in reversed(resp.headers.get('Content-encoding', '').split(','))):
if encoding == 'gzip':
decoded_response = self.gz(decoded_response or resp.read())
elif encoding == 'deflate':
decoded_response = self.deflate(decoded_response or resp.read())
elif encoding == 'br' and brotli:
decoded_response = self.brotli(decoded_response or resp.read())
if decoded_response is not None:
resp = urllib.request.addinfourl(io.BytesIO(decoded_response), old_resp.headers, old_resp.url, old_resp.code)
resp.msg = old_resp.msg
# Percent-encode redirect URL of Location HTTP header to satisfy RFC 3986 (see
# https://github.com/ytdl-org/youtube-dl/issues/6457).
if 300 <= resp.code < 400:
location = resp.headers.get('Location')
if location:
# As of RFC 2616 default charset is iso-8859-1 that is respected by python 3
location = location.encode('iso-8859-1').decode()
location_escaped = escape_url(location)
if location != location_escaped:
del resp.headers['Location']
resp.headers['Location'] = location_escaped
return resp
https_request = http_request
https_response = http_response
def make_socks_conn_class(base_class, socks_proxy):
assert issubclass(base_class, (
http.client.HTTPConnection, http.client.HTTPSConnection))
proxy_args = make_socks_proxy_opts(socks_proxy)
class SocksConnection(base_class):
def connect(self):
self.sock = sockssocket()
self.sock.setproxy(**proxy_args)
if isinstance(self.timeout, (int, float)):
self.sock.settimeout(self.timeout)
self.sock.connect((self.host, self.port))
if isinstance(self, http.client.HTTPSConnection):
if hasattr(self, '_context'): # Python > 2.6
self.sock = self._context.wrap_socket(
self.sock, server_hostname=self.host)
else:
self.sock = ssl.wrap_socket(self.sock)
return SocksConnection
class RedirectHandler(urllib.request.HTTPRedirectHandler):
"""YoutubeDL redirect handler
The code is based on HTTPRedirectHandler implementation from CPython [1].
This redirect handler fixes and improves the logic to better align with RFC7261
and what browsers tend to do [2][3]
1. https://github.com/python/cpython/blob/master/Lib/urllib/request.py
2. https://datatracker.ietf.org/doc/html/rfc7231
3. https://github.com/python/cpython/issues/91306
"""
http_error_301 = http_error_303 = http_error_307 = http_error_308 = urllib.request.HTTPRedirectHandler.http_error_302
def redirect_request(self, req, fp, code, msg, headers, newurl):
if code not in (301, 302, 303, 307, 308):
raise urllib.error.HTTPError(req.full_url, code, msg, headers, fp)
new_data = req.data
# Technically the Cookie header should be in unredirected_hdrs,
# however in practice some may set it in normal headers anyway.
# We will remove it here to prevent any leaks.
remove_headers = ['Cookie']
new_method = get_redirect_method(req.get_method(), code)
# only remove payload if method changed (e.g. POST to GET)
if new_method != req.get_method():
new_data = None
remove_headers.extend(['Content-Length', 'Content-Type'])
new_headers = {k: v for k, v in req.headers.items() if k.title() not in remove_headers}
return urllib.request.Request(
newurl, headers=new_headers, origin_req_host=req.origin_req_host,
unverifiable=True, method=new_method, data=new_data)
class ProxyHandler(urllib.request.ProxyHandler):
def __init__(self, proxies=None):
# Set default handlers
for type in ('http', 'https'):
setattr(self, '%s_open' % type,
lambda r, proxy='__noproxy__', type=type, meth=self.proxy_open:
meth(r, proxy, type))
urllib.request.ProxyHandler.__init__(self, proxies)
def proxy_open(self, req, proxy, type):
req_proxy = req.headers.get('Ytdl-request-proxy')
if req_proxy is not None:
proxy = req_proxy
del req.headers['Ytdl-request-proxy']
if proxy == '__noproxy__':
return None # No Proxy
if urllib.parse.urlparse(proxy).scheme.lower() in ('socks', 'socks4', 'socks4a', 'socks5'):
req.add_header('Ytdl-socks-proxy', proxy)
# yt-dlp's http/https handlers do wrapping the socket with socks
return None
return urllib.request.ProxyHandler.proxy_open(
self, req, proxy, type)
class PUTRequest(urllib.request.Request):
def get_method(self):
return 'PUT'
class HEADRequest(urllib.request.Request):
def get_method(self):
return 'HEAD'
def update_Request(req, url=None, data=None, headers=None, query=None):
req_headers = req.headers.copy()
req_headers.update(headers or {})
req_data = data or req.data
req_url = update_url_query(url or req.get_full_url(), query)
req_get_method = req.get_method()
if req_get_method == 'HEAD':
req_type = HEADRequest
elif req_get_method == 'PUT':
req_type = PUTRequest
else:
req_type = urllib.request.Request
new_req = req_type(
req_url, data=req_data, headers=req_headers,
origin_req_host=req.origin_req_host, unverifiable=req.unverifiable)
if hasattr(req, 'timeout'):
new_req.timeout = req.timeout
return new_req

View file

@ -0,0 +1,9 @@
import http.client
import socket
import ssl
import urllib.error
network_exceptions = [urllib.error.URLError, http.client.HTTPException, socket.error]
if hasattr(ssl, 'CertificateError'):
network_exceptions.append(ssl.CertificateError)
network_exceptions = tuple(network_exceptions)

View file

@ -3,13 +3,10 @@
from ..compat.compat_utils import passthrough_module
# XXX: Implement this the same way as other DeprecationWarnings without circular import
passthrough_module(__name__, '._legacy', callback=lambda attr: warnings.warn(
DeprecationWarning(f'{__name__}.{attr} is deprecated'), stacklevel=5))
passthrough_module(__name__, '._deprecated')
del passthrough_module
# isort: off
from .traversal import *
from ._utils import *
from ._utils import _configuration_args, _get_exe_version_output
from ._deprecated import *

View file

@ -1,7 +1,26 @@
"""Deprecated - New code should avoid these"""
import warnings
from ..compat.compat_utils import passthrough_module
# XXX: Implement this the same way as other DeprecationWarnings without circular import
passthrough_module(__name__, '.._legacy', callback=lambda attr: warnings.warn(
DeprecationWarning(f'{__name__}.{attr} is deprecated'), stacklevel=6))
del passthrough_module
from ._utils import preferredencoding
# isort: split
from ..networking._urllib import PUTRequest # noqa: F401
from ..networking._urllib import SUPPORTED_ENCODINGS, HEADRequest # noqa: F401
from ..networking._urllib import HTTPHandler as YoutubeDLHandler # noqa: F401
from ..networking._urllib import ProxyHandler as PerRequestProxyHandler # noqa: F401
from ..networking._urllib import RedirectHandler as YoutubeDLRedirectHandler # noqa: F401
from ..networking._urllib import make_socks_conn_class, update_Request # noqa: F401
from ..networking.exceptions import network_exceptions # noqa: F401
from .networking import random_user_agent, std_headers # noqa: F401
def encodeFilename(s, for_subprocess=False):
assert isinstance(s, str)

View file

@ -11,7 +11,6 @@
import email.header
import email.utils
import errno
import gzip
import hashlib
import hmac
import html.entities
@ -46,7 +45,6 @@
import urllib.parse
import urllib.request
import xml.etree.ElementTree
import zlib
from . import traversal
@ -58,8 +56,7 @@
compat_os_name,
compat_shlex_quote,
)
from ..dependencies import brotli, certifi, websockets, xattr
from ..socks import ProxyType, sockssocket
from ..dependencies import websockets, xattr
__name__ = __name__.rsplit('.', 1)[0] # Pretend to be the parent module
@ -67,65 +64,6 @@
compiled_regex_type = type(re.compile(''))
def random_user_agent():
_USER_AGENT_TPL = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/%s Safari/537.36'
_CHROME_VERSIONS = (
'90.0.4430.212',
'90.0.4430.24',
'90.0.4430.70',
'90.0.4430.72',
'90.0.4430.85',
'90.0.4430.93',
'91.0.4472.101',
'91.0.4472.106',
'91.0.4472.114',
'91.0.4472.124',
'91.0.4472.164',
'91.0.4472.19',
'91.0.4472.77',
'92.0.4515.107',
'92.0.4515.115',
'92.0.4515.131',
'92.0.4515.159',
'92.0.4515.43',
'93.0.4556.0',
'93.0.4577.15',
'93.0.4577.63',
'93.0.4577.82',
'94.0.4606.41',
'94.0.4606.54',
'94.0.4606.61',
'94.0.4606.71',
'94.0.4606.81',
'94.0.4606.85',
'95.0.4638.17',
'95.0.4638.50',
'95.0.4638.54',
'95.0.4638.69',
'95.0.4638.74',
'96.0.4664.18',
'96.0.4664.45',
'96.0.4664.55',
'96.0.4664.93',
'97.0.4692.20',
)
return _USER_AGENT_TPL % random.choice(_CHROME_VERSIONS)
SUPPORTED_ENCODINGS = [
'gzip', 'deflate'
]
if brotli:
SUPPORTED_ENCODINGS.append('br')
std_headers = {
'User-Agent': random_user_agent(),
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en-us,en;q=0.5',
'Sec-Fetch-Mode': 'navigate',
}
USER_AGENTS = {
'Safari': 'Mozilla/5.0 (X11; Linux x86_64; rv:10.0) AppleWebKit/533.20.25 (KHTML, like Gecko) Version/5.0.4 Safari/533.20.27',
}
@ -958,80 +896,16 @@ def formatSeconds(secs, delim=':', msec=False):
return '%s.%03d' % (ret, time.milliseconds) if msec else ret
def _ssl_load_windows_store_certs(ssl_context, storename):
# Code adapted from _load_windows_store_certs in https://github.com/python/cpython/blob/main/Lib/ssl.py
try:
certs = [cert for cert, encoding, trust in ssl.enum_certificates(storename)
if encoding == 'x509_asn' and (
trust is True or ssl.Purpose.SERVER_AUTH.oid in trust)]
except PermissionError:
return
for cert in certs:
with contextlib.suppress(ssl.SSLError):
ssl_context.load_verify_locations(cadata=cert)
def make_HTTPS_handler(params, **kwargs):
opts_check_certificate = not params.get('nocheckcertificate')
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.check_hostname = opts_check_certificate
if params.get('legacyserverconnect'):
context.options |= 4 # SSL_OP_LEGACY_SERVER_CONNECT
# Allow use of weaker ciphers in Python 3.10+. See https://bugs.python.org/issue43998
context.set_ciphers('DEFAULT')
elif (
sys.version_info < (3, 10)
and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
and not ssl.OPENSSL_VERSION.startswith('LibreSSL')
):
# Backport the default SSL ciphers and minimum TLS version settings from Python 3.10 [1].
# This is to ensure consistent behavior across Python versions, and help avoid fingerprinting
# in some situations [2][3].
# Python 3.10 only supports OpenSSL 1.1.1+ [4]. Because this change is likely
# untested on older versions, we only apply this to OpenSSL 1.1.1+ to be safe.
# LibreSSL is excluded until further investigation due to cipher support issues [5][6].
# 1. https://github.com/python/cpython/commit/e983252b516edb15d4338b0a47631b59ef1e2536
# 2. https://github.com/yt-dlp/yt-dlp/issues/4627
# 3. https://github.com/yt-dlp/yt-dlp/pull/5294
# 4. https://peps.python.org/pep-0644/
# 5. https://peps.python.org/pep-0644/#libressl-support
# 6. https://github.com/yt-dlp/yt-dlp/commit/5b9f253fa0aee996cf1ed30185d4b502e00609c4#commitcomment-89054368
context.set_ciphers('@SECLEVEL=2:ECDH+AESGCM:ECDH+CHACHA20:ECDH+AES:DHE+AES:!aNULL:!eNULL:!aDSS:!SHA1:!AESCCM')
context.minimum_version = ssl.TLSVersion.TLSv1_2
context.verify_mode = ssl.CERT_REQUIRED if opts_check_certificate else ssl.CERT_NONE
if opts_check_certificate:
if certifi and 'no-certifi' not in params.get('compat_opts', []):
context.load_verify_locations(cafile=certifi.where())
else:
try:
context.load_default_certs()
# Work around the issue in load_default_certs when there are bad certificates. See:
# https://github.com/yt-dlp/yt-dlp/issues/1060,
# https://bugs.python.org/issue35665, https://bugs.python.org/issue45312
except ssl.SSLError:
# enum_certificates is not present in mingw python. See https://github.com/yt-dlp/yt-dlp/issues/1151
if sys.platform == 'win32' and hasattr(ssl, 'enum_certificates'):
for storename in ('CA', 'ROOT'):
_ssl_load_windows_store_certs(context, storename)
context.set_default_verify_paths()
client_certfile = params.get('client_certificate')
if client_certfile:
try:
context.load_cert_chain(
client_certfile, keyfile=params.get('client_certificate_key'),
password=params.get('client_certificate_password'))
except ssl.SSLError:
raise YoutubeDLError('Unable to load client certificate')
# Some servers may reject requests if ALPN extension is not sent. See:
# https://github.com/python/cpython/issues/85140
# https://github.com/yt-dlp/yt-dlp/issues/3878
with contextlib.suppress(NotImplementedError):
context.set_alpn_protocols(['http/1.1'])
return YoutubeDLHTTPSHandler(params, context=context, **kwargs)
from ..networking._helper import make_ssl_context
return YoutubeDLHTTPSHandler(params, context=make_ssl_context(
verify=not params.get('nocheckcertificate'),
client_certificate=params.get('client_certificate'),
client_certificate_key=params.get('client_certificate_key'),
client_certificate_password=params.get('client_certificate_password'),
legacy_support=params.get('legacyserverconnect'),
use_certifi='no-certifi' not in params.get('compat_opts', []),
), **kwargs)
def bug_reports_message(before=';'):
@ -1059,12 +933,6 @@ def __init__(self, msg=None):
super().__init__(self.msg)
network_exceptions = [urllib.error.URLError, http.client.HTTPException, socket.error]
if hasattr(ssl, 'CertificateError'):
network_exceptions.append(ssl.CertificateError)
network_exceptions = tuple(network_exceptions)
class ExtractorError(YoutubeDLError):
"""Error during info extraction."""
@ -1072,6 +940,7 @@ def __init__(self, msg, tb=None, expected=False, cause=None, video_id=None, ie=N
""" tb, if given, is the original traceback (so that it can be printed out).
If expected is set, this is a normal error message and most likely not a bug in yt-dlp.
"""
from ..networking.exceptions import network_exceptions
if sys.exc_info()[0] in network_exceptions:
expected = True
@ -1271,225 +1140,6 @@ class XAttrUnavailableError(YoutubeDLError):
pass
def _create_http_connection(ydl_handler, http_class, is_https, *args, **kwargs):
hc = http_class(*args, **kwargs)
source_address = ydl_handler._params.get('source_address')
if source_address is not None:
# This is to workaround _create_connection() from socket where it will try all
# address data from getaddrinfo() including IPv6. This filters the result from
# getaddrinfo() based on the source_address value.
# This is based on the cpython socket.create_connection() function.
# https://github.com/python/cpython/blob/master/Lib/socket.py#L691
def _create_connection(address, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, source_address=None):
host, port = address
err = None
addrs = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)
af = socket.AF_INET if '.' in source_address[0] else socket.AF_INET6
ip_addrs = [addr for addr in addrs if addr[0] == af]
if addrs and not ip_addrs:
ip_version = 'v4' if af == socket.AF_INET else 'v6'
raise OSError(
"No remote IP%s addresses available for connect, can't use '%s' as source address"
% (ip_version, source_address[0]))
for res in ip_addrs:
af, socktype, proto, canonname, sa = res
sock = None
try:
sock = socket.socket(af, socktype, proto)
if timeout is not socket._GLOBAL_DEFAULT_TIMEOUT:
sock.settimeout(timeout)
sock.bind(source_address)
sock.connect(sa)
err = None # Explicitly break reference cycle
return sock
except OSError as _:
err = _
if sock is not None:
sock.close()
if err is not None:
raise err
else:
raise OSError('getaddrinfo returns an empty list')
if hasattr(hc, '_create_connection'):
hc._create_connection = _create_connection
hc.source_address = (source_address, 0)
return hc
class YoutubeDLHandler(urllib.request.HTTPHandler):
"""Handler for HTTP requests and responses.
This class, when installed with an OpenerDirector, automatically adds
the standard headers to every HTTP request and handles gzipped, deflated and
brotli responses from web servers.
Part of this code was copied from:
http://techknack.net/python-urllib2-handlers/
Andrew Rowls, the author of that code, agreed to release it to the
public domain.
"""
def __init__(self, params, *args, **kwargs):
urllib.request.HTTPHandler.__init__(self, *args, **kwargs)
self._params = params
def http_open(self, req):
conn_class = http.client.HTTPConnection
socks_proxy = req.headers.get('Ytdl-socks-proxy')
if socks_proxy:
conn_class = make_socks_conn_class(conn_class, socks_proxy)
del req.headers['Ytdl-socks-proxy']
return self.do_open(functools.partial(
_create_http_connection, self, conn_class, False),
req)
@staticmethod
def deflate(data):
if not data:
return data
try:
return zlib.decompress(data, -zlib.MAX_WBITS)
except zlib.error:
return zlib.decompress(data)
@staticmethod
def brotli(data):
if not data:
return data
return brotli.decompress(data)
@staticmethod
def gz(data):
gz = gzip.GzipFile(fileobj=io.BytesIO(data), mode='rb')
try:
return gz.read()
except OSError as original_oserror:
# There may be junk add the end of the file
# See http://stackoverflow.com/q/4928560/35070 for details
for i in range(1, 1024):
try:
gz = gzip.GzipFile(fileobj=io.BytesIO(data[:-i]), mode='rb')
return gz.read()
except OSError:
continue
else:
raise original_oserror
def http_request(self, req):
# According to RFC 3986, URLs can not contain non-ASCII characters, however this is not
# always respected by websites, some tend to give out URLs with non percent-encoded
# non-ASCII characters (see telemb.py, ard.py [#3412])
# urllib chokes on URLs with non-ASCII characters (see http://bugs.python.org/issue3991)
# To work around aforementioned issue we will replace request's original URL with
# percent-encoded one
# Since redirects are also affected (e.g. http://www.southpark.de/alle-episoden/s18e09)
# the code of this workaround has been moved here from YoutubeDL.urlopen()
url = req.get_full_url()
url_escaped = escape_url(url)
# Substitute URL if any change after escaping
if url != url_escaped:
req = update_Request(req, url=url_escaped)
for h, v in self._params.get('http_headers', std_headers).items():
# Capitalize is needed because of Python bug 2275: http://bugs.python.org/issue2275
# The dict keys are capitalized because of this bug by urllib
if h.capitalize() not in req.headers:
req.add_header(h, v)
if 'Youtubedl-no-compression' in req.headers: # deprecated
req.headers.pop('Youtubedl-no-compression', None)
req.add_header('Accept-encoding', 'identity')
if 'Accept-encoding' not in req.headers:
req.add_header('Accept-encoding', ', '.join(SUPPORTED_ENCODINGS))
return super().do_request_(req)
def http_response(self, req, resp):
old_resp = resp
# Content-Encoding header lists the encodings in order that they were applied [1].
# To decompress, we simply do the reverse.
# [1]: https://datatracker.ietf.org/doc/html/rfc9110#name-content-encoding
decoded_response = None
for encoding in (e.strip() for e in reversed(resp.headers.get('Content-encoding', '').split(','))):
if encoding == 'gzip':
decoded_response = self.gz(decoded_response or resp.read())
elif encoding == 'deflate':
decoded_response = self.deflate(decoded_response or resp.read())
elif encoding == 'br' and brotli:
decoded_response = self.brotli(decoded_response or resp.read())
if decoded_response is not None:
resp = urllib.request.addinfourl(io.BytesIO(decoded_response), old_resp.headers, old_resp.url, old_resp.code)
resp.msg = old_resp.msg
# Percent-encode redirect URL of Location HTTP header to satisfy RFC 3986 (see
# https://github.com/ytdl-org/youtube-dl/issues/6457).
if 300 <= resp.code < 400:
location = resp.headers.get('Location')
if location:
# As of RFC 2616 default charset is iso-8859-1 that is respected by python 3
location = location.encode('iso-8859-1').decode()
location_escaped = escape_url(location)
if location != location_escaped:
del resp.headers['Location']
resp.headers['Location'] = location_escaped
return resp
https_request = http_request
https_response = http_response
def make_socks_conn_class(base_class, socks_proxy):
assert issubclass(base_class, (
http.client.HTTPConnection, http.client.HTTPSConnection))
url_components = urllib.parse.urlparse(socks_proxy)
if url_components.scheme.lower() == 'socks5':
socks_type = ProxyType.SOCKS5
elif url_components.scheme.lower() in ('socks', 'socks4'):
socks_type = ProxyType.SOCKS4
elif url_components.scheme.lower() == 'socks4a':
socks_type = ProxyType.SOCKS4A
def unquote_if_non_empty(s):
if not s:
return s
return urllib.parse.unquote_plus(s)
proxy_args = (
socks_type,
url_components.hostname, url_components.port or 1080,
True, # Remote DNS
unquote_if_non_empty(url_components.username),
unquote_if_non_empty(url_components.password),
)
class SocksConnection(base_class):
def connect(self):
self.sock = sockssocket()
self.sock.setproxy(*proxy_args)
if isinstance(self.timeout, (int, float)):
self.sock.settimeout(self.timeout)
self.sock.connect((self.host, self.port))
if isinstance(self, http.client.HTTPSConnection):
if hasattr(self, '_context'): # Python > 2.6
self.sock = self._context.wrap_socket(
self.sock, server_hostname=self.host)
else:
self.sock = ssl.wrap_socket(self.sock)
return SocksConnection
class YoutubeDLHTTPSHandler(urllib.request.HTTPSHandler):
def __init__(self, params, https_conn_class=None, *args, **kwargs):
urllib.request.HTTPSHandler.__init__(self, *args, **kwargs)
@ -1507,9 +1157,11 @@ def https_open(self, req):
socks_proxy = req.headers.get('Ytdl-socks-proxy')
if socks_proxy:
from ..networking._urllib import make_socks_conn_class
conn_class = make_socks_conn_class(conn_class, socks_proxy)
del req.headers['Ytdl-socks-proxy']
from ..networking._urllib import _create_http_connection
try:
return self.do_open(
functools.partial(_create_http_connection, self, conn_class, True), req, **kwargs)
@ -1535,56 +1187,6 @@ def http_response(self, request, response):
https_response = http_response
class YoutubeDLRedirectHandler(urllib.request.HTTPRedirectHandler):
"""YoutubeDL redirect handler
The code is based on HTTPRedirectHandler implementation from CPython [1].
This redirect handler fixes and improves the logic to better align with RFC7261
and what browsers tend to do [2][3]
1. https://github.com/python/cpython/blob/master/Lib/urllib/request.py
2. https://datatracker.ietf.org/doc/html/rfc7231
3. https://github.com/python/cpython/issues/91306
"""
http_error_301 = http_error_303 = http_error_307 = http_error_308 = urllib.request.HTTPRedirectHandler.http_error_302
def redirect_request(self, req, fp, code, msg, headers, newurl):
if code not in (301, 302, 303, 307, 308):
raise urllib.error.HTTPError(req.full_url, code, msg, headers, fp)
new_method = req.get_method()
new_data = req.data
# Technically the Cookie header should be in unredirected_hdrs,
# however in practice some may set it in normal headers anyway.
# We will remove it here to prevent any leaks.
remove_headers = ['Cookie']
# A 303 must either use GET or HEAD for subsequent request
# https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.4
if code == 303 and req.get_method() != 'HEAD':
new_method = 'GET'
# 301 and 302 redirects are commonly turned into a GET from a POST
# for subsequent requests by browsers, so we'll do the same.
# https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.2
# https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.3
elif code in (301, 302) and req.get_method() == 'POST':
new_method = 'GET'
# only remove payload if method changed (e.g. POST to GET)
if new_method != req.get_method():
new_data = None
remove_headers.extend(['Content-Length', 'Content-Type'])
new_headers = {k: v for k, v in req.headers.items() if k.title() not in remove_headers}
return urllib.request.Request(
newurl, headers=new_headers, origin_req_host=req.origin_req_host,
unverifiable=True, method=new_method, data=new_data)
def extract_timezone(date_str):
m = re.search(
r'''(?x)
@ -2390,16 +1992,6 @@ def urljoin(base, path):
return urllib.parse.urljoin(base, path)
class HEADRequest(urllib.request.Request):
def get_method(self):
return 'HEAD'
class PUTRequest(urllib.request.Request):
def get_method(self):
return 'PUT'
def int_or_none(v, scale=1, default=None, get_attr=None, invscale=1):
if get_attr and v is not None:
v = getattr(v, get_attr, None)
@ -3016,26 +2608,6 @@ def update_url_query(url, query):
return update_url(url, query_update=query)
def update_Request(req, url=None, data=None, headers=None, query=None):
req_headers = req.headers.copy()
req_headers.update(headers or {})
req_data = data or req.data
req_url = update_url_query(url or req.get_full_url(), query)
req_get_method = req.get_method()
if req_get_method == 'HEAD':
req_type = HEADRequest
elif req_get_method == 'PUT':
req_type = PUTRequest
else:
req_type = urllib.request.Request
new_req = req_type(
req_url, data=req_data, headers=req_headers,
origin_req_host=req.origin_req_host, unverifiable=req.unverifiable)
if hasattr(req, 'timeout'):
new_req.timeout = req.timeout
return new_req
def _multipart_encode_impl(data, boundary):
content_type = 'multipart/form-data; boundary=%s' % boundary
@ -4769,31 +4341,6 @@ def random_ipv4(cls, code_or_block):
struct.pack('!L', random.randint(addr_min, addr_max))))
class PerRequestProxyHandler(urllib.request.ProxyHandler):
def __init__(self, proxies=None):
# Set default handlers
for type in ('http', 'https'):
setattr(self, '%s_open' % type,
lambda r, proxy='__noproxy__', type=type, meth=self.proxy_open:
meth(r, proxy, type))
urllib.request.ProxyHandler.__init__(self, proxies)
def proxy_open(self, req, proxy, type):
req_proxy = req.headers.get('Ytdl-request-proxy')
if req_proxy is not None:
proxy = req_proxy
del req.headers['Ytdl-request-proxy']
if proxy == '__noproxy__':
return None # No Proxy
if urllib.parse.urlparse(proxy).scheme.lower() in ('socks', 'socks4', 'socks4a', 'socks5'):
req.add_header('Ytdl-socks-proxy', proxy)
# yt-dlp's http/https handlers do wrapping the socket with socks
return None
return urllib.request.ProxyHandler.proxy_open(
self, req, proxy, type)
# Both long_to_bytes and bytes_to_long are adapted from PyCrypto, which is
# released into Public Domain
# https://github.com/dlitz/pycrypto/blob/master/lib/Crypto/Util/number.py#L387

View file

@ -0,0 +1,60 @@
import random
def random_user_agent():
_USER_AGENT_TPL = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/%s Safari/537.36'
_CHROME_VERSIONS = (
'90.0.4430.212',
'90.0.4430.24',
'90.0.4430.70',
'90.0.4430.72',
'90.0.4430.85',
'90.0.4430.93',
'91.0.4472.101',
'91.0.4472.106',
'91.0.4472.114',
'91.0.4472.124',
'91.0.4472.164',
'91.0.4472.19',
'91.0.4472.77',
'92.0.4515.107',
'92.0.4515.115',
'92.0.4515.131',
'92.0.4515.159',
'92.0.4515.43',
'93.0.4556.0',
'93.0.4577.15',
'93.0.4577.63',
'93.0.4577.82',
'94.0.4606.41',
'94.0.4606.54',
'94.0.4606.61',
'94.0.4606.71',
'94.0.4606.81',
'94.0.4606.85',
'95.0.4638.17',
'95.0.4638.50',
'95.0.4638.54',
'95.0.4638.69',
'95.0.4638.74',
'96.0.4664.18',
'96.0.4664.45',
'96.0.4664.55',
'96.0.4664.93',
'97.0.4692.20',
)
return _USER_AGENT_TPL % random.choice(_CHROME_VERSIONS)
std_headers = {
'User-Agent': random_user_agent(),
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en-us,en;q=0.5',
'Sec-Fetch-Mode': 'navigate',
}
def clean_headers(headers):
if 'Youtubedl-no-compression' in headers: # compat
del headers['Youtubedl-no-compression']
headers['Accept-Encoding'] = 'identity'