0
0
Fork 0
mirror of https://github.com/yt-dlp/yt-dlp.git synced 2024-12-22 06:00:00 +00:00

[dependencies] Simplify Cryptodome

Closes #6292, closes #6272, closes #6338
This commit is contained in:
pukkandan 2023-02-28 23:10:54 +05:30
parent b059188383
commit 65f6e80780
No known key found for this signature in database
GPG key ID: 7EEE9E1E817D0A39
11 changed files with 52 additions and 64 deletions

View file

@ -48,7 +48,7 @@ def test_cbc_decrypt(self):
data = b'\x97\x92+\xe5\x0b\xc3\x18\x91ky9m&\xb3\xb5@\xe6\x27\xc2\x96.\xc8u\x88\xab9-[\x9e|\xf1\xcd' data = b'\x97\x92+\xe5\x0b\xc3\x18\x91ky9m&\xb3\xb5@\xe6\x27\xc2\x96.\xc8u\x88\xab9-[\x9e|\xf1\xcd'
decrypted = intlist_to_bytes(aes_cbc_decrypt(bytes_to_intlist(data), self.key, self.iv)) decrypted = intlist_to_bytes(aes_cbc_decrypt(bytes_to_intlist(data), self.key, self.iv))
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg) self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
if Cryptodome: if Cryptodome.AES:
decrypted = aes_cbc_decrypt_bytes(data, intlist_to_bytes(self.key), intlist_to_bytes(self.iv)) decrypted = aes_cbc_decrypt_bytes(data, intlist_to_bytes(self.key), intlist_to_bytes(self.iv))
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg) self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
@ -78,7 +78,7 @@ def test_gcm_decrypt(self):
decrypted = intlist_to_bytes(aes_gcm_decrypt_and_verify( decrypted = intlist_to_bytes(aes_gcm_decrypt_and_verify(
bytes_to_intlist(data), self.key, bytes_to_intlist(authentication_tag), self.iv[:12])) bytes_to_intlist(data), self.key, bytes_to_intlist(authentication_tag), self.iv[:12]))
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg) self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
if Cryptodome: if Cryptodome.AES:
decrypted = aes_gcm_decrypt_and_verify_bytes( decrypted = aes_gcm_decrypt_and_verify_bytes(
data, intlist_to_bytes(self.key), authentication_tag, intlist_to_bytes(self.iv[:12])) data, intlist_to_bytes(self.key), authentication_tag, intlist_to_bytes(self.iv[:12]))
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg) self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)

View file

@ -1,30 +1,8 @@
import ast
import os
import sys import sys
from pathlib import Path
from PyInstaller.utils.hooks import collect_submodules from PyInstaller.utils.hooks import collect_submodules
def find_attribute_accesses(node, name, path=()):
if isinstance(node, ast.Attribute):
path = [*path, node.attr]
if isinstance(node.value, ast.Name) and node.value.id == name:
yield path[::-1]
for child in ast.iter_child_nodes(node):
yield from find_attribute_accesses(child, name, path)
def collect_used_submodules(name, level):
for dirpath, _, filenames in os.walk(Path(__file__).parent.parent):
for filename in filenames:
if not filename.endswith('.py'):
continue
with open(Path(dirpath) / filename, encoding='utf8') as f:
for submodule in find_attribute_accesses(ast.parse(f.read()), name):
yield '.'.join(submodule[:level])
def pycryptodome_module(): def pycryptodome_module():
try: try:
import Cryptodome # noqa: F401 import Cryptodome # noqa: F401
@ -41,12 +19,8 @@ def pycryptodome_module():
def get_hidden_imports(): def get_hidden_imports():
yield 'yt_dlp.compat._legacy' yield 'yt_dlp.compat._legacy'
yield pycryptodome_module()
yield from collect_submodules('websockets') yield from collect_submodules('websockets')
crypto = pycryptodome_module()
for sm in set(collect_used_submodules('Cryptodome', 2)):
yield f'{crypto}.{sm}'
# These are auto-detected, but explicitly add them just in case # These are auto-detected, but explicitly add them just in case
yield from ('mutagen', 'brotli', 'certifi') yield from ('mutagen', 'brotli', 'certifi')

View file

