From 3121512228487c9c690d3d39bfd2579addf96e07 Mon Sep 17 00:00:00 2001 From: Simon Sawicki Date: Thu, 6 Jul 2023 21:51:04 +0530 Subject: [PATCH] [core] Change how `Cookie` headers are handled Cookies are now saved and loaded under `cookies` key in the info dict instead of `http_headers.Cookie`. Cookies passed in headers are auto-scoped to the input URLs with a warning. Ref: https://github.com/yt-dlp/yt-dlp/security/advisories/GHSA-v8mc-9377-rwjj Authored by: Grub4K --- test/test_YoutubeDL.py | 56 ++++++++++++++++++++++++++ yt_dlp/YoutubeDL.py | 80 +++++++++++++++++++++++++++++++++++-- yt_dlp/downloader/common.py | 7 +++- 3 files changed, 139 insertions(+), 4 deletions(-) diff --git a/test/test_YoutubeDL.py b/test/test_YoutubeDL.py index 3fbcdd01f..c15c7704c 100644 --- a/test/test_YoutubeDL.py +++ b/test/test_YoutubeDL.py @@ -1213,6 +1213,62 @@ def _real_extract(self, url): self.assertEqual(downloaded['extractor'], 'Video') self.assertEqual(downloaded['extractor_key'], 'Video') + def test_header_cookies(self): + from http.cookiejar import Cookie + + ydl = FakeYDL() + ydl.report_warning = lambda *_, **__: None + + def cookie(name, value, version=None, domain='', path='', secure=False, expires=None): + return Cookie( + version or 0, name, value, None, False, + domain, bool(domain), bool(domain), path, bool(path), + secure, expires, False, None, None, rest={}) + + _test_url = 'https://yt.dlp/test' + + def test(encoded_cookies, cookies, headers=False, round_trip=None, error=None): + def _test(): + ydl.cookiejar.clear() + ydl._load_cookies(encoded_cookies, from_headers=headers) + if headers: + ydl._apply_header_cookies(_test_url) + data = {'url': _test_url} + ydl._calc_headers(data) + self.assertCountEqual( + map(vars, ydl.cookiejar), map(vars, cookies), + 'Extracted cookiejar.Cookie is not the same') + if not headers: + self.assertEqual( + data.get('cookies'), round_trip or encoded_cookies, + 'Cookie is not the same as round trip') + ydl.__dict__['_YoutubeDL__header_cookies'] = [] + + with self.subTest(msg=encoded_cookies): + if not error: + _test() + return + with self.assertRaisesRegex(Exception, error): + _test() + + test('test=value; Domain=.yt.dlp', [cookie('test', 'value', domain='.yt.dlp')]) + test('test=value', [cookie('test', 'value')], error='Unscoped cookies are not allowed') + test('cookie1=value1; Domain=.yt.dlp; Path=/test; cookie2=value2; Domain=.yt.dlp; Path=/', [ + cookie('cookie1', 'value1', domain='.yt.dlp', path='/test'), + cookie('cookie2', 'value2', domain='.yt.dlp', path='/')]) + test('test=value; Domain=.yt.dlp; Path=/test; Secure; Expires=9999999999', [ + cookie('test', 'value', domain='.yt.dlp', path='/test', secure=True, expires=9999999999)]) + test('test="value; "; path=/test; domain=.yt.dlp', [ + cookie('test', 'value; ', domain='.yt.dlp', path='/test')], + round_trip='test="value\\073 "; Domain=.yt.dlp; Path=/test') + test('name=; Domain=.yt.dlp', [cookie('name', '', domain='.yt.dlp')], + round_trip='name=""; Domain=.yt.dlp') + + test('test=value', [cookie('test', 'value', domain='.yt.dlp')], headers=True) + test('cookie1=value; Domain=.yt.dlp; cookie2=value', [], headers=True, error='Invalid syntax') + ydl.deprecated_feature = ydl.report_error + test('test=value', [], headers=True, error='Passing cookies as a header is a potential security risk') + if __name__ == '__main__': unittest.main() diff --git a/yt_dlp/YoutubeDL.py b/yt_dlp/YoutubeDL.py index cf0122d4b..7f5571666 100644 --- a/yt_dlp/YoutubeDL.py +++ b/yt_dlp/YoutubeDL.py @@ -1,9 +1,11 @@ import collections import contextlib +import copy import datetime import errno import fileinput import functools +import http.cookiejar import io import itertools import json @@ -25,7 +27,7 @@ from .cache import Cache from .compat import urllib # isort: split from .compat import compat_os_name, compat_shlex_quote -from .cookies import load_cookies +from .cookies import LenientSimpleCookie, load_cookies from .downloader import FFmpegFD, get_suitable_downloader, shorten_protocol_name from .downloader.rtmp import rtmpdump_version from .extractor import gen_extractor_classes, get_info_extractor @@ -673,6 +675,9 @@ def process_color_policy(stream): if auto_init and auto_init != 'no_verbose_header': self.print_debug_header() + self.__header_cookies = [] + self._load_cookies(traverse_obj(self.params.get('http_headers'), 'cookie', casesense=False)) # compat + def check_deprecated(param, option, suggestion): if self.params.get(param) is not None: self.report_warning(f'{option} is deprecated. Use {suggestion} instead') @@ -1625,8 +1630,60 @@ def progress(msg): self.to_screen('') raise + def _load_cookies(self, data, *, from_headers=True): + """Loads cookies from a `Cookie` header + + This tries to work around the security vulnerability of passing cookies to every domain. + See: https://github.com/yt-dlp/yt-dlp/security/advisories/GHSA-v8mc-9377-rwjj + The unscoped cookies are saved for later to be stored in the jar with a limited scope. + + @param data The Cookie header as string to load the cookies from + @param from_headers If `False`, allows Set-Cookie syntax in the cookie string (at least a domain will be required) + """ + for cookie in LenientSimpleCookie(data).values(): + if from_headers and any(cookie.values()): + raise ValueError('Invalid syntax in Cookie Header') + + domain = cookie.get('domain') or '' + expiry = cookie.get('expires') + if expiry == '': # 0 is valid + expiry = None + prepared_cookie = http.cookiejar.Cookie( + cookie.get('version') or 0, cookie.key, cookie.value, None, False, + domain, True, True, cookie.get('path') or '', bool(cookie.get('path')), + cookie.get('secure') or False, expiry, False, None, None, {}) + + if domain: + self.cookiejar.set_cookie(prepared_cookie) + elif from_headers: + self.deprecated_feature( + 'Passing cookies as a header is a potential security risk; ' + 'they will be scoped to the domain of the downloaded urls. ' + 'Please consider loading cookies from a file or browser instead.') + self.__header_cookies.append(prepared_cookie) + else: + self.report_error('Unscoped cookies are not allowed; please specify some sort of scoping', + tb=False, is_error=False) + + def _apply_header_cookies(self, url): + """Applies stray header cookies to the provided url + + This loads header cookies and scopes them to the domain provided in `url`. + While this is not ideal, it helps reduce the risk of them being sent + to an unintended destination while mostly maintaining compatibility. + """ + parsed = urllib.parse.urlparse(url) + if not parsed.hostname: + return + + for cookie in map(copy.copy, self.__header_cookies): + cookie.domain = f'.{parsed.hostname}' + self.cookiejar.set_cookie(cookie) + @_handle_extraction_exceptions def __extract_info(self, url, ie, download, extra_info, process): + self._apply_header_cookies(url) + try: ie_result = ie.extract(url) except UserNotLive as e: @@ -2414,9 +2471,24 @@ def _calc_headers(self, info_dict): if 'Youtubedl-No-Compression' in res: # deprecated res.pop('Youtubedl-No-Compression', None) res['Accept-Encoding'] = 'identity' - cookies = self.cookiejar.get_cookie_header(info_dict['url']) + cookies = self.cookiejar.get_cookies_for_url(info_dict['url']) if cookies: - res['Cookie'] = cookies + encoder = LenientSimpleCookie() + values = [] + for cookie in cookies: + _, value = encoder.value_encode(cookie.value) + values.append(f'{cookie.name}={value}') + if cookie.domain: + values.append(f'Domain={cookie.domain}') + if cookie.path: + values.append(f'Path={cookie.path}') + if cookie.secure: + values.append('Secure') + if cookie.expires: + values.append(f'Expires={cookie.expires}') + if cookie.version: + values.append(f'Version={cookie.version}') + info_dict['cookies'] = '; '.join(values) if 'X-Forwarded-For' not in res: x_forwarded_for_ip = info_dict.get('__x_forwarded_for_ip') @@ -3423,6 +3495,8 @@ def download_with_info_file(self, info_filename): infos = [self.sanitize_info(info, self.params.get('clean_infojson', True)) for info in variadic(json.loads('\n'.join(f)))] for info in infos: + self._load_cookies(info.get('cookies'), from_headers=False) + self._load_cookies(traverse_obj(info.get('http_headers'), 'Cookie', casesense=False)) # compat try: self.__download_wrapper(self.process_ie_result)(info, download=True) except (DownloadError, EntryNotInPlaylist, ReExtractInfo) as e: diff --git a/yt_dlp/downloader/common.py b/yt_dlp/downloader/common.py index 8fe9d9993..2c404ee90 100644 --- a/yt_dlp/downloader/common.py +++ b/yt_dlp/downloader/common.py @@ -32,6 +32,7 @@ timetuple_from_msec, try_call, ) +from ..utils.traversal import traverse_obj class FileDownloader: @@ -419,7 +420,6 @@ def download(self, filename, info_dict, subtitle=False): """Download to a filename using the info from info_dict Return True on success and False otherwise """ - nooverwrites_and_exists = ( not self.params.get('overwrites', True) and os.path.exists(encodeFilename(filename)) @@ -453,6 +453,11 @@ def download(self, filename, info_dict, subtitle=False): self.to_screen(f'[download] Sleeping {sleep_interval:.2f} seconds ...') time.sleep(sleep_interval) + # Filter the `Cookie` header from the info_dict to prevent leaks. + # See: https://github.com/yt-dlp/yt-dlp/security/advisories/GHSA-v8mc-9377-rwjj + info_dict['http_headers'] = dict(traverse_obj(info_dict, ( + 'http_headers', {dict.items}, lambda _, pair: pair[0].lower() != 'cookie'))) or None + ret = self.real_download(filename, info_dict) self._finish_multiline_status() return ret, True