Updated yt_dlp version; added extremly basic dumb cache setup in thumbnailer; moved build and script as well as deb folder to build

This commit is contained in:
2026-01-07 17:34:32 -06:00
parent f58bc53c24
commit 5c808c579a
243 changed files with 6397 additions and 5957 deletions

View File

@@ -104,6 +104,7 @@ INNERTUBE_CLIENTS = {
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 1,
'SUPPORTS_COOKIES': True,
'SUPPORTS_AD_PLAYBACK_CONTEXT': True,
**WEB_PO_TOKEN_POLICIES,
},
# Safari UA returns pre-merged video+audio 144p/240p/360p/720p/1080p HLS formats
@@ -117,6 +118,7 @@ INNERTUBE_CLIENTS = {
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 1,
'SUPPORTS_COOKIES': True,
'SUPPORTS_AD_PLAYBACK_CONTEXT': True,
**WEB_PO_TOKEN_POLICIES,
},
'web_embedded': {
@@ -157,6 +159,7 @@ INNERTUBE_CLIENTS = {
),
},
'SUPPORTS_COOKIES': True,
'SUPPORTS_AD_PLAYBACK_CONTEXT': True,
},
# This client now requires sign-in for every video
'web_creator': {
@@ -313,6 +316,7 @@ INNERTUBE_CLIENTS = {
),
},
'SUPPORTS_COOKIES': True,
'SUPPORTS_AD_PLAYBACK_CONTEXT': True,
},
'tv': {
'INNERTUBE_CONTEXT': {
@@ -327,6 +331,17 @@ INNERTUBE_CLIENTS = {
# See: https://github.com/youtube/cobalt/blob/main/cobalt/browser/user_agent/user_agent_platform_info.cc#L506
'AUTHENTICATED_USER_AGENT': 'Mozilla/5.0 (ChromiumStylePlatform) Cobalt/25.lts.30.1034943-gold (unlike Gecko), Unknown_TV_Unknown_0/Unknown (Unknown, Unknown)',
},
'tv_downgraded': {
'INNERTUBE_CONTEXT': {
'client': {
'clientName': 'TVHTML5',
'clientVersion': '5.20251105',
'userAgent': 'Mozilla/5.0 (ChromiumStylePlatform) Cobalt/Version',
},
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 7,
'SUPPORTS_COOKIES': True,
},
'tv_simply': {
'INNERTUBE_CONTEXT': {
'client': {
@@ -380,11 +395,15 @@ def short_client_name(client_name):
return join_nonempty(main[:4], ''.join(x[0] for x in parts)).upper()
def build_innertube_clients():
THIRD_PARTY = {
def _fix_embedded_ytcfg(ytcfg):
ytcfg['INNERTUBE_CONTEXT'].setdefault('thirdParty', {}).update({
'embedUrl': 'https://www.youtube.com/', # Can be any valid URL
}
BASE_CLIENTS = ('ios', 'web', 'tv', 'mweb', 'android')
})
def build_innertube_clients():
# From highest to lowest priority
BASE_CLIENTS = ('tv', 'web', 'mweb', 'android', 'ios')
priority = qualities(BASE_CLIENTS[::-1])
for client, ytcfg in tuple(INNERTUBE_CLIENTS.items()):
@@ -397,6 +416,7 @@ def build_innertube_clients():
ytcfg.setdefault('SUBS_PO_TOKEN_POLICY', SubsPoTokenPolicy())
ytcfg.setdefault('REQUIRE_AUTH', False)
ytcfg.setdefault('SUPPORTS_COOKIES', False)
ytcfg.setdefault('SUPPORTS_AD_PLAYBACK_CONTEXT', False)
ytcfg.setdefault('PLAYER_PARAMS', None)
ytcfg.setdefault('AUTHENTICATED_USER_AGENT', None)
ytcfg['INNERTUBE_CONTEXT']['client'].setdefault('hl', 'en')
@@ -405,10 +425,7 @@ def build_innertube_clients():
ytcfg['priority'] = 10 * priority(base_client)
if variant == 'embedded':
ytcfg['INNERTUBE_CONTEXT']['thirdParty'] = THIRD_PARTY
ytcfg['priority'] -= 2
elif variant:
ytcfg['priority'] -= 3
_fix_embedded_ytcfg(ytcfg)
build_innertube_clients()
@@ -991,6 +1008,10 @@ class YoutubeBaseInfoExtractor(InfoExtractor):
ytcfg = self.extract_ytcfg(video_id, webpage) or {}
# See https://github.com/yt-dlp/yt-dlp/issues/14826
if _split_innertube_client(client)[2] == 'embedded':
_fix_embedded_ytcfg(ytcfg)
# Workaround for https://github.com/yt-dlp/yt-dlp/issues/12563
# But it's not effective when logged-in
if client == 'tv' and not self.is_authenticated:
@@ -1044,7 +1065,7 @@ class YoutubeBaseInfoExtractor(InfoExtractor):
return next_continuation
return traverse_obj(renderer, (
('contents', 'items', 'rows'), ..., 'continuationItemRenderer',
('contents', 'items', 'rows', 'subThreads'), ..., 'continuationItemRenderer',
('continuationEndpoint', ('button', 'buttonRenderer', 'command')),
), get_all=False, expected_type=cls._extract_continuation_ep_data)

View File

@@ -340,8 +340,9 @@ class YoutubeTabBaseInfoExtractor(YoutubeBaseInfoExtractor):
thumbnails=self._extract_thumbnails(view_model, (
'contentImage', *thumb_keys, 'thumbnailViewModel', 'image'), final_key='sources'),
duration=traverse_obj(view_model, (
'contentImage', 'thumbnailViewModel', 'overlays', ..., 'thumbnailOverlayBadgeViewModel',
'thumbnailBadges', ..., 'thumbnailBadgeViewModel', 'text', {parse_duration}, any)),
'contentImage', 'thumbnailViewModel', 'overlays', ...,
(('thumbnailBottomOverlayViewModel', 'badges'), ('thumbnailOverlayBadgeViewModel', 'thumbnailBadges')),
..., 'thumbnailBadgeViewModel', 'text', {parse_duration}, any)),
timestamp=(traverse_obj(view_model, (
'metadata', 'lockupMetadataViewModel', 'metadata', 'contentMetadataViewModel', 'metadataRows',
..., 'metadataParts', ..., 'text', 'content', {lambda t: self._parse_time_text(t, report_failure=False)}, any))
@@ -381,7 +382,8 @@ class YoutubeTabBaseInfoExtractor(YoutubeBaseInfoExtractor):
('accessibilityText', {lambda x: re.fullmatch(r'(.+), (?:[\d,.]+(?:[KM]| million)?|No) views? - play Short', x)}, 1)), any),
'view_count': ('overlayMetadata', 'secondaryText', 'content', {parse_count}),
}),
thumbnails=self._extract_thumbnails(renderer, 'thumbnail', final_key='sources'))
thumbnails=self._extract_thumbnails(
renderer, ('thumbnailViewModel', 'thumbnailViewModel', 'image'), final_key='sources'))
return
def _video_entry(self, video_renderer):
@@ -1584,7 +1586,6 @@ class YoutubeTabIE(YoutubeTabBaseInfoExtractor):
'playlist_count': 50,
'expected_warnings': ['YouTube Music is not directly supported'],
}, {
# TODO: fix test suite, 208163447408c78673b08c172beafe5c310fb167 broke this test
'note': 'unlisted single video playlist',
'url': 'https://www.youtube.com/playlist?list=PLt5yu3-wZAlQLfIN0MMgp0wVV6MP3bM4_',
'info_dict': {
@@ -1884,8 +1885,6 @@ class YoutubeTabIE(YoutubeTabBaseInfoExtractor):
'playlist_mincount': 30,
}, {
# Shorts url result in shorts tab
# TODO: Fix channel id extraction
# TODO: fix test suite, 208163447408c78673b08c172beafe5c310fb167 broke this test
'url': 'https://www.youtube.com/channel/UCiu-3thuViMebBjw_5nWYrA/shorts',
'info_dict': {
'id': 'UCiu-3thuViMebBjw_5nWYrA',
@@ -1914,7 +1913,6 @@ class YoutubeTabIE(YoutubeTabBaseInfoExtractor):
'params': {'extract_flat': True},
}, {
# Live video status should be extracted
# TODO: fix test suite, 208163447408c78673b08c172beafe5c310fb167 broke this test
'url': 'https://www.youtube.com/channel/UCQvWX73GQygcwXOTSf_VDVg/live',
'info_dict': {
'id': 'UCQvWX73GQygcwXOTSf_VDVg',

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,132 @@
# YoutubeIE JS Challenge Provider Framework
As part of the YouTube extractor, we have a framework for solving n/sig JS Challenges programmatically. This can be used by plugins.
> [!TIP]
> If publishing a JS Challenge Provider plugin to GitHub, add the [yt-dlp-jsc-provider](https://github.com/topics/yt-dlp-jsc-provider) topic to your repository to help users find it.
## Public APIs
- `yt_dlp.extractor.youtube.jsc.provider`
Everything else is **internal-only** and no guarantees are made about the API stability.
> [!WARNING]
> We will try our best to maintain stability with the public APIs.
> However, due to the nature of extractors and YouTube, we may need to remove or change APIs in the future.
> If you are using these APIs outside yt-dlp plugins, please account for this by importing them safely.
## JS Challenge Provider
`yt_dlp.extractor.youtube.jsc.provider`
```python
from yt_dlp.extractor.youtube.jsc.provider import (
register_provider,
register_preference,
JsChallengeProvider,
JsChallengeRequest,
JsChallengeResponse,
JsChallengeProviderError,
JsChallengeProviderRejectedRequest,
JsChallengeType,
JsChallengeProviderResponse,
NChallengeOutput,
)
from yt_dlp.utils import traverse_obj, Popen
import json
import subprocess
import typing
@register_provider
class MyJsChallengeProviderJCP(JsChallengeProvider): # Provider class name must end with "JCP"
PROVIDER_VERSION = '0.2.1'
# Define a unique display name for the provider
PROVIDER_NAME = 'my-provider'
BUG_REPORT_LOCATION = 'https://issues.example.com/report'
# Set supported challenge types.
# If None, the provider will handle all types.
_SUPPORTED_TYPES = [JsChallengeType.N]
def is_available(self) -> bool:
"""
Check if the provider is available (e.g. all required dependencies are available)
This is used to determine if the provider should be used and to provide debug information.
IMPORTANT: This method SHOULD NOT make any network requests or perform any expensive operations.
Since this is called multiple times, we recommend caching the result.
"""
return True
def close(self):
# Optional close hook, called when YoutubeDL is closed.
pass
def _real_bulk_solve(self, requests: list[JsChallengeRequest]) -> typing.Generator[JsChallengeProviderResponse, None, None]:
# If you need to do additional validation on the requests.
# Raise yt_dlp.extractor.youtube.jsc.provider.JsChallengeProviderRejectedRequest if the request is not supported.
if len("something") > 255:
raise JsChallengeProviderRejectedRequest('Challenges longer than 255 are not supported', expected=True)
# Settings are pulled from extractor args passed to yt-dlp with the key `youtubejsc-<PROVIDER_KEY>`.
# For this example, the extractor arg would be:
# `--extractor-args "youtubejsc-myjschallengeprovider:bin_path=/path/to/bin"`
bin_path = self._configuration_arg(
'bin_path', default=['/path/to/bin'])[0]
# See below for logging guidelines
self.logger.trace(f'Using bin path: {bin_path}')
for request in requests:
# You can use the _get_player method to get the player JS code if needed.
# This shares the same caching as the YouTube extractor, so it will not make unnecessary requests.
player_js = self._get_player(request.video_id, request.input.player_url)
cmd = f'{bin_path} {request.input.challenges} {player_js}'
self.logger.info(f'Executing command: {cmd}')
stdout, _, ret = Popen.run(cmd, text=True, shell=True, stdout=subprocess.PIPE)
if ret != 0:
# If there is an error, raise JsChallengeProviderError.
# The request will be sent to the next provider if there is one.
# You can specify whether it is expected or not. If it is unexpected,
# the log will include a link to the bug report location (BUG_REPORT_LOCATION).
# raise JsChallengeProviderError(f'Command returned error code {ret}', expected=False)
# You can also only fail this specific request by returning a JsChallengeProviderResponse with the error.
# This will allow other requests to be processed by this provider.
yield JsChallengeProviderResponse(
request=request,
error=JsChallengeProviderError(f'Command returned error code {ret}', expected=False)
)
yield JsChallengeProviderResponse(
request=request,
response=JsChallengeResponse(
type=JsChallengeType.N,
output=NChallengeOutput(results=traverse_obj(json.loads(stdout))),
))
# If there are multiple JS Challenge Providers that can handle the same JsChallengeRequest(s),
# you can define a preference function to increase/decrease the priority of providers.
@register_preference(MyJsChallengeProviderJCP)
def my_provider_preference(provider: JsChallengeProvider, requests: list[JsChallengeRequest]) -> int:
return 50
```
## Logging Guidelines
- Use the `self.logger` object to log messages.
- When making HTTP requests or any other time-expensive operation, use `self.logger.info` to log a message to standard non-verbose output.
- This lets users know what is happening when a time-expensive operation is taking place.
- Technical information such as a command being executed should be logged to `self.logger.debug`
- Use `self.logger.trace` for very detailed information that is only useful for debugging to avoid cluttering the debug log.
## Debugging
- Use `-v --extractor-args "youtube:jsc_trace=true"` to enable JS Challenge debug output.

View File

@@ -0,0 +1,5 @@
# Trigger import of built-in providers
from ._builtin.bun import BunJCP as _BunJCP # noqa: F401
from ._builtin.deno import DenoJCP as _DenoJCP # noqa: F401
from ._builtin.node import NodeJCP as _NodeJCP # noqa: F401
from ._builtin.quickjs import QuickJSJCP as _QuickJSJCP # noqa: F401

View File

@@ -0,0 +1,150 @@
from __future__ import annotations
import os
import re
import shlex
import subprocess
import urllib.parse
from yt_dlp.extractor.youtube.jsc._builtin.ejs import (
_EJS_WIKI_URL,
EJSBaseJCP,
Script,
ScriptSource,
ScriptType,
ScriptVariant,
)
from yt_dlp.extractor.youtube.jsc._builtin.vendor import load_script
from yt_dlp.extractor.youtube.jsc.provider import (
JsChallengeProvider,
JsChallengeProviderError,
JsChallengeRequest,
register_preference,
register_provider,
)
from yt_dlp.extractor.youtube.pot._provider import BuiltinIEContentProvider
from yt_dlp.extractor.youtube.pot.provider import provider_bug_report_message
from yt_dlp.utils import Popen
from yt_dlp.utils.networking import HTTPHeaderDict, clean_proxies
# KNOWN ISSUES:
# - If node_modules is present and includes a requested lib, the version we request is ignored
# and whatever installed in node_modules is used.
# - No way to ignore existing node_modules, lock files, etc.
# - No sandboxing options available
# - Cannot detect if npm packages are cached without potentially downloading them.
# `--no-install` appears to disable the cache.
# - npm auto-install may fail with an integrity error when using HTTP proxies
# - npm auto-install HTTP proxy support may be limited on older Bun versions
# - Cannot disable the transpiler / specify lang for stdin
@register_provider
class BunJCP(EJSBaseJCP, BuiltinIEContentProvider):
PROVIDER_NAME = 'bun'
JS_RUNTIME_NAME = 'bun'
BUN_NPM_LIB_FILENAME = 'yt.solver.bun.lib.js'
SUPPORTED_PROXY_SCHEMES = ['http', 'https']
def _iter_script_sources(self):
yield from super()._iter_script_sources()
yield ScriptSource.BUILTIN, self._bun_npm_source
def _bun_npm_source(self, script_type: ScriptType, /):
if script_type != ScriptType.LIB:
return None
if 'ejs:npm' not in self.ie.get_param('remote_components', []):
return self._skip_component('ejs:npm')
# Check to see if the environment proxies are compatible with Bun npm source
if unsupported_scheme := self._check_env_proxies(self._get_env_options()):
self.logger.warning(
f'Bun NPM package downloads only support HTTP/HTTPS proxies; skipping remote NPM package downloads. '
f'Provide another distribution of the challenge solver script or use '
f'another JS runtime that supports "{unsupported_scheme}" proxies. '
f'For more information and alternatives, refer to {_EJS_WIKI_URL}')
return None
# Bun-specific lib scripts that uses Bun autoimport
# https://bun.com/docs/runtime/autoimport
error_hook = lambda e: self.logger.warning(
f'Failed to read bun challenge solver lib script: {e}{provider_bug_report_message(self)}')
code = load_script(
self.BUN_NPM_LIB_FILENAME, error_hook=error_hook)
if code:
return Script(script_type, ScriptVariant.BUN_NPM, ScriptSource.BUILTIN, self._SCRIPT_VERSION, code)
return None
def _check_env_proxies(self, env):
# check that the schemes of both HTTP_PROXY and HTTPS_PROXY are supported
for key in ('HTTP_PROXY', 'HTTPS_PROXY'):
proxy = env.get(key)
if not proxy:
continue
scheme = urllib.parse.urlparse(proxy).scheme.lower()
if scheme not in self.SUPPORTED_PROXY_SCHEMES:
return scheme
return None
def _get_env_options(self) -> dict[str, str]:
options = os.environ.copy() # pass through existing bun env vars
request_proxies = self.ie._downloader.proxies.copy()
clean_proxies(request_proxies, HTTPHeaderDict())
# Apply 'all' proxy first, then allow per-scheme overrides
if request_proxies.get('all') is not None:
options['HTTP_PROXY'] = options['HTTPS_PROXY'] = request_proxies['all']
for key, env in (('http', 'HTTP_PROXY'), ('https', 'HTTPS_PROXY')):
val = request_proxies.get(key)
if val is not None:
options[env] = val
if self.ie.get_param('nocheckcertificate'):
options['NODE_TLS_REJECT_UNAUTHORIZED'] = '0'
# Disable Bun transpiler cache
options['BUN_RUNTIME_TRANSPILER_CACHE_PATH'] = '0'
# Prevent segfault: <https://github.com/oven-sh/bun/issues/22901>
options.pop('JSC_useJIT', None)
if self.ejs_setting('jitless', ['false']) != ['false']:
options['BUN_JSC_useJIT'] = '0'
return options
def _run_js_runtime(self, stdin: str, /) -> str:
# https://bun.com/docs/cli/run
options = ['--no-addons', '--prefer-offline']
if self._lib_script.variant == ScriptVariant.BUN_NPM:
# Enable auto-install even if node_modules is present
options.append('--install=fallback')
else:
options.append('--no-install')
cmd = [self.runtime_info.path, '--bun', 'run', *options, '-']
self.logger.debug(f'Running bun: {shlex.join(cmd)}')
with Popen(
cmd,
text=True,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=self._get_env_options(),
) as proc:
stdout, stderr = proc.communicate_or_kill(stdin)
stderr = self._clean_stderr(stderr)
if proc.returncode or stderr:
msg = f'Error running bun process (returncode: {proc.returncode})'
if stderr:
msg = f'{msg}: {stderr.strip()}'
raise JsChallengeProviderError(msg)
return stdout
def _clean_stderr(self, stderr):
return '\n'.join(
line for line in stderr.splitlines()
if not re.match(r'^Bun v\d+\.\d+\.\d+ \([\w\s]+\)$', line))
@register_preference(BunJCP)
def preference(provider: JsChallengeProvider, requests: list[JsChallengeRequest]) -> int:
return 800

View File

@@ -0,0 +1,131 @@
from __future__ import annotations
import os
import re
import shlex
import subprocess
from yt_dlp.extractor.youtube.jsc._builtin.ejs import (
EJSBaseJCP,
Script,
ScriptSource,
ScriptType,
ScriptVariant,
)
from yt_dlp.extractor.youtube.jsc._builtin.vendor import load_script
from yt_dlp.extractor.youtube.jsc.provider import (
JsChallengeProvider,
JsChallengeProviderError,
JsChallengeRequest,
register_preference,
register_provider,
)
from yt_dlp.extractor.youtube.pot._provider import BuiltinIEContentProvider
from yt_dlp.extractor.youtube.pot.provider import provider_bug_report_message
from yt_dlp.utils import Popen, remove_terminal_sequences
from yt_dlp.utils.networking import HTTPHeaderDict, clean_proxies
# KNOWN ISSUES:
# - Can't avoid analysis cache: https://github.com/yt-dlp/yt-dlp/pull/14849#issuecomment-3475840821
@register_provider
class DenoJCP(EJSBaseJCP, BuiltinIEContentProvider):
PROVIDER_NAME = 'deno'
JS_RUNTIME_NAME = 'deno'
_DENO_BASE_OPTIONS = [
'--ext=js', '--no-code-cache', '--no-prompt', '--no-remote',
'--no-lock', '--node-modules-dir=none', '--no-config',
]
DENO_NPM_LIB_FILENAME = 'yt.solver.deno.lib.js'
_NPM_PACKAGES_CACHED = False
def _iter_script_sources(self):
yield from super()._iter_script_sources()
yield ScriptSource.BUILTIN, self._deno_npm_source
def _deno_npm_source(self, script_type: ScriptType, /):
if script_type != ScriptType.LIB:
return None
# Deno-specific lib scripts that use Deno NPM imports
error_hook = lambda e: self.logger.warning(
f'Failed to read deno challenge solver lib script: {e}{provider_bug_report_message(self)}')
code = load_script(
self.DENO_NPM_LIB_FILENAME, error_hook=error_hook)
if not code:
return None
if 'ejs:npm' not in self.ie.get_param('remote_components', []):
# We may still be able to continue if the npm packages are available/cached
self._NPM_PACKAGES_CACHED = self._npm_packages_cached(code)
if not self._NPM_PACKAGES_CACHED:
return self._skip_component('ejs:npm')
return Script(script_type, ScriptVariant.DENO_NPM, ScriptSource.BUILTIN, self._SCRIPT_VERSION, code)
def _npm_packages_cached(self, stdin: str) -> bool:
# Check if npm packages are cached, so we can run without --remote-components ejs:npm
self.logger.debug('Checking if npm packages are cached')
try:
self._run_deno(stdin, [*self._DENO_BASE_OPTIONS, '--cached-only'])
except JsChallengeProviderError as e:
self.logger.trace(f'Deno npm packages not cached: {e}')
return False
return True
def _run_js_runtime(self, stdin: str, /) -> str:
options = [*self._DENO_BASE_OPTIONS]
if self._lib_script.variant == ScriptVariant.DENO_NPM and self._NPM_PACKAGES_CACHED:
options.append('--cached-only')
elif self._lib_script.variant != ScriptVariant.DENO_NPM:
options.append('--no-npm')
options.append('--cached-only')
if self.ie.get_param('nocheckcertificate'):
options.append('--unsafely-ignore-certificate-errors')
# XXX: Convert this extractor-arg into a general option if/when a JSI framework is implemented
if self.ejs_setting('jitless', ['false']) != ['false']:
options.append('--v8-flags=--jitless')
return self._run_deno(stdin, options)
def _get_env_options(self) -> dict[str, str]:
options = os.environ.copy() # pass through existing deno env vars
request_proxies = self.ie._downloader.proxies.copy()
clean_proxies(request_proxies, HTTPHeaderDict())
# Apply 'all' proxy first, then allow per-scheme overrides
if 'all' in request_proxies and request_proxies['all'] is not None:
options['HTTP_PROXY'] = options['HTTPS_PROXY'] = request_proxies['all']
for key, env in (('http', 'HTTP_PROXY'), ('https', 'HTTPS_PROXY'), ('no', 'NO_PROXY')):
if key in request_proxies and request_proxies[key] is not None:
options[env] = request_proxies[key]
return options
def _run_deno(self, stdin, options) -> str:
cmd = [self.runtime_info.path, 'run', *options, '-']
self.logger.debug(f'Running deno: {shlex.join(cmd)}')
with Popen(
cmd,
text=True,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=self._get_env_options(),
) as proc:
stdout, stderr = proc.communicate_or_kill(stdin)
stderr = self._clean_stderr(stderr)
if proc.returncode or stderr:
msg = f'Error running deno process (returncode: {proc.returncode})'
if stderr:
msg = f'{msg}: {stderr.strip()}'
raise JsChallengeProviderError(msg)
return stdout
def _clean_stderr(self, stderr):
return '\n'.join(
line for line in stderr.splitlines()
if not (
re.match(r'^Download\s+https\S+$', remove_terminal_sequences(line))
or re.match(r'DANGER: TLS certificate validation is disabled for all hostnames', remove_terminal_sequences(line))))
@register_preference(DenoJCP)
def preference(provider: JsChallengeProvider, requests: list[JsChallengeRequest]) -> int:
return 1000

View File

@@ -0,0 +1,328 @@
from __future__ import annotations
import collections
import dataclasses
import enum
import functools
import hashlib
import json
from yt_dlp.dependencies import yt_dlp_ejs as _has_ejs
from yt_dlp.extractor.youtube.jsc._builtin import vendor
from yt_dlp.extractor.youtube.jsc.provider import (
JsChallengeProvider,
JsChallengeProviderError,
JsChallengeProviderRejectedRequest,
JsChallengeProviderResponse,
JsChallengeResponse,
JsChallengeType,
NChallengeOutput,
SigChallengeOutput,
)
from yt_dlp.extractor.youtube.pot._provider import configuration_arg
from yt_dlp.extractor.youtube.pot.provider import provider_bug_report_message
from yt_dlp.utils import version_tuple
from yt_dlp.utils._jsruntime import JsRuntimeInfo
if _has_ejs:
import yt_dlp_ejs.yt.solver
TYPE_CHECKING = False
if TYPE_CHECKING:
from collections.abc import Callable, Generator
from yt_dlp.extractor.youtube.jsc.provider import JsChallengeRequest
_EJS_WIKI_URL = 'https://github.com/yt-dlp/yt-dlp/wiki/EJS'
class ScriptType(enum.Enum):
LIB = 'lib'
CORE = 'core'
class ScriptVariant(enum.Enum):
UNKNOWN = 'unknown'
MINIFIED = 'minified'
UNMINIFIED = 'unminified'
DENO_NPM = 'deno_npm'
BUN_NPM = 'bun_npm'
class ScriptSource(enum.Enum):
PYPACKAGE = 'python package' # PyPI, PyInstaller exe, zipimport binary, etc
CACHE = 'cache' # GitHub release assets (cached)
WEB = 'web' # GitHub release assets (downloaded)
BUILTIN = 'builtin' # vendored (full core script; import-only lib script + NPM cache)
@dataclasses.dataclass
class Script:
type: ScriptType
variant: ScriptVariant
source: ScriptSource
version: str
code: str
@functools.cached_property
def hash(self, /) -> str:
return hashlib.sha3_512(self.code.encode()).hexdigest()
def __str__(self, /):
return f'<Script {self.type.value!r} v{self.version} (source: {self.source.value}) variant={self.variant.value!r} size={len(self.code)} hash={self.hash[:7]}...>'
class EJSBaseJCP(JsChallengeProvider):
JS_RUNTIME_NAME: str
_CACHE_SECTION = 'challenge-solver'
_REPOSITORY = 'yt-dlp/ejs'
_SUPPORTED_TYPES = [JsChallengeType.N, JsChallengeType.SIG]
_SCRIPT_VERSION = vendor.VERSION
# TODO: Integration tests for each kind of scripts source
_ALLOWED_HASHES = {
ScriptType.LIB: {
ScriptVariant.UNMINIFIED: vendor.HASHES['yt.solver.lib.js'],
ScriptVariant.MINIFIED: vendor.HASHES['yt.solver.lib.min.js'],
ScriptVariant.DENO_NPM: vendor.HASHES['yt.solver.deno.lib.js'],
ScriptVariant.BUN_NPM: vendor.HASHES['yt.solver.bun.lib.js'],
},
ScriptType.CORE: {
ScriptVariant.MINIFIED: vendor.HASHES['yt.solver.core.min.js'],
ScriptVariant.UNMINIFIED: vendor.HASHES['yt.solver.core.js'],
},
}
_SCRIPT_FILENAMES = {
ScriptType.LIB: 'yt.solver.lib.js',
ScriptType.CORE: 'yt.solver.core.js',
}
_MIN_SCRIPT_FILENAMES = {
ScriptType.LIB: 'yt.solver.lib.min.js',
ScriptType.CORE: 'yt.solver.core.min.js',
}
# currently disabled as files are large and we do not support rotation
_ENABLE_PREPROCESSED_PLAYER_CACHE = False
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._available = True
self.ejs_settings = self.ie.get_param('extractor_args', {}).get('youtube-ejs', {})
# Note: The following 3 args are for developer use only & intentionally not documented.
# - dev: bypasses verification of script hashes and versions.
# - repo: use a custom GitHub repository to fetch web script from.
# - script_version: use a custom script version.
# E.g. --extractor-args "youtube-ejs:dev=true;script_version=0.1.4"
self.is_dev = self.ejs_setting('dev', ['false'])[0] == 'true'
if self.is_dev:
self.report_dev_option('You have enabled dev mode for EJS JCP Providers.')
custom_repo = self.ejs_setting('repo', [None])[0]
if custom_repo:
self.report_dev_option(f'You have set a custom GitHub repository for EJS JCP Providers ({custom_repo}).')
self._REPOSITORY = custom_repo
custom_version = self.ejs_setting('script_version', [None])[0]
if custom_version:
self.report_dev_option(f'You have set a custom EJS script version for EJS JCP Providers ({custom_version}).')
self._SCRIPT_VERSION = custom_version
def ejs_setting(self, key, *args, **kwargs):
return configuration_arg(self.ejs_settings, key, *args, **kwargs)
def report_dev_option(self, message: str):
self.ie.report_warning(
f'{message} '
f'This is a developer option intended for debugging. \n'
' If you experience any issues while using this option, '
f'{self.ie._downloader._format_err("DO NOT", self.ie._downloader.Styles.ERROR)} open a bug report', only_once=True)
def _run_js_runtime(self, stdin: str, /) -> str:
"""To be implemented by subclasses"""
raise NotImplementedError
def _real_bulk_solve(self, /, requests: list[JsChallengeRequest]):
grouped: dict[str, list[JsChallengeRequest]] = collections.defaultdict(list)
for request in requests:
grouped[request.input.player_url].append(request)
for player_url, grouped_requests in grouped.items():
player = None
if self._ENABLE_PREPROCESSED_PLAYER_CACHE:
player = self.ie.cache.load(self._CACHE_SECTION, f'player:{player_url}')
if player:
cached = True
else:
cached = False
video_id = next((request.video_id for request in grouped_requests), None)
player = self._get_player(video_id, player_url)
# NB: This output belongs after the player request
self.logger.info(f'Solving JS challenges using {self.JS_RUNTIME_NAME}')
stdin = self._construct_stdin(player, cached, grouped_requests)
stdout = self._run_js_runtime(stdin)
output = json.loads(stdout)
if output['type'] == 'error':
raise JsChallengeProviderError(output['error'])
if self._ENABLE_PREPROCESSED_PLAYER_CACHE and (preprocessed := output.get('preprocessed_player')):
self.ie.cache.store(self._CACHE_SECTION, f'player:{player_url}', preprocessed)
for request, response_data in zip(grouped_requests, output['responses'], strict=True):
if response_data['type'] == 'error':
yield JsChallengeProviderResponse(request, None, response_data['error'])
else:
yield JsChallengeProviderResponse(request, JsChallengeResponse(request.type, (
NChallengeOutput(response_data['data']) if request.type is JsChallengeType.N
else SigChallengeOutput(response_data['data']))))
def _construct_stdin(self, player: str, preprocessed: bool, requests: list[JsChallengeRequest], /) -> str:
json_requests = [{
'type': request.type.value,
'challenges': request.input.challenges,
} for request in requests]
data = {
'type': 'preprocessed',
'preprocessed_player': player,
'requests': json_requests,
} if preprocessed else {
'type': 'player',
'player': player,
'requests': json_requests,
'output_preprocessed': True,
}
return f'''\
{self._lib_script.code}
Object.assign(globalThis, lib);
{self._core_script.code}
console.log(JSON.stringify(jsc({json.dumps(data)})));
'''
# region: challenge solver script
@functools.cached_property
def _lib_script(self, /):
return self._get_script(ScriptType.LIB)
@functools.cached_property
def _core_script(self, /):
return self._get_script(ScriptType.CORE)
def _get_script(self, script_type: ScriptType, /) -> Script:
skipped_components: list[_SkippedComponent] = []
for _, from_source in self._iter_script_sources():
script = from_source(script_type)
if not script:
continue
if isinstance(script, _SkippedComponent):
skipped_components.append(script)
continue
if not self.is_dev:
# Matching patch version is expected to have same hash
if version_tuple(script.version, lenient=True)[:2] != version_tuple(self._SCRIPT_VERSION, lenient=True)[:2]:
self.logger.warning(
f'Challenge solver {script_type.value} script version {script.version} '
f'is not supported (source: {script.source.value}, variant: {script.variant}, supported version: {self._SCRIPT_VERSION})')
if script.source is ScriptSource.CACHE:
self.logger.debug('Clearing outdated cached script')
self.ie.cache.store(self._CACHE_SECTION, script_type.value, None)
continue
script_hashes = self._ALLOWED_HASHES[script.type].get(script.variant, [])
if script_hashes and script.hash not in script_hashes:
self.logger.warning(
f'Hash mismatch on challenge solver {script.type.value} script '
f'(source: {script.source.value}, variant: {script.variant}, hash: {script.hash})!{provider_bug_report_message(self)}')
if script.source is ScriptSource.CACHE:
self.logger.debug('Clearing invalid cached script')
self.ie.cache.store(self._CACHE_SECTION, script_type.value, None)
continue
self.logger.debug(
f'Using challenge solver {script.type.value} script v{script.version} '
f'(source: {script.source.value}, variant: {script.variant.value})')
break
else:
self._available = False
raise JsChallengeProviderRejectedRequest(
f'No usable challenge solver {script_type.value} script available',
_skipped_components=skipped_components or None,
)
return script
def _iter_script_sources(self) -> Generator[tuple[ScriptSource, Callable[[ScriptType], Script | None]]]:
yield from [
(ScriptSource.PYPACKAGE, self._pypackage_source),
(ScriptSource.CACHE, self._cached_source),
(ScriptSource.BUILTIN, self._builtin_source),
(ScriptSource.WEB, self._web_release_source)]
def _pypackage_source(self, script_type: ScriptType, /) -> Script | None:
if not _has_ejs:
return None
try:
code = yt_dlp_ejs.yt.solver.core() if script_type is ScriptType.CORE else yt_dlp_ejs.yt.solver.lib()
except Exception as e:
self.logger.warning(
f'Failed to load challenge solver {script_type.value} script from python package: {e}{provider_bug_report_message(self)}')
return None
return Script(script_type, ScriptVariant.MINIFIED, ScriptSource.PYPACKAGE, yt_dlp_ejs.version, code)
def _cached_source(self, script_type: ScriptType, /) -> Script | None:
if data := self.ie.cache.load(self._CACHE_SECTION, script_type.value):
return Script(script_type, ScriptVariant(data['variant']), ScriptSource.CACHE, data['version'], data['code'])
return None
def _builtin_source(self, script_type: ScriptType, /) -> Script | None:
error_hook = lambda _: self.logger.warning(
f'Failed to read builtin challenge solver {script_type.value} script{provider_bug_report_message(self)}')
code = vendor.load_script(
self._SCRIPT_FILENAMES[script_type], error_hook=error_hook)
if code:
return Script(script_type, ScriptVariant.UNMINIFIED, ScriptSource.BUILTIN, self._SCRIPT_VERSION, code)
return None
def _web_release_source(self, script_type: ScriptType, /):
if 'ejs:github' not in (self.ie.get_param('remote_components') or ()):
return self._skip_component('ejs:github')
url = f'https://github.com/{self._REPOSITORY}/releases/download/{self._SCRIPT_VERSION}/{self._MIN_SCRIPT_FILENAMES[script_type]}'
if code := self.ie._download_webpage_with_retries(
url, None, f'[{self.logger.prefix}] Downloading challenge solver {script_type.value} script from {url}',
f'[{self.logger.prefix}] Failed to download challenge solver {script_type.value} script', fatal=False,
):
self.ie.cache.store(self._CACHE_SECTION, script_type.value, {
'version': self._SCRIPT_VERSION,
'variant': ScriptVariant.MINIFIED.value,
'code': code,
})
return Script(script_type, ScriptVariant.MINIFIED, ScriptSource.WEB, self._SCRIPT_VERSION, code)
return None
# endregion: challenge solver script
@property
def runtime_info(self) -> JsRuntimeInfo | None:
runtime = self.ie._downloader._js_runtimes.get(self.JS_RUNTIME_NAME)
if not runtime or not runtime.info or not runtime.info.supported:
return None
return runtime.info
def is_available(self, /) -> bool:
if not self.runtime_info:
return False
return self._available
def _skip_component(self, component: str, /):
return _SkippedComponent(component, self.JS_RUNTIME_NAME)
@dataclasses.dataclass
class _SkippedComponent:
component: str
runtime: str

View File

@@ -0,0 +1,70 @@
from __future__ import annotations
import re
import shlex
import subprocess
from yt_dlp.extractor.youtube.jsc._builtin.ejs import EJSBaseJCP
from yt_dlp.extractor.youtube.jsc.provider import (
JsChallengeProvider,
JsChallengeProviderError,
JsChallengeRequest,
register_preference,
register_provider,
)
from yt_dlp.extractor.youtube.pot._provider import BuiltinIEContentProvider
from yt_dlp.utils import Popen
@register_provider
class NodeJCP(EJSBaseJCP, BuiltinIEContentProvider):
PROVIDER_NAME = 'node'
JS_RUNTIME_NAME = 'node'
_ARGS = ['-']
def _run_js_runtime(self, stdin: str, /) -> str:
args = []
if self.ejs_setting('jitless', ['false']) != ['false']:
args.append('--v8-flags=--jitless')
# Node permission flag changed from experimental to stable in v23.5.0
if self.runtime_info.version_tuple < (23, 5, 0):
args.append('--experimental-permission')
args.append('--no-warnings=ExperimentalWarning')
else:
args.append('--permission')
cmd = [self.runtime_info.path, *args, *self._ARGS]
self.logger.debug(f'Running node: {shlex.join(cmd)}')
with Popen(
cmd,
text=True,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
) as proc:
stdout, stderr = proc.communicate_or_kill(stdin)
stderr = self._clean_stderr(stderr)
if proc.returncode or stderr:
msg = f'Error running node process (returncode: {proc.returncode})'
if stderr:
msg = f'{msg}: {stderr.strip()}'
raise JsChallengeProviderError(msg)
return stdout
def _clean_stderr(self, stderr):
return '\n'.join(
line for line in stderr.splitlines()
if not (
re.match(r'^\[stdin\]:', line)
or re.match(r'^var jsc', line)
or '(Use `node --trace-uncaught ...` to show where the exception was thrown)' == line
or re.match(r'^Node\.js v\d+\.\d+\.\d+$', line)))
@register_preference(NodeJCP)
def preference(provider: JsChallengeProvider, requests: list[JsChallengeRequest]) -> int:
return 900

View File

@@ -0,0 +1,59 @@
from __future__ import annotations
import pathlib
import shlex
import subprocess
import tempfile
from yt_dlp.extractor.youtube.jsc._builtin.ejs import EJSBaseJCP
from yt_dlp.extractor.youtube.jsc.provider import (
JsChallengeProvider,
JsChallengeProviderError,
JsChallengeRequest,
register_preference,
register_provider,
)
from yt_dlp.extractor.youtube.pot._provider import BuiltinIEContentProvider
from yt_dlp.utils import Popen
@register_provider
class QuickJSJCP(EJSBaseJCP, BuiltinIEContentProvider):
PROVIDER_NAME = 'quickjs'
JS_RUNTIME_NAME = 'quickjs'
def _run_js_runtime(self, stdin: str, /) -> str:
if self.runtime_info.name == 'quickjs-ng':
self.logger.warning('QuickJS-NG is missing some optimizations making this very slow. Consider using upstream QuickJS instead.')
elif self.runtime_info.version_tuple < (2025, 4, 26):
self.logger.warning('Older QuickJS versions are missing optimizations making this very slow. Consider upgrading.')
# QuickJS does not support reading from stdin, so we have to use a temp file
temp_file = tempfile.NamedTemporaryFile(mode='w', suffix='.js', delete=False, encoding='utf-8')
try:
temp_file.write(stdin)
temp_file.close()
cmd = [self.runtime_info.path, '--script', temp_file.name]
self.logger.debug(f'Running QuickJS: {shlex.join(cmd)}')
with Popen(
cmd,
text=True,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
) as proc:
stdout, stderr = proc.communicate_or_kill()
if proc.returncode or stderr:
msg = f'Error running QuickJS process (returncode: {proc.returncode})'
if stderr:
msg = f'{msg}: {stderr.strip()}'
raise JsChallengeProviderError(msg)
finally:
pathlib.Path(temp_file.name).unlink(missing_ok=True)
return stdout
@register_preference(QuickJSJCP)
def preference(provider: JsChallengeProvider, requests: list[JsChallengeRequest]) -> int:
return 850

View File

@@ -0,0 +1,17 @@
import importlib.resources
from yt_dlp.extractor.youtube.jsc._builtin.vendor._info import HASHES, VERSION
__all__ = ['HASHES', 'VERSION', 'load_script']
def load_script(filename, error_hook=None):
file = importlib.resources.files(__package__) / filename
if file.is_file():
try:
return file.read_text(encoding='utf-8')
except (OSError, FileNotFoundError, ModuleNotFoundError) as e:
if error_hook:
error_hook(e)
return None
return None

View File

@@ -0,0 +1,11 @@
# This file is generated by devscripts/update_ejs.py. DO NOT MODIFY!
VERSION = '0.3.2'
HASHES = {
'yt.solver.bun.lib.js': '6ff45e94de9f0ea936a183c48173cfa9ce526ee4b7544cd556428427c1dd53c8073ef0174e79b320252bf0e7c64b0032cc1cf9c4358f3fda59033b7caa01c241',
'yt.solver.core.js': '0cd96b2d3f319dfa62cae689efa7d930ef1706e95f5921794db5089b2262957ec0a17d73938d8975ea35d0309cbfb4c8e4418d5e219837215eee242890c8b64d',
'yt.solver.core.min.js': '370d627703002b4a73b10027702734a3de9484f6b56b739942be1dc2b60fee49dee2aa86ed117d1c8ae1ac55181d326481f1fe2e2e8d5211154d48e2a55dac51',
'yt.solver.deno.lib.js': '9c8ee3ab6c23e443a5a951e3ac73c6b8c1c8fb34335e7058a07bf99d349be5573611de00536dcd03ecd3cf34014c4e9b536081de37af3637c5390c6a6fd6a0f0',
'yt.solver.lib.js': '1ee3753a8222fc855f5c39db30a9ccbb7967dbe1fb810e86dc9a89aa073a0907f294c720e9b65427d560a35aa1ce6af19ef854d9126a05ca00afe03f72047733',
'yt.solver.lib.min.js': '8420c259ad16e99ce004e4651ac1bcabb53b4457bf5668a97a9359be9a998a789fee8ab124ee17f91a2ea8fd84e0f2b2fc8eabcaf0b16a186ba734cf422ad053',
}

View File

@@ -0,0 +1,9 @@
/*!
* SPDX-License-Identifier: Unlicense
* This file was automatically generated by https://github.com/yt-dlp/ejs
*/
const lib = {
meriyah: await import('meriyah@6.1.4'),
astring: await import('astring@1.9.0'),
};
export { lib };

View File

@@ -0,0 +1,550 @@
/*!
* SPDX-License-Identifier: Unlicense
* This file was automatically generated by https://github.com/yt-dlp/ejs
*/
var jsc = (function (meriyah, astring) {
'use strict';
function matchesStructure(obj, structure) {
if (Array.isArray(structure)) {
if (!Array.isArray(obj)) {
return false;
}
return (
structure.length === obj.length &&
structure.every((value, index) => matchesStructure(obj[index], value))
);
}
if (typeof structure === 'object') {
if (!obj) {
return !structure;
}
if ('or' in structure) {
return structure.or.some((node) => matchesStructure(obj, node));
}
if ('anykey' in structure && Array.isArray(structure.anykey)) {
const haystack = Array.isArray(obj) ? obj : Object.values(obj);
return structure.anykey.every((value) =>
haystack.some((el) => matchesStructure(el, value)),
);
}
for (const [key, value] of Object.entries(structure)) {
if (!matchesStructure(obj[key], value)) {
return false;
}
}
return true;
}
return structure === obj;
}
function isOneOf(value, ...of) {
return of.includes(value);
}
function _optionalChain$2(ops) {
let lastAccessLHS = undefined;
let value = ops[0];
let i = 1;
while (i < ops.length) {
const op = ops[i];
const fn = ops[i + 1];
i += 2;
if ((op === 'optionalAccess' || op === 'optionalCall') && value == null) {
return undefined;
}
if (op === 'access' || op === 'optionalAccess') {
lastAccessLHS = value;
value = fn(value);
} else if (op === 'call' || op === 'optionalCall') {
value = fn((...args) => value.call(lastAccessLHS, ...args));
lastAccessLHS = undefined;
}
}
return value;
}
const logicalExpression = {
type: 'ExpressionStatement',
expression: {
type: 'LogicalExpression',
left: { type: 'Identifier' },
right: {
type: 'SequenceExpression',
expressions: [
{
type: 'AssignmentExpression',
left: { type: 'Identifier' },
operator: '=',
right: {
type: 'CallExpression',
callee: { type: 'Identifier' },
arguments: {
or: [
[
{ type: 'Literal' },
{
type: 'CallExpression',
callee: {
type: 'Identifier',
name: 'decodeURIComponent',
},
arguments: [{ type: 'Identifier' }],
optional: false,
},
],
[
{
type: 'CallExpression',
callee: {
type: 'Identifier',
name: 'decodeURIComponent',
},
arguments: [{ type: 'Identifier' }],
optional: false,
},
],
],
},
optional: false,
},
},
{ type: 'CallExpression' },
],
},
operator: '&&',
},
};
const identifier$1 = {
or: [
{
type: 'ExpressionStatement',
expression: {
type: 'AssignmentExpression',
operator: '=',
left: { type: 'Identifier' },
right: { type: 'FunctionExpression', params: [{}, {}, {}] },
},
},
{ type: 'FunctionDeclaration', params: [{}, {}, {}] },
{
type: 'VariableDeclaration',
declarations: {
anykey: [
{
type: 'VariableDeclarator',
init: { type: 'FunctionExpression', params: [{}, {}, {}] },
},
],
},
},
],
};
function extract$1(node) {
if (!matchesStructure(node, identifier$1)) {
return null;
}
let block;
if (
node.type === 'ExpressionStatement' &&
node.expression.type === 'AssignmentExpression' &&
node.expression.right.type === 'FunctionExpression'
) {
block = node.expression.right.body;
} else if (node.type === 'VariableDeclaration') {
for (const decl of node.declarations) {
if (
decl.type === 'VariableDeclarator' &&
_optionalChain$2([
decl,
'access',
(_) => _.init,
'optionalAccess',
(_2) => _2.type,
]) === 'FunctionExpression' &&
_optionalChain$2([
decl,
'access',
(_3) => _3.init,
'optionalAccess',
(_4) => _4.params,
'access',
(_5) => _5.length,
]) === 3
) {
block = decl.init.body;
break;
}
}
} else if (node.type === 'FunctionDeclaration') {
block = node.body;
} else {
return null;
}
const relevantExpression = _optionalChain$2([
block,
'optionalAccess',
(_6) => _6.body,
'access',
(_7) => _7.at,
'call',
(_8) => _8(-2),
]);
if (!matchesStructure(relevantExpression, logicalExpression)) {
return null;
}
if (
_optionalChain$2([
relevantExpression,
'optionalAccess',
(_9) => _9.type,
]) !== 'ExpressionStatement' ||
relevantExpression.expression.type !== 'LogicalExpression' ||
relevantExpression.expression.right.type !== 'SequenceExpression' ||
relevantExpression.expression.right.expressions[0].type !==
'AssignmentExpression'
) {
return null;
}
const call = relevantExpression.expression.right.expressions[0].right;
if (call.type !== 'CallExpression' || call.callee.type !== 'Identifier') {
return null;
}
return {
type: 'ArrowFunctionExpression',
params: [{ type: 'Identifier', name: 'sig' }],
body: {
type: 'CallExpression',
callee: { type: 'Identifier', name: call.callee.name },
arguments:
call.arguments.length === 1
? [{ type: 'Identifier', name: 'sig' }]
: [call.arguments[0], { type: 'Identifier', name: 'sig' }],
optional: false,
},
async: false,
expression: false,
generator: false,
};
}
function _optionalChain$1(ops) {
let lastAccessLHS = undefined;
let value = ops[0];
let i = 1;
while (i < ops.length) {
const op = ops[i];
const fn = ops[i + 1];
i += 2;
if ((op === 'optionalAccess' || op === 'optionalCall') && value == null) {
return undefined;
}
if (op === 'access' || op === 'optionalAccess') {
lastAccessLHS = value;
value = fn(value);
} else if (op === 'call' || op === 'optionalCall') {
value = fn((...args) => value.call(lastAccessLHS, ...args));
lastAccessLHS = undefined;
}
}
return value;
}
const identifier = {
or: [
{
type: 'VariableDeclaration',
kind: 'var',
declarations: {
anykey: [
{
type: 'VariableDeclarator',
id: { type: 'Identifier' },
init: {
type: 'ArrayExpression',
elements: [{ type: 'Identifier' }],
},
},
],
},
},
{
type: 'ExpressionStatement',
expression: {
type: 'AssignmentExpression',
left: { type: 'Identifier' },
operator: '=',
right: {
type: 'ArrayExpression',
elements: [{ type: 'Identifier' }],
},
},
},
],
};
const catchBlockBody = [
{
type: 'ReturnStatement',
argument: {
type: 'BinaryExpression',
left: {
type: 'MemberExpression',
object: { type: 'Identifier' },
computed: true,
property: { type: 'Literal' },
optional: false,
},
right: { type: 'Identifier' },
operator: '+',
},
},
];
function extract(node) {
if (!matchesStructure(node, identifier)) {
let name = null;
let block = null;
switch (node.type) {
case 'ExpressionStatement': {
if (
node.expression.type === 'AssignmentExpression' &&
node.expression.left.type === 'Identifier' &&
node.expression.right.type === 'FunctionExpression' &&
node.expression.right.params.length === 1
) {
name = node.expression.left.name;
block = node.expression.right.body;
}
break;
}
case 'FunctionDeclaration': {
if (node.params.length === 1) {
name = _optionalChain$1([
node,
'access',
(_) => _.id,
'optionalAccess',
(_2) => _2.name,
]);
block = node.body;
}
break;
}
}
if (!block || !name) {
return null;
}
const tryNode = block.body.at(-2);
if (
_optionalChain$1([tryNode, 'optionalAccess', (_3) => _3.type]) !==
'TryStatement' ||
_optionalChain$1([
tryNode,
'access',
(_4) => _4.handler,
'optionalAccess',
(_5) => _5.type,
]) !== 'CatchClause'
) {
return null;
}
const catchBody = tryNode.handler.body.body;
if (matchesStructure(catchBody, catchBlockBody)) {
return makeSolverFuncFromName(name);
}
return null;
}
if (node.type === 'VariableDeclaration') {
for (const declaration of node.declarations) {
if (
declaration.type !== 'VariableDeclarator' ||
!declaration.init ||
declaration.init.type !== 'ArrayExpression' ||
declaration.init.elements.length !== 1
) {
continue;
}
const [firstElement] = declaration.init.elements;
if (firstElement && firstElement.type === 'Identifier') {
return makeSolverFuncFromName(firstElement.name);
}
}
} else if (node.type === 'ExpressionStatement') {
const expr = node.expression;
if (
expr.type === 'AssignmentExpression' &&
expr.left.type === 'Identifier' &&
expr.operator === '=' &&
expr.right.type === 'ArrayExpression' &&
expr.right.elements.length === 1
) {
const [firstElement] = expr.right.elements;
if (firstElement && firstElement.type === 'Identifier') {
return makeSolverFuncFromName(firstElement.name);
}
}
}
return null;
}
function makeSolverFuncFromName(name) {
return {
type: 'ArrowFunctionExpression',
params: [{ type: 'Identifier', name: 'n' }],
body: {
type: 'CallExpression',
callee: { type: 'Identifier', name: name },
arguments: [{ type: 'Identifier', name: 'n' }],
optional: false,
},
async: false,
expression: false,
generator: false,
};
}
const setupNodes = meriyah.parse(
`\nif (typeof globalThis.XMLHttpRequest === "undefined") {\n globalThis.XMLHttpRequest = { prototype: {} };\n}\nconst window = Object.create(null);\nif (typeof URL === "undefined") {\n window.location = {\n hash: "",\n host: "www.youtube.com",\n hostname: "www.youtube.com",\n href: "https://www.youtube.com/watch?v=yt-dlp-wins",\n origin: "https://www.youtube.com",\n password: "",\n pathname: "/watch",\n port: "",\n protocol: "https:",\n search: "?v=yt-dlp-wins",\n username: "",\n };\n} else {\n window.location = new URL("https://www.youtube.com/watch?v=yt-dlp-wins");\n}\nif (typeof globalThis.document === "undefined") {\n globalThis.document = Object.create(null);\n}\nif (typeof globalThis.navigator === "undefined") {\n globalThis.navigator = Object.create(null);\n}\nif (typeof globalThis.self === "undefined") {\n globalThis.self = globalThis;\n}\n`,
).body;
function _optionalChain(ops) {
let lastAccessLHS = undefined;
let value = ops[0];
let i = 1;
while (i < ops.length) {
const op = ops[i];
const fn = ops[i + 1];
i += 2;
if ((op === 'optionalAccess' || op === 'optionalCall') && value == null) {
return undefined;
}
if (op === 'access' || op === 'optionalAccess') {
lastAccessLHS = value;
value = fn(value);
} else if (op === 'call' || op === 'optionalCall') {
value = fn((...args) => value.call(lastAccessLHS, ...args));
lastAccessLHS = undefined;
}
}
return value;
}
function preprocessPlayer(data) {
const ast = meriyah.parse(data);
const body = ast.body;
const block = (() => {
switch (body.length) {
case 1: {
const func = body[0];
if (
_optionalChain([func, 'optionalAccess', (_) => _.type]) ===
'ExpressionStatement' &&
func.expression.type === 'CallExpression' &&
func.expression.callee.type === 'MemberExpression' &&
func.expression.callee.object.type === 'FunctionExpression'
) {
return func.expression.callee.object.body;
}
break;
}
case 2: {
const func = body[1];
if (
_optionalChain([func, 'optionalAccess', (_2) => _2.type]) ===
'ExpressionStatement' &&
func.expression.type === 'CallExpression' &&
func.expression.callee.type === 'FunctionExpression'
) {
const block = func.expression.callee.body;
block.body.splice(0, 1);
return block;
}
break;
}
}
throw 'unexpected structure';
})();
const found = { n: [], sig: [] };
const plainExpressions = block.body.filter((node) => {
const n = extract(node);
if (n) {
found.n.push(n);
}
const sig = extract$1(node);
if (sig) {
found.sig.push(sig);
}
if (node.type === 'ExpressionStatement') {
if (node.expression.type === 'AssignmentExpression') {
return true;
}
return node.expression.type === 'Literal';
}
return true;
});
block.body = plainExpressions;
for (const [name, options] of Object.entries(found)) {
const unique = new Set(options.map((x) => JSON.stringify(x)));
if (unique.size !== 1) {
const message = `found ${unique.size} ${name} function possibilities`;
throw (
message +
(unique.size
? `: ${options.map((x) => astring.generate(x)).join(', ')}`
: '')
);
}
plainExpressions.push({
type: 'ExpressionStatement',
expression: {
type: 'AssignmentExpression',
operator: '=',
left: {
type: 'MemberExpression',
computed: false,
object: { type: 'Identifier', name: '_result' },
property: { type: 'Identifier', name: name },
},
right: options[0],
},
});
}
ast.body.splice(0, 0, ...setupNodes);
return astring.generate(ast);
}
function getFromPrepared(code) {
const resultObj = { n: null, sig: null };
Function('_result', code)(resultObj);
return resultObj;
}
function main(input) {
const preprocessedPlayer =
input.type === 'player'
? preprocessPlayer(input.player)
: input.preprocessed_player;
const solvers = getFromPrepared(preprocessedPlayer);
const responses = input.requests.map((input) => {
if (!isOneOf(input.type, 'n', 'sig')) {
return { type: 'error', error: `Unknown request type: ${input.type}` };
}
const solver = solvers[input.type];
if (!solver) {
return {
type: 'error',
error: `Failed to extract ${input.type} function`,
};
}
try {
return {
type: 'result',
data: Object.fromEntries(
input.challenges.map((challenge) => [challenge, solver(challenge)]),
),
};
} catch (error) {
return {
type: 'error',
error:
error instanceof Error
? `${error.message}\n${error.stack}`
: `${error}`,
};
}
});
const output = { type: 'result', responses: responses };
if (input.type === 'player' && input.output_preprocessed) {
output.preprocessed_player = preprocessedPlayer;
}
return output;
}
return main;
})(meriyah, astring);

View File

@@ -0,0 +1,9 @@
/*!
* SPDX-License-Identifier: Unlicense
* This file was automatically generated by https://github.com/yt-dlp/ejs
*/
const lib = {
meriyah: await import('npm:meriyah@6.1.4'),
astring: await import('npm:astring@1.9.0'),
};
export { lib };

View File

@@ -0,0 +1,287 @@
from __future__ import annotations
import collections
import dataclasses
import typing
from yt_dlp.extractor.youtube.jsc._builtin.ejs import _EJS_WIKI_URL
from yt_dlp.extractor.youtube.jsc._registry import (
_jsc_preferences,
_jsc_providers,
)
from yt_dlp.extractor.youtube.jsc.provider import (
JsChallengeProvider,
JsChallengeProviderError,
JsChallengeProviderRejectedRequest,
JsChallengeProviderResponse,
JsChallengeRequest,
JsChallengeResponse,
JsChallengeType,
NChallengeInput,
NChallengeOutput,
SigChallengeInput,
SigChallengeOutput,
)
from yt_dlp.extractor.youtube.pot._director import YoutubeIEContentProviderLogger, provider_display_list
from yt_dlp.extractor.youtube.pot._provider import (
IEContentProviderLogger,
)
from yt_dlp.extractor.youtube.pot.provider import (
provider_bug_report_message,
)
if typing.TYPE_CHECKING:
from collections.abc import Iterable
from yt_dlp.extractor.youtube.jsc._builtin.ejs import _SkippedComponent
from yt_dlp.extractor.youtube.jsc.provider import Preference as JsChallengePreference
class JsChallengeRequestDirector:
def __init__(self, logger: IEContentProviderLogger):
self.providers: dict[str, JsChallengeProvider] = {}
self.preferences: list[JsChallengePreference] = []
self.logger = logger
def register_provider(self, provider: JsChallengeProvider):
self.providers[provider.PROVIDER_KEY] = provider
def register_preference(self, preference: JsChallengePreference):
self.preferences.append(preference)
def _get_providers(self, requests: list[JsChallengeRequest]) -> Iterable[JsChallengeProvider]:
"""Sorts available providers by preference, given a request"""
preferences = {
provider: sum(pref(provider, requests) for pref in self.preferences)
for provider in self.providers.values()
}
if self.logger.log_level <= self.logger.LogLevel.TRACE:
# calling is_available() for every JS Challenge provider upfront may have some overhead
self.logger.trace(f'JS Challenge Providers: {provider_display_list(self.providers.values())}')
self.logger.trace('JS Challenge Provider preferences for this request: {}'.format(', '.join(
f'{provider.PROVIDER_NAME}={pref}' for provider, pref in preferences.items())))
return (
provider for provider in sorted(
self.providers.values(), key=preferences.get, reverse=True)
if provider.is_available()
)
def _handle_error(self, e: Exception, provider: JsChallengeProvider, requests: list[JsChallengeRequest]):
if isinstance(e, JsChallengeProviderRejectedRequest):
self.logger.trace(
f'JS Challenge Provider "{provider.PROVIDER_NAME}" rejected '
f'{"this request" if len(requests) == 1 else f"{len(requests)} requests"}, '
f'trying next available provider. Reason: {e}',
)
elif isinstance(e, JsChallengeProviderError):
if len(requests) == 1:
self.logger.warning(
f'Error solving {requests[0].type.value} challenge request using "{provider.PROVIDER_NAME}" provider: {e}.\n'
f' input = {requests[0].input}\n'
f' {(provider_bug_report_message(provider, before="") if not e.expected else "")}')
else:
self.logger.warning(
f'Error solving {len(requests)} challenge requests using "{provider.PROVIDER_NAME}" provider: {e}.\n'
f' requests = {requests}\n'
f' {(provider_bug_report_message(provider, before="") if not e.expected else "")}')
else:
self.logger.error(
f'Unexpected error solving {len(requests)} challenge request(s) using "{provider.PROVIDER_NAME}" provider: {e!r}\n'
f' requests = {requests}\n'
f' {provider_bug_report_message(provider, before="")}', cause=e)
def bulk_solve(self, requests: list[JsChallengeRequest]) -> list[tuple[JsChallengeRequest, JsChallengeResponse]]:
"""Solves multiple JS Challenges in bulk, returning a list of responses"""
if not self.providers:
self.logger.trace('No JS Challenge providers registered')
return []
results = []
next_requests = requests[:]
skipped_components = []
for provider in self._get_providers(next_requests):
if not next_requests:
break
self.logger.trace(
f'Attempting to solve {len(next_requests)} challenges using "{provider.PROVIDER_NAME}" provider')
try:
for response in provider.bulk_solve([dataclasses.replace(request) for request in next_requests]):
if not validate_provider_response(response):
self.logger.warning(
f'JS Challenge Provider "{provider.PROVIDER_NAME}" returned an invalid response:'
f' response = {response!r}\n'
f' {provider_bug_report_message(provider, before="")}')
continue
if response.error:
self._handle_error(response.error, provider, [response.request])
continue
if (vr_msg := validate_response(response.response, response.request)) is not True:
self.logger.warning(
f'Invalid JS Challenge response received from "{provider.PROVIDER_NAME}" provider: {vr_msg or ""}\n'
f' response = {response.response}\n'
f' request = {response.request}\n'
f' {provider_bug_report_message(provider, before="")}')
continue
try:
next_requests.remove(response.request)
except ValueError:
self.logger.warning(
f'JS Challenge Provider "{provider.PROVIDER_NAME}" returned a response for an unknown request:\n'
f' request = {response.request}\n'
f' {provider_bug_report_message(provider, before="")}')
continue
results.append((response.request, response.response))
except Exception as e:
if isinstance(e, JsChallengeProviderRejectedRequest) and e._skipped_components:
skipped_components.extend(e._skipped_components)
self._handle_error(e, provider, next_requests)
continue
if skipped_components:
self.__report_skipped_components(skipped_components)
if len(results) != len(requests):
self.logger.trace(
f'Not all JS Challenges were solved, expected {len(requests)} responses, got {len(results)}')
self.logger.trace(f'Unsolved requests: {next_requests}')
else:
self.logger.trace(f'Solved all {len(requests)} requested JS Challenges')
return results
def __report_skipped_components(self, components: list[_SkippedComponent], /):
runtime_components = collections.defaultdict(list)
for component in components:
runtime_components[component.component].append(component.runtime)
for runtimes in runtime_components.values():
runtimes.sort()
description_lookup = {
'ejs:npm': 'NPM package',
'ejs:github': 'challenge solver script',
}
descriptions = [
f'{description_lookup.get(component, component)} ({", ".join(runtimes)})'
for component, runtimes in runtime_components.items()
if runtimes
]
flags = [
f' --remote-components {f"{component} (recommended)" if component == "ejs:github" else f"{component} "}'
for component, runtimes in runtime_components.items()
if runtimes
]
def join_parts(parts, joiner):
if not parts:
return ''
if len(parts) == 1:
return parts[0]
return f'{", ".join(parts[:-1])} {joiner} {parts[-1]}'
if len(descriptions) == 1:
msg = (
f'Remote component {descriptions[0]} was skipped. '
f'It may be required to solve JS challenges. '
f'You can enable the download with {flags[0]}')
else:
msg = (
f'Remote components {join_parts(descriptions, "and")} were skipped. '
f'These may be required to solve JS challenges. '
f'You can enable these downloads with {join_parts(flags, "or")}, respectively')
self.logger.warning(f'{msg}. For more information and alternatives, refer to {_EJS_WIKI_URL}')
def close(self):
for provider in self.providers.values():
provider.close()
EXTRACTOR_ARG_PREFIX = 'youtubejsc'
def initialize_jsc_director(ie):
assert ie._downloader is not None, 'Downloader not set'
enable_trace = ie._configuration_arg(
'jsc_trace', ['false'], ie_key='youtube', casesense=False)[0] == 'true'
if enable_trace:
log_level = IEContentProviderLogger.LogLevel.TRACE
elif ie.get_param('verbose', False):
log_level = IEContentProviderLogger.LogLevel.DEBUG
else:
log_level = IEContentProviderLogger.LogLevel.INFO
def get_provider_logger_and_settings(provider, logger_key):
logger_prefix = f'{logger_key}:{provider.PROVIDER_NAME}'
extractor_key = f'{EXTRACTOR_ARG_PREFIX}-{provider.PROVIDER_KEY.lower()}'
return (
YoutubeIEContentProviderLogger(ie, logger_prefix, log_level=log_level),
ie.get_param('extractor_args', {}).get(extractor_key, {}))
director = JsChallengeRequestDirector(
logger=YoutubeIEContentProviderLogger(ie, 'jsc', log_level=log_level),
)
ie._downloader.add_close_hook(director.close)
for provider in _jsc_providers.value.values():
logger, settings = get_provider_logger_and_settings(provider, 'jsc')
director.register_provider(provider(ie, logger, settings))
for preference in _jsc_preferences.value:
director.register_preference(preference)
if director.logger.log_level <= director.logger.LogLevel.DEBUG:
# calling is_available() for every JS Challenge provider upfront may have some overhead
director.logger.debug(f'JS Challenge Providers: {provider_display_list(director.providers.values())}')
director.logger.trace(f'Registered {len(director.preferences)} JS Challenge provider preferences')
return director
def validate_provider_response(response: JsChallengeProviderResponse) -> bool:
return (
isinstance(response, JsChallengeProviderResponse)
and isinstance(response.request, JsChallengeRequest)
and (
isinstance(response.response, JsChallengeResponse)
or (response.error is not None and isinstance(response.error, Exception)))
)
def validate_response(response: JsChallengeResponse, request: JsChallengeRequest) -> bool | str:
if not isinstance(response, JsChallengeResponse):
return 'Response is not a JsChallengeResponse'
if request.type == JsChallengeType.N:
return validate_nsig_challenge_output(response.output, request.input)
else:
return validate_sig_challenge_output(response.output, request.input)
def validate_nsig_challenge_output(challenge_output: NChallengeOutput, challenge_input: NChallengeInput) -> bool | str:
if not (
isinstance(challenge_output, NChallengeOutput)
and len(challenge_output.results) == len(challenge_input.challenges)
and all(isinstance(k, str) and isinstance(v, str) for k, v in challenge_output.results.items())
and all(challenge in challenge_output.results for challenge in challenge_input.challenges)
):
return 'Invalid NChallengeOutput'
# Validate n results are valid - if they end with the input challenge then the js function returned with an exception.
for challenge, result in challenge_output.results.items():
if result.endswith(challenge):
return f'n result is invalid for {challenge!r}: {result!r}'
return True
def validate_sig_challenge_output(challenge_output: SigChallengeOutput, challenge_input: SigChallengeInput) -> bool:
return (
isinstance(challenge_output, SigChallengeOutput)
and len(challenge_output.results) == len(challenge_input.challenges)
and all(isinstance(k, str) and isinstance(v, str) for k, v in challenge_output.results.items())
and all(challenge in challenge_output.results for challenge in challenge_input.challenges)
) or 'Invalid SigChallengeOutput'

View File

@@ -0,0 +1,4 @@
from yt_dlp.globals import Indirect
_jsc_providers = Indirect({})
_jsc_preferences = Indirect(set())

View File

@@ -0,0 +1,161 @@
"""PUBLIC API"""
from __future__ import annotations
import abc
import dataclasses
import enum
import typing
from yt_dlp.extractor.youtube.jsc._registry import _jsc_preferences, _jsc_providers
from yt_dlp.extractor.youtube.pot._provider import (
IEContentProvider,
IEContentProviderError,
register_preference_generic,
register_provider_generic,
)
from yt_dlp.utils import ExtractorError
__all__ = [
'JsChallengeProvider',
'JsChallengeProviderError',
'JsChallengeProviderRejectedRequest',
'JsChallengeProviderResponse',
'JsChallengeRequest',
'JsChallengeResponse',
'JsChallengeType',
'NChallengeInput',
'NChallengeOutput',
'SigChallengeInput',
'SigChallengeOutput',
'register_preference',
'register_provider',
]
class JsChallengeType(enum.Enum):
N = 'n'
SIG = 'sig'
@dataclasses.dataclass(frozen=True)
class JsChallengeRequest:
type: JsChallengeType
input: NChallengeInput | SigChallengeInput
video_id: str | None = None
@dataclasses.dataclass(frozen=True)
class NChallengeInput:
player_url: str
challenges: list[str] = dataclasses.field(default_factory=list)
@dataclasses.dataclass(frozen=True)
class SigChallengeInput:
player_url: str
challenges: list[str] = dataclasses.field(default_factory=list)
@dataclasses.dataclass(frozen=True)
class NChallengeOutput:
results: dict[str, str] = dataclasses.field(default_factory=dict)
@dataclasses.dataclass(frozen=True)
class SigChallengeOutput:
results: dict[str, str] = dataclasses.field(default_factory=dict)
@dataclasses.dataclass
class JsChallengeProviderResponse:
request: JsChallengeRequest
response: JsChallengeResponse | None = None
error: Exception | None = None
@dataclasses.dataclass
class JsChallengeResponse:
type: JsChallengeType
output: NChallengeOutput | SigChallengeOutput
class JsChallengeProviderRejectedRequest(IEContentProviderError):
"""Reject the JsChallengeRequest (cannot handle the request)"""
def __init__(self, msg=None, expected: bool = False, *, _skipped_components=None):
super().__init__(msg, expected)
self._skipped_components = _skipped_components
class JsChallengeProviderError(IEContentProviderError):
"""An error occurred while solving the challenge"""
class JsChallengeProvider(IEContentProvider, abc.ABC, suffix='JCP'):
# Set to None to disable the check
_SUPPORTED_TYPES: tuple[JsChallengeType] | None = ()
def __validate_request(self, request: JsChallengeRequest):
if not self.is_available():
raise JsChallengeProviderRejectedRequest(f'{self.PROVIDER_NAME} is not available')
# Validate request using built-in settings
if (
self._SUPPORTED_TYPES is not None
and request.type not in self._SUPPORTED_TYPES
):
raise JsChallengeProviderRejectedRequest(
f'JS Challenge type "{request.type}" is not supported by {self.PROVIDER_NAME}')
def bulk_solve(self, requests: list[JsChallengeRequest]) -> typing.Generator[JsChallengeProviderResponse, None, None]:
"""Solve multiple JS challenges and return the results"""
validated_requests = []
for request in requests:
try:
self.__validate_request(request)
validated_requests.append(request)
except JsChallengeProviderRejectedRequest as e:
yield JsChallengeProviderResponse(request=request, error=e)
continue
yield from self._real_bulk_solve(validated_requests)
@abc.abstractmethod
def _real_bulk_solve(self, requests: list[JsChallengeRequest]) -> typing.Generator[JsChallengeProviderResponse, None, None]:
"""Subclasses can override this method to handle bulk solving"""
raise NotImplementedError(f'{self.PROVIDER_NAME} does not implement bulk solving')
def _get_player(self, video_id, player_url):
try:
return self.ie._load_player(
video_id=video_id,
player_url=player_url,
fatal=True,
)
except ExtractorError as e:
raise JsChallengeProviderError(
f'Failed to load player for JS challenge: {e}') from e
def register_provider(provider: type[JsChallengeProvider]):
"""Register a JsChallengeProvider class"""
return register_provider_generic(
provider=provider,
base_class=JsChallengeProvider,
registry=_jsc_providers.value,
)
def register_preference(*providers: type[JsChallengeProvider]) -> typing.Callable[[Preference], Preference]:
"""Register a preference for a JsChallengeProvider class."""
return register_preference_generic(
JsChallengeProvider,
_jsc_preferences.value,
*providers,
)
if typing.TYPE_CHECKING:
Preference = typing.Callable[[JsChallengeProvider, list[JsChallengeRequest]], int]
__all__.append('Preference')

View File

@@ -6,6 +6,7 @@ import dataclasses
import datetime as dt
import hashlib
import json
import traceback
import typing
import urllib.parse
from collections.abc import Iterable
@@ -58,9 +59,9 @@ class YoutubeIEContentProviderLogger(IEContentProviderLogger):
if self.log_level <= self.LogLevel.TRACE:
self.__ie.write_debug(self._format_msg('TRACE: ' + message))
def debug(self, message: str):
def debug(self, message: str, *, once=False):
if self.log_level <= self.LogLevel.DEBUG:
self.__ie.write_debug(self._format_msg(message))
self.__ie.write_debug(self._format_msg(message), only_once=once)
def info(self, message: str):
if self.log_level <= self.LogLevel.INFO:
@@ -70,9 +71,11 @@ class YoutubeIEContentProviderLogger(IEContentProviderLogger):
if self.log_level <= self.LogLevel.WARNING:
self.__ie.report_warning(self._format_msg(message), only_once=once)
def error(self, message: str):
def error(self, message: str, cause=None):
if self.log_level <= self.LogLevel.ERROR:
self.__ie._downloader.report_error(self._format_msg(message), is_error=False)
self.__ie._downloader.report_error(
self._format_msg(message), is_error=False,
tb=''.join(traceback.format_exception(None, cause, cause.__traceback__)) if cause else None)
class PoTokenCache:

View File

@@ -36,7 +36,7 @@ class IEContentProviderLogger(abc.ABC):
pass
@abc.abstractmethod
def debug(self, message: str):
def debug(self, message: str, *, once=False):
pass
@abc.abstractmethod
@@ -48,7 +48,7 @@ class IEContentProviderLogger(abc.ABC):
pass
@abc.abstractmethod
def error(self, message: str):
def error(self, message: str, cause=None):
pass
@@ -90,7 +90,7 @@ class IEContentProvider(abc.ABC):
@classproperty
def PROVIDER_KEY(cls) -> str:
assert hasattr(cls, '_PROVIDER_KEY_SUFFIX'), 'Content Provider implementation must define a suffix for the provider key'
assert cls.__name__.endswith(cls._PROVIDER_KEY_SUFFIX), f'PoTokenProvider class names must end with "{cls._PROVIDER_KEY_SUFFIX}"'
assert cls.__name__.endswith(cls._PROVIDER_KEY_SUFFIX), f'Class name must end with "{cls._PROVIDER_KEY_SUFFIX}"'
return cls.__name__[:-len(cls._PROVIDER_KEY_SUFFIX)]
@abc.abstractmethod
@@ -114,10 +114,7 @@ class IEContentProvider(abc.ABC):
@param default The default value to return when the key is not present (default: [])
@param casesense When false, the values are converted to lower case
"""
val = traverse_obj(self.settings, key)
if val is None:
return [] if default is NO_DEFAULT else default
return list(val) if casesense else [x.lower() for x in val]
return configuration_arg(self.settings, key, default=default, casesense=casesense)
class BuiltinIEContentProvider(IEContentProvider, abc.ABC):
@@ -125,6 +122,20 @@ class BuiltinIEContentProvider(IEContentProvider, abc.ABC):
BUG_REPORT_MESSAGE = bug_reports_message(before='')
def configuration_arg(config, key, default=NO_DEFAULT, *, casesense=False):
"""
@returns A list of values for the setting given by "key"
or "default" if no such key is present
@param config The configuration dictionary
@param default The default value to return when the key is not present (default: [])
@param casesense When false, the values are converted to lower case
"""
val = traverse_obj(config, key)
if val is None:
return [] if default is NO_DEFAULT else default
return list(val) if casesense else [x.lower() for x in val]
def register_provider_generic(
provider,
base_class,