@ -5,14 +5,14 @@
from .dependencies import Cryptodome from .dependencies import Cryptodome
from .utils import bytes_to_intlist, intlist_to_bytes from .utils import bytes_to_intlist, intlist_to_bytes
if Cryptodome: if Cryptodome.AES:
def aes_cbc_decrypt_bytes(data, key, iv): def aes_cbc_decrypt_bytes(data, key, iv):
""" Decrypt bytes with AES-CBC using pycryptodome """ """ Decrypt bytes with AES-CBC using pycryptodome """
return Cryptodome.Cipher.AES.new(key, Cryptodome.Cipher.AES.MODE_CBC, iv).decrypt(data) return Cryptodome.AES.new(key, Cryptodome.AES.MODE_CBC, iv).decrypt(data)
def aes_gcm_decrypt_and_verify_bytes(data, key, tag, nonce): def aes_gcm_decrypt_and_verify_bytes(data, key, tag, nonce):
""" Decrypt bytes with AES-GCM using pycryptodome """ """ Decrypt bytes with AES-GCM using pycryptodome """
return Cryptodome.Cipher.AES.new(key, Cryptodome.Cipher.AES.MODE_GCM, nonce).decrypt_and_verify(data, tag) return Cryptodome.AES.new(key, Cryptodome.AES.MODE_GCM, nonce).decrypt_and_verify(data, tag)
else: else:
def aes_cbc_decrypt_bytes(data, key, iv): def aes_cbc_decrypt_bytes(data, key, iv):

View file

@ -32,9 +32,9 @@
from . import compat_expanduser, compat_HTMLParseError, compat_realpath from . import compat_expanduser, compat_HTMLParseError, compat_realpath
from .compat_utils import passthrough_module from .compat_utils import passthrough_module
from ..dependencies import Cryptodome_AES as compat_pycrypto_AES # noqa: F401
from ..dependencies import brotli as compat_brotli # noqa: F401 from ..dependencies import brotli as compat_brotli # noqa: F401
from ..dependencies import websockets as compat_websockets # noqa: F401 from ..dependencies import websockets as compat_websockets # noqa: F401
from ..dependencies.Cryptodome import AES as compat_pycrypto_AES # noqa: F401
passthrough_module(__name__, '...utils', ('WINDOWS_VT_MODE', 'windows_enable_vt_mode')) passthrough_module(__name__, '...utils', ('WINDOWS_VT_MODE', 'windows_enable_vt_mode'))

View file

