This commit is contained in:
Simon Sawicki 2024-04-28 16:15:31 +05:30 committed by GitHub
commit b68425c5f9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 187 additions and 0 deletions

View File

@ -2500,3 +2500,4 @@ from .zingmp3 import (
)
from .zoom import ZoomIE
from .zype import ZypeIE
from .lazy import LazyExtractorIE

33
yt_dlp/extractor/lazy.py Normal file
View File

@ -0,0 +1,33 @@
from .common import InfoExtractor
from ..utils.lazy import lazy_ie, lazy_fields
@lazy_ie
class LazyExtractorIE(InfoExtractor):
IE_NAME = 'lazy'
_VALID_URL = r"lazy://(?P<id>.*)"
def _lazy_webpage(self, storage):
return self._download_webpage(storage.url, storage.id)
@lazy_fields("creator")
def _extract_other(self, storage):
self.to_screen("Extracting something else from webpage")
return {
"creator": storage.webpage.partition(" - ")[0],
}
@lazy_fields("title", "description")
def _extract_website(self, storage):
self.to_screen("Extracting title and description from webpage")
title, _, description = storage.webpage.partition("\n")
return {
"title": title,
"description": description,
}
# Fake downloading the webpage for testing purposes
def _download_webpage(self, url_or_request, video_id, *args, **kwargs):
self.to_screen(f"[{video_id}] Downloaded webpage ({url_or_request})")
return "<creator> - Fake Webpage title\nThis is the description.\n..."

153
yt_dlp/utils/lazy.py Normal file
View File

@ -0,0 +1,153 @@
from __future__ import annotations
import functools
from collections.abc import MutableMapping
from ..utils import try_call
from ..extractor.common import InfoExtractor
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import Callable, Any
class _LazyStorage:
def __init__(self, ie, **kwargs):
self._ie = ie
self._cache = kwargs
def __setattr__(self, name, value, /) -> None:
if name.startswith("_"):
super().__setattr__(name, value)
else:
self._cache[name] = value
def __getattr__(self, name: str):
if name in self._cache:
return self._cache[name]
resolver = getattr(self._ie, f"_lazy_{name}")
result = try_call(resolver, args=(self,))
self._cache[name] = result
return result
def __delattr__(self, name: str) -> None:
if name.startswith("_"):
super().__delattr__(name)
elif name in self._cache:
del self._cache[name]
class _LazyInfoDict(MutableMapping):
def __init__(self, data: dict, lazy: dict, ie: InfoExtractor, **kwargs):
self._data = data
self._lazy = lazy
self._ie = ie
self._storage = _LazyStorage(self._ie, **kwargs)
for key in self._data.keys() & self._lazy.keys():
del self._lazy[key]
self._data.update(dict.fromkeys(self._lazy.keys()))
def __contains__(self, key):
return key in self._data
def __getitem__(self, key):
if key in self._lazy:
compute_func = self._lazy[key]
# updates = try_call(compute_func, args=(self._storage,), expected_type=dict) or {}
updates = compute_func(self._ie, self._storage)
self._data.update(updates)
for field in updates:
self._lazy.pop(field, None)
fields = getattr(compute_func, lazy_fields._field_name, None) or ()
for field in fields:
self._lazy.pop(field, None)
return self._data[key]
def __setitem__(self, key, value):
if key in self._lazy:
del self._lazy[key]
self._data[key] = value
def __delitem__(self, key):
if key in self._lazy:
del self._lazy[key]
del self._data[key]
def __iter__(self):
return iter(self._data)
def __len__(self):
return len(self._data)
def __repr__(self):
if self._lazy:
lazy = ", ".join(f"{key!r}: ..." for key in self._lazy.keys())
data = ", ".join(f"{key!r}: {value!r}" for key, value in self._data.items() if key not in self._lazy)
data = f"{{{data}}}, lazy={{{lazy}}}"
else:
data = f"{self._data!r}"
return f"{type(self).__name__}({data})"
def _default_lazy_extract(self, url):
return dict(id=self._match_id(url))
def lazy_ie(klass: type[InfoExtractor] | None = None, /):
if not klass:
return lazy_ie
_old_extract = klass._real_extract
if _old_extract is InfoExtractor._real_extract:
_old_extract = _default_lazy_extract
lazy_members = {}
for name in dir(klass):
if not name.startswith("_"):
continue
func = getattr(klass, name)
fields = getattr(func, lazy_fields._field_name, None)
if not isinstance(fields, tuple):
continue
for field in fields:
lazy_members[field] = func
@functools.wraps(klass._real_extract)
def _real_extract(self, url):
result = _old_extract(self, url)
assert isinstance(result, dict), 'Lazy extractors need to return a dict'
return _LazyInfoDict(result, lazy_members, self, url=url, **result)
klass._real_extract = _real_extract
return klass
def lazy_fields(*fields: str) -> Callable[[Callable[[Any, _LazyStorage], dict[str, Any]]], Callable[[Any, _LazyStorage], dict[str, Any]]]:
def _lazy_fields(func):
setattr(func, lazy_fields._field_name, fields)
return func
return _lazy_fields
lazy_fields._field_name = "_lazy_fields"
if __name__ == '__main__':
from yt_dlp import YoutubeDL
with YoutubeDL() as ydl:
result = ydl.extract_info("lazy://<URL>", process=False)
assert result
for name in "id", "title", "creator", "description":
print(f"{name:<10} = {result[name]!r}")