@ -48,7 +48,7 @@ def passthrough_module(parent, child, allowed_attributes=(..., ), *, callback=la
"""Passthrough parent module into a child module, creating the parent if necessary""" """Passthrough parent module into a child module, creating the parent if necessary"""
def __getattr__(attr): def __getattr__(attr):
if _is_package(parent): if _is_package(parent):
with contextlib.suppress(ImportError): with contextlib.suppress(ModuleNotFoundError):
return importlib.import_module(f'.{attr}', parent.__name__) return importlib.import_module(f'.{attr}', parent.__name__)
ret = from_child(attr) ret = from_child(attr)

View file

@ -1,8 +1,5 @@
import types import types
from ..compat import functools
from ..compat.compat_utils import passthrough_module
try: try:
import Cryptodome as _parent import Cryptodome as _parent
except ImportError: except ImportError:
@ -12,19 +9,36 @@
_parent = types.ModuleType('no_Cryptodome') _parent = types.ModuleType('no_Cryptodome')
__bool__ = lambda: False __bool__ = lambda: False
passthrough_module(__name__, _parent, (..., '__version__')) __version__ = ''
del passthrough_module AES = PKCS1_v1_5 = Blowfish = PKCS1_OAEP = SHA1 = CMAC = RSA = None
try:
if _parent.__name__ == 'Cryptodome':
@property from Cryptodome import __version__
@functools.cache from Cryptodome.Cipher import AES
def _yt_dlp__identifier(): from Cryptodome.Cipher import PKCS1_v1_5
if _parent.__name__ == 'Crypto': from Cryptodome.Cipher import Blowfish
from Cryptodome.Cipher import PKCS1_OAEP
from Cryptodome.Hash import SHA1
from Cryptodome.Hash import CMAC
from Cryptodome.PublicKey import RSA
elif _parent.__name__ == 'Crypto':
from Crypto import __version__
from Crypto.Cipher import AES from Crypto.Cipher import AES
try: from Crypto.Cipher import PKCS1_v1_5
# In pycrypto, mode defaults to ECB. See: from Crypto.Cipher import Blowfish
# https://www.pycryptodome.org/en/latest/src/vs_pycrypto.html#:~:text=not%20have%20ECB%20as%20default%20mode from Crypto.Cipher import PKCS1_OAEP
AES.new(b'abcdefghijklmnop') from Crypto.Hash import SHA1
except TypeError: from Crypto.Hash import CMAC
return 'pycrypto' from Crypto.PublicKey import RSA
return _parent.__name__ except ImportError:
__version__ = f'broken {__version__}'.strip()
_yt_dlp__identifier = _parent.__name__
if AES and _yt_dlp__identifier == 'Crypto':
try:
# In pycrypto, mode defaults to ECB. See:
# https://www.pycryptodome.org/en/latest/src/vs_pycrypto.html#:~:text=not%20have%20ECB%20as%20default%20mode
AES.new(b'abcdefghijklmnop')
except TypeError:
_yt_dlp__identifier = 'pycrypto'

View file

@ -73,7 +73,7 @@
# Deprecated # Deprecated
Cryptodome_AES = Cryptodome.Cipher.AES if Cryptodome else None Cryptodome_AES = Cryptodome.AES
__all__ = [ __all__ = [

View file

@ -70,7 +70,7 @@ def real_download(self, filename, info_dict):
can_download, message = self.can_download(s, info_dict, self.params.get('allow_unplayable_formats')), None can_download, message = self.can_download(s, info_dict, self.params.get('allow_unplayable_formats')), None
if can_download: if can_download:
has_ffmpeg = FFmpegFD.available() has_ffmpeg = FFmpegFD.available()
no_crypto = not Cryptodome and '#EXT-X-KEY:METHOD=AES-128' in s no_crypto = not Cryptodome.AES and '#EXT-X-KEY:METHOD=AES-128' in s
if no_crypto and has_ffmpeg: if no_crypto and has_ffmpeg:
can_download, message = False, 'The stream has AES-128 encryption and pycryptodomex is not available' can_download, message = False, 'The stream has AES-128 encryption and pycryptodomex is not available'
elif no_crypto: elif no_crypto:

View file

@ -894,15 +894,15 @@ def _parse_video_metadata(self, video_data):
} }
def _perform_login(self, username, password): def _perform_login(self, username, password):
if not Cryptodome: if not Cryptodome.RSA:
raise ExtractorError('pycryptodomex not found. Please install', expected=True) raise ExtractorError('pycryptodomex not found. Please install', expected=True)
key_data = self._download_json( key_data = self._download_json(
'https://passport.bilibili.tv/x/intl/passport-login/web/key?lang=en-US', None, 'https://passport.bilibili.tv/x/intl/passport-login/web/key?lang=en-US', None,
note='Downloading login key', errnote='Unable to download login key')['data'] note='Downloading login key', errnote='Unable to download login key')['data']
public_key = Cryptodome.PublicKey.RSA.importKey(key_data['key']) public_key = Cryptodome.RSA.importKey(key_data['key'])
password_hash = Cryptodome.Cipher.PKCS1_v1_5.new(public_key).encrypt((key_data['hash'] + password).encode('utf-8')) password_hash = Cryptodome.PKCS1_v1_5.new(public_key).encrypt((key_data['hash'] + password).encode('utf-8'))
login_post = self._download_json( login_post = self._download_json(
'https://passport.bilibili.tv/x/intl/passport-login/web/login/password?lang=en-US', None, data=urlencode_postdata({ 'https://passport.bilibili.tv/x/intl/passport-login/web/login/password?lang=en-US', None, data=urlencode_postdata({
'username': username, 'username': username,

View file

@ -91,7 +91,7 @@ def _real_extract(self, url):
for site in (353, 183): for site in (353, 183):
content_data = (data % site).encode() content_data = (data % site).encode()
if site == 353: if site == 353:
if not Cryptodome: if not Cryptodome.CMAC:
continue continue
timestamp = (self._download_json( timestamp = (self._download_json(
@ -105,8 +105,8 @@ def _real_extract(self, url):
query = { query = {
'ts': timestamp, 'ts': timestamp,
'sign': Cryptodome.Hash.CMAC.new(self._LIGHT_KEY, timestamp.encode() + content_data, 'sign': Cryptodome.CMAC.new(self._LIGHT_KEY, timestamp.encode() + content_data,
Cryptodome.Cipher.Blowfish).hexdigest(), Cryptodome.Blowfish).hexdigest(),
} }
else: else:
query = {} query = {}
@ -126,7 +126,7 @@ def _real_extract(self, url):
extractor_msg = 'Video %s does not exist' extractor_msg = 'Video %s does not exist'
elif site == 353: elif site == 353:
continue continue
elif not Cryptodome: elif not Cryptodome.CMAC:
raise ExtractorError('pycryptodomex not found. Please install', expected=True) raise ExtractorError('pycryptodomex not found. Please install', expected=True)
elif message: elif message:
extractor_msg += ': ' + message extractor_msg += ': ' + message

View file

@ -50,10 +50,10 @@ def _call_api(self, video_id, param='', msg='API', auth=True, data=None, query={
data=data, headers=headers, query=query, fatal=fatal) data=data, headers=headers, query=query, fatal=fatal)
def _call_encrypted_api(self, video_id, param='', msg='API', data={}, query={}, fatal=True): def _call_encrypted_api(self, video_id, param='', msg='API', data={}, query={}, fatal=True):
if not Cryptodome: if not Cryptodome.RSA:
raise ExtractorError('pycryptodomex not found. Please install', expected=True) raise ExtractorError('pycryptodomex not found. Please install', expected=True)
private_key = Cryptodome.PublicKey.RSA.generate(2048) private_key = Cryptodome.RSA.generate(2048)
cipher = Cryptodome.Cipher.PKCS1_OAEP.new(private_key, hashAlgo=Cryptodome.Hash.SHA1) cipher = Cryptodome.PKCS1_OAEP.new(private_key, hashAlgo=Cryptodome.SHA1)
def decrypt(data): def decrypt(data):
if not data: if not data: