Added yt_dlp directly; added rename format options, an xclip clipboard subprocess, and a "copy name" context-menu option

2022-12-02 20:00:26 -06:00
parent e4e5e08cb4
commit b84fd38523
976 changed files with 191451 additions and 6 deletions

yt_dlp/postprocessor/__init__.py (View File)

@@ -0,0 +1,47 @@
# flake8: noqa: F401
from ..utils import load_plugins
from .common import PostProcessor
from .embedthumbnail import EmbedThumbnailPP
from .exec import ExecPP, ExecAfterDownloadPP
from .ffmpeg import (
FFmpegPostProcessor,
FFmpegCopyStreamPP,
FFmpegConcatPP,
FFmpegEmbedSubtitlePP,
FFmpegExtractAudioPP,
FFmpegFixupDuplicateMoovPP,
FFmpegFixupDurationPP,
FFmpegFixupStretchedPP,
FFmpegFixupTimestampPP,
FFmpegFixupM3u8PP,
FFmpegFixupM4aPP,
FFmpegMergerPP,
FFmpegMetadataPP,
FFmpegSubtitlesConvertorPP,
FFmpegThumbnailsConvertorPP,
FFmpegSplitChaptersPP,
FFmpegVideoConvertorPP,
FFmpegVideoRemuxerPP,
)
from .metadataparser import (
MetadataFromFieldPP,
MetadataFromTitlePP,
MetadataParserPP,
)
from .modify_chapters import ModifyChaptersPP
from .movefilesafterdownload import MoveFilesAfterDownloadPP
from .sponskrub import SponSkrubPP
from .sponsorblock import SponsorBlockPP
from .xattrpp import XAttrMetadataPP
_PLUGIN_CLASSES = load_plugins('postprocessor', 'PP', globals())
def get_postprocessor(key):
return globals()[key + 'PP']
__all__ = [name for name in globals().keys() if name.endswith('PP')]
__all__.extend(('PostProcessor', 'FFmpegPostProcessor'))
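
For reference, get_postprocessor above is a plain lookup in the module's globals, so any of the imported classes can be resolved by its key, i.e. the class name minus the 'PP' suffix. A minimal sketch, assuming the vendored yt_dlp package is importable:

# Resolve a post-processor class by key: 'EmbedThumbnail' maps to
# globals()['EmbedThumbnailPP'] inside yt_dlp/postprocessor/__init__.py.
from yt_dlp.postprocessor import get_postprocessor

cls = get_postprocessor('EmbedThumbnail')
print(cls.__name__)  # EmbedThumbnailPP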

yt_dlp/postprocessor/common.py (View File)

@@ -0,0 +1,183 @@
from __future__ import unicode_literals
import functools
import os
from ..compat import compat_str
from ..utils import (
_configuration_args,
encodeFilename,
PostProcessingError,
write_string,
)
class PostProcessorMetaClass(type):
@staticmethod
def run_wrapper(func):
@functools.wraps(func)
def run(self, info, *args, **kwargs):
info_copy = self._copy_infodict(info)
self._hook_progress({'status': 'started'}, info_copy)
ret = func(self, info, *args, **kwargs)
if ret is not None:
_, info = ret
self._hook_progress({'status': 'finished'}, info_copy)
return ret
return run
def __new__(cls, name, bases, attrs):
if 'run' in attrs:
attrs['run'] = cls.run_wrapper(attrs['run'])
return type.__new__(cls, name, bases, attrs)
class PostProcessor(metaclass=PostProcessorMetaClass):
"""Post Processor class.
PostProcessor objects can be added to downloaders with their
add_post_processor() method. When the downloader has finished a
successful download, it will take its internal chain of PostProcessors
and start calling the run() method on each one of them, first with
an initial argument and then with the returned value of the previous
PostProcessor.
The chain will be stopped if one of them ever returns None or the end
of the chain is reached.
PostProcessor objects follow a "mutual registration" process similar
to InfoExtractor objects.
Optionally PostProcessor can use a list of additional command-line arguments
with self._configuration_args.
"""
_downloader = None
def __init__(self, downloader=None):
self._progress_hooks = []
self.add_progress_hook(self.report_progress)
self.set_downloader(downloader)
self.PP_NAME = self.pp_key()
@classmethod
def pp_key(cls):
name = cls.__name__[:-2]
return compat_str(name[6:]) if name[:6].lower() == 'ffmpeg' else name
def to_screen(self, text, prefix=True, *args, **kwargs):
tag = '[%s] ' % self.PP_NAME if prefix else ''
if self._downloader:
return self._downloader.to_screen('%s%s' % (tag, text), *args, **kwargs)
def report_warning(self, text, *args, **kwargs):
if self._downloader:
return self._downloader.report_warning(text, *args, **kwargs)
def deprecation_warning(self, text):
if self._downloader:
return self._downloader.deprecation_warning(text)
write_string(f'DeprecationWarning: {text}')
def report_error(self, text, *args, **kwargs):
# Exists only for compatibility. Do not use
if self._downloader:
return self._downloader.report_error(text, *args, **kwargs)
def write_debug(self, text, *args, **kwargs):
if self._downloader:
return self._downloader.write_debug(text, *args, **kwargs)
def get_param(self, name, default=None, *args, **kwargs):
if self._downloader:
return self._downloader.params.get(name, default, *args, **kwargs)
return default
def set_downloader(self, downloader):
"""Sets the downloader for this PP."""
self._downloader = downloader
for ph in getattr(downloader, '_postprocessor_hooks', []):
self.add_progress_hook(ph)
def _copy_infodict(self, info_dict):
return getattr(self._downloader, '_copy_infodict', dict)(info_dict)
@staticmethod
def _restrict_to(*, video=True, audio=True, images=True):
allowed = {'video': video, 'audio': audio, 'images': images}
def decorator(func):
@functools.wraps(func)
def wrapper(self, info):
format_type = (
'video' if info.get('vcodec') != 'none'
else 'audio' if info.get('acodec') != 'none'
else 'images')
if allowed[format_type]:
return func(self, info)
else:
self.to_screen('Skipping %s' % format_type)
return [], info
return wrapper
return decorator
def run(self, information):
"""Run the PostProcessor.
The "information" argument is a dictionary like the ones
composed by InfoExtractors. The only difference is that this
one has an extra field called "filepath" that points to the
downloaded file.
This method returns a tuple: the first element is a list of the files
that can be deleted, and the second is the updated information.
In addition, this method may raise a PostProcessingError
exception if post processing fails.
"""
return [], information # by default, keep file and do nothing
def try_utime(self, path, atime, mtime, errnote='Cannot update utime of file'):
try:
os.utime(encodeFilename(path), (atime, mtime))
except Exception:
self.report_warning(errnote)
def _configuration_args(self, exe, *args, **kwargs):
return _configuration_args(
self.pp_key(), self.get_param('postprocessor_args'), exe, *args, **kwargs)
def _hook_progress(self, status, info_dict):
if not self._progress_hooks:
return
status.update({
'info_dict': info_dict,
'postprocessor': self.pp_key(),
})
for ph in self._progress_hooks:
ph(status)
def add_progress_hook(self, ph):
# See YoutubeDL.py (search for postprocessor_hooks) for a description of this interface
self._progress_hooks.append(ph)
def report_progress(self, s):
s['_default_template'] = '%(postprocessor)s %(status)s' % s
progress_dict = s.copy()
progress_dict.pop('info_dict')
progress_dict = {'info': s['info_dict'], 'progress': progress_dict}
progress_template = self.get_param('progress_template', {})
tmpl = progress_template.get('postprocess')
if tmpl:
self._downloader.to_stdout(self._downloader.evaluate_outtmpl(tmpl, progress_dict))
self._downloader.to_console_title(self._downloader.evaluate_outtmpl(
progress_template.get('postprocess-title') or 'yt-dlp %(progress._default_template)s',
progress_dict))
class AudioConversionError(PostProcessingError):
pass
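
The docstring above is the whole contract: run(info) returns a tuple of (files that may be deleted, updated info dict), and the metaclass wraps run() so that 'started'/'finished' progress hooks fire around it. A minimal custom post-processor under that contract, as a sketch (the class name is made up; assumes yt_dlp is importable):

from yt_dlp import YoutubeDL
from yt_dlp.postprocessor.common import PostProcessor

class PrintTitlePP(PostProcessor):
    def run(self, info):
        # Keep every file and pass the info dict through unchanged.
        self.to_screen('Downloaded %r' % info.get('title'))
        return [], info

with YoutubeDL() as ydl:
    ydl.add_post_processor(PrintTitlePP(ydl))  # pp_key() strips 'PP' -> 'PrintTitle'
    # ydl.download([...])  # the PP then runs after each successful download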

yt_dlp/postprocessor/embedthumbnail.py (View File)

@@ -0,0 +1,240 @@
# coding: utf-8
from __future__ import unicode_literals
import base64
import imghdr
import os
import subprocess
import re
try:
from mutagen.flac import Picture, FLAC
from mutagen.mp4 import MP4, MP4Cover
from mutagen.oggopus import OggOpus
from mutagen.oggvorbis import OggVorbis
has_mutagen = True
except ImportError:
has_mutagen = False
from .common import PostProcessor
from .ffmpeg import (
FFmpegPostProcessor,
FFmpegThumbnailsConvertorPP,
)
from ..utils import (
check_executable,
encodeArgument,
encodeFilename,
error_to_compat_str,
Popen,
PostProcessingError,
prepend_extension,
shell_quote,
)
class EmbedThumbnailPPError(PostProcessingError):
pass
class EmbedThumbnailPP(FFmpegPostProcessor):
def __init__(self, downloader=None, already_have_thumbnail=False):
FFmpegPostProcessor.__init__(self, downloader)
self._already_have_thumbnail = already_have_thumbnail
def _get_thumbnail_resolution(self, filename, thumbnail_dict):
def guess():
width, height = thumbnail_dict.get('width'), thumbnail_dict.get('height')
if width and height:
return width, height
try:
size_regex = r',\s*(?P<w>\d+)x(?P<h>\d+)\s*[,\[]'
size_result = self.run_ffmpeg(filename, None, ['-hide_banner'], expected_retcodes=(1,))
mobj = re.search(size_regex, size_result)
if mobj is None:
return guess()
except PostProcessingError as err:
self.report_warning('unable to find the thumbnail resolution; %s' % error_to_compat_str(err))
return guess()
return int(mobj.group('w')), int(mobj.group('h'))
def _report_run(self, exe, filename):
self.to_screen('%s: Adding thumbnail to "%s"' % (exe, filename))
@PostProcessor._restrict_to(images=False)
def run(self, info):
filename = info['filepath']
temp_filename = prepend_extension(filename, 'temp')
if not info.get('thumbnails'):
self.to_screen('There aren\'t any thumbnails to embed')
return [], info
idx = next((-i for i, t in enumerate(info['thumbnails'][::-1], 1) if t.get('filepath')), None)
if idx is None:
self.to_screen('There are no thumbnails on disk')
return [], info
thumbnail_filename = info['thumbnails'][idx]['filepath']
if not os.path.exists(encodeFilename(thumbnail_filename)):
self.report_warning('Skipping embedding the thumbnail because the file is missing.')
return [], info
# Correct extension for WebP file with wrong extension (see #25687, #25717)
convertor = FFmpegThumbnailsConvertorPP(self._downloader)
convertor.fixup_webp(info, idx)
original_thumbnail = thumbnail_filename = info['thumbnails'][idx]['filepath']
# Convert unsupported thumbnail formats to PNG (see #25687, #25717)
# Original behavior was to convert to JPG, but since JPG is a lossy
# format, there will be some additional data loss.
# PNG, on the other hand, is lossless.
thumbnail_ext = os.path.splitext(thumbnail_filename)[1][1:]
if thumbnail_ext not in ('jpg', 'jpeg', 'png'):
thumbnail_filename = convertor.convert_thumbnail(thumbnail_filename, 'png')
thumbnail_ext = 'png'
mtime = os.stat(encodeFilename(filename)).st_mtime
success = True
if info['ext'] == 'mp3':
options = [
'-c', 'copy', '-map', '0:0', '-map', '1:0', '-id3v2_version', '3',
'-metadata:s:v', 'title="Album cover"', '-metadata:s:v', 'comment="Cover (front)"']
self._report_run('ffmpeg', filename)
self.run_ffmpeg_multiple_files([filename, thumbnail_filename], temp_filename, options)
elif info['ext'] in ['mkv', 'mka']:
options = list(self.stream_copy_opts())
mimetype = 'image/%s' % ('png' if thumbnail_ext == 'png' else 'jpeg')
old_stream, new_stream = self.get_stream_number(
filename, ('tags', 'mimetype'), mimetype)
if old_stream is not None:
options.extend(['-map', '-0:%d' % old_stream])
new_stream -= 1
options.extend([
'-attach', thumbnail_filename,
'-metadata:s:%d' % new_stream, 'mimetype=%s' % mimetype,
'-metadata:s:%d' % new_stream, 'filename=cover.%s' % thumbnail_ext])
self._report_run('ffmpeg', filename)
self.run_ffmpeg(filename, temp_filename, options)
elif info['ext'] in ['m4a', 'mp4', 'mov']:
prefer_atomicparsley = 'embed-thumbnail-atomicparsley' in self.get_param('compat_opts', [])
# Method 1: Use mutagen
if not has_mutagen or prefer_atomicparsley:
success = False
else:
try:
self._report_run('mutagen', filename)
meta = MP4(filename)
# NOTE: the 'covr' atom is a non-standard MPEG-4 atom;
# Apple iTunes 'M4A' files carry it inside the 'moov.udta.meta.ilst' atom.
f = {'jpeg': MP4Cover.FORMAT_JPEG, 'png': MP4Cover.FORMAT_PNG}[imghdr.what(thumbnail_filename)]
with open(thumbnail_filename, 'rb') as thumbfile:
thumb_data = thumbfile.read()
meta.tags['covr'] = [MP4Cover(data=thumb_data, imageformat=f)]
meta.save()
temp_filename = filename
except Exception as err:
self.report_warning('unable to embed using mutagen; %s' % error_to_compat_str(err))
success = False
# Method 2: Use AtomicParsley
if not success:
success = True
atomicparsley = next((
x for x in ['AtomicParsley', 'atomicparsley']
if check_executable(x, ['-v'])), None)
if atomicparsley is None:
self.to_screen('Neither mutagen nor AtomicParsley was found. Falling back to ffmpeg')
success = False
else:
if not prefer_atomicparsley:
self.to_screen('mutagen was not found. Falling back to AtomicParsley')
cmd = [encodeFilename(atomicparsley, True),
encodeFilename(filename, True),
encodeArgument('--artwork'),
encodeFilename(thumbnail_filename, True),
encodeArgument('-o'),
encodeFilename(temp_filename, True)]
cmd += [encodeArgument(o) for o in self._configuration_args('AtomicParsley')]
self._report_run('atomicparsley', filename)
self.write_debug('AtomicParsley command line: %s' % shell_quote(cmd))
p = Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = p.communicate_or_kill()
if p.returncode != 0:
msg = stderr.decode('utf-8', 'replace').strip()
self.report_warning(f'Unable to embed thumbnails using AtomicParsley; {msg}')
# for formats that don't support thumbnails (like 3gp), AtomicParsley
# does not create the temporary file
if b'No changes' in stdout:
self.report_warning('The file format doesn\'t support embedding a thumbnail')
success = False
# Method 3: Use ffmpeg+ffprobe
# Thumbnails attached using this method don't show up as cover art in some cases
# See https://github.com/yt-dlp/yt-dlp/issues/2125, https://github.com/yt-dlp/yt-dlp/issues/411
if not success:
success = True
try:
options = [*self.stream_copy_opts(), '-map', '1']
old_stream, new_stream = self.get_stream_number(
filename, ('disposition', 'attached_pic'), 1)
if old_stream is not None:
options.extend(['-map', '-0:%d' % old_stream])
new_stream -= 1
options.extend(['-disposition:%s' % new_stream, 'attached_pic'])
self._report_run('ffmpeg', filename)
self.run_ffmpeg_multiple_files([filename, thumbnail_filename], temp_filename, options)
except PostProcessingError as err:
success = False
raise EmbedThumbnailPPError(f'Unable to embed using ffprobe & ffmpeg; {err}')
elif info['ext'] in ['ogg', 'opus', 'flac']:
if not has_mutagen:
raise EmbedThumbnailPPError('module mutagen was not found. Please install using `python -m pip install mutagen`')
self._report_run('mutagen', filename)
f = {'opus': OggOpus, 'flac': FLAC, 'ogg': OggVorbis}[info['ext']](filename)
pic = Picture()
pic.mime = 'image/%s' % imghdr.what(thumbnail_filename)
with open(thumbnail_filename, 'rb') as thumbfile:
pic.data = thumbfile.read()
pic.type = 3 # front cover
res = self._get_thumbnail_resolution(thumbnail_filename, info['thumbnails'][idx])
if res is not None:
pic.width, pic.height = res
if info['ext'] == 'flac':
f.add_picture(pic)
else:
# https://wiki.xiph.org/VorbisComment#METADATA_BLOCK_PICTURE
f['METADATA_BLOCK_PICTURE'] = base64.b64encode(pic.write()).decode('ascii')
f.save()
temp_filename = filename
else:
raise EmbedThumbnailPPError('Supported filetypes for thumbnail embedding are: mp3, mkv/mka, ogg/opus/flac, m4a/mp4/mov')
if success and temp_filename != filename:
os.replace(temp_filename, filename)
self.try_utime(filename, mtime, mtime)
files_to_delete = [thumbnail_filename]
if self._already_have_thumbnail:
if original_thumbnail == thumbnail_filename:
files_to_delete = []
elif original_thumbnail != thumbnail_filename:
files_to_delete.append(original_thumbnail)
return files_to_delete, info
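
Method 1 above in isolation: mutagen rewrites only the tag atom, so no remux is needed. A standalone sketch of the same 'covr' embedding, assuming mutagen is installed (paths are hypothetical):

import imghdr
from mutagen.mp4 import MP4, MP4Cover

def embed_cover(video_path, thumb_path):
    # Map the detected image type onto the matching MP4 cover format.
    fmt = {'jpeg': MP4Cover.FORMAT_JPEG, 'png': MP4Cover.FORMAT_PNG}[imghdr.what(thumb_path)]
    meta = MP4(video_path)
    if meta.tags is None:  # files without an existing tag atom need one first
        meta.add_tags()
    with open(thumb_path, 'rb') as thumbfile:
        meta.tags['covr'] = [MP4Cover(data=thumbfile.read(), imageformat=fmt)]
    meta.save()

embed_cover('video.m4a', 'cover.png')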

yt_dlp/postprocessor/exec.py (View File)

@@ -0,0 +1,49 @@
from __future__ import unicode_literals
import subprocess
from .common import PostProcessor
from ..compat import compat_shlex_quote
from ..utils import (
encodeArgument,
PostProcessingError,
variadic,
)
class ExecPP(PostProcessor):
def __init__(self, downloader, exec_cmd):
PostProcessor.__init__(self, downloader)
self.exec_cmd = variadic(exec_cmd)
def parse_cmd(self, cmd, info):
tmpl, tmpl_dict = self._downloader.prepare_outtmpl(cmd, info)
if tmpl_dict: # if there are no replacements, tmpl_dict = {}
return self._downloader.escape_outtmpl(tmpl) % tmpl_dict
filepath = info.get('filepath', info.get('_filename'))
# If video, and no replacements are found, replace {} for backward compatibility
if filepath:
if '{}' not in cmd:
cmd += ' {}'
cmd = cmd.replace('{}', compat_shlex_quote(filepath))
return cmd
def run(self, info):
for tmpl in self.exec_cmd:
cmd = self.parse_cmd(tmpl, info)
self.to_screen('Executing command: %s' % cmd)
retCode = subprocess.call(encodeArgument(cmd), shell=True)
if retCode != 0:
raise PostProcessingError('Command returned error code %d' % retCode)
return [], info
# Deprecated
class ExecAfterDownloadPP(ExecPP):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.deprecation_warning(
'yt_dlp.postprocessor.ExecAfterDownloadPP is deprecated '
'and may be removed in a future version. Use yt_dlp.postprocessor.ExecPP instead')
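
The '{}' handling in parse_cmd above is the backward-compatible path: when the command contains no output-template fields, '{}' is appended if absent and then replaced with the shell-quoted filepath. The same behavior with only the stdlib, for illustration:

import shlex

def parse_cmd_compat(cmd, filepath):
    if '{}' not in cmd:
        cmd += ' {}'
    return cmd.replace('{}', shlex.quote(filepath))  # compat_shlex_quote equivalent

print(parse_cmd_compat('echo', "/tmp/video's.mp4"))
# echo '/tmp/video'"'"'s.mp4'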

yt_dlp/postprocessor/ffmpeg.py (file diff suppressed because it is too large)

yt_dlp/postprocessor/metadataparser.py (View File)

@@ -0,0 +1,121 @@
import re
from enum import Enum
from .common import PostProcessor
class MetadataParserPP(PostProcessor):
class Actions(Enum):
INTERPRET = 'interpretter'
REPLACE = 'replacer'
def __init__(self, downloader, actions):
PostProcessor.__init__(self, downloader)
self._actions = []
for f in actions:
action = f[0]
assert isinstance(action, self.Actions)
self._actions.append(getattr(self, action.value)(*f[1:]))
@classmethod
def validate_action(cls, action, *data):
''' Each action can be:
(Actions.INTERPRET, from, to) OR
(Actions.REPLACE, field, search, replace)
'''
if not isinstance(action, cls.Actions):
raise ValueError(f'{action!r} is not a valid action')
getattr(cls, action.value)(cls, *data)
@staticmethod
def field_to_template(tmpl):
if re.match(r'[a-zA-Z_]+$', tmpl):
return f'%({tmpl})s'
return tmpl
@staticmethod
def format_to_regex(fmt):
r"""
Converts a string like
'%(title)s - %(artist)s'
to a regex like
'(?P<title>.+)\ \-\ (?P<artist>.+)'
"""
if not re.search(r'%\(\w+\)s', fmt):
return fmt
lastpos = 0
regex = ''
# replace %(..)s with regex group and escape other string parts
for match in re.finditer(r'%\((\w+)\)s', fmt):
regex += re.escape(fmt[lastpos:match.start()])
regex += rf'(?P<{match.group(1)}>.+)'
lastpos = match.end()
if lastpos < len(fmt):
regex += re.escape(fmt[lastpos:])
return regex
def run(self, info):
for f in self._actions:
f(info)
return [], info
def interpretter(self, inp, out):
def f(info):
data_to_parse = self._downloader.evaluate_outtmpl(template, info)
self.write_debug(f'Searching for {out_re.pattern!r} in {template!r}')
match = out_re.search(data_to_parse)
if match is None:
self.to_screen(f'Could not interpret {inp!r} as {out!r}')
return
for attribute, value in match.groupdict().items():
info[attribute] = value
self.to_screen('Parsed %s from %r: %r' % (attribute, template, value if value is not None else 'NA'))
template = self.field_to_template(inp)
out_re = re.compile(self.format_to_regex(out))
return f
def replacer(self, field, search, replace):
def f(info):
val = info.get(field)
if val is None:
self.to_screen(f'Video does not have a {field}')
return
elif not isinstance(val, str):
self.report_warning(f'Cannot replace in field {field} since it is a {type(val).__name__}')
return
self.write_debug(f'Replacing all {search!r} in {field} with {replace!r}')
info[field], n = search_re.subn(replace, val)
if n:
self.to_screen(f'Changed {field} to: {info[field]}')
else:
self.to_screen(f'Did not find {search!r} in {field}')
search_re = re.compile(search)
return f
class MetadataFromFieldPP(MetadataParserPP):
@classmethod
def to_action(cls, f):
match = re.match(r'(?s)(?P<in>.*?)(?<!\\):(?P<out>.+)$', f)
if match is None:
raise ValueError(f'it should be FROM:TO, not {f!r}')
return (
cls.Actions.INTERPRET,
match.group('in').replace('\\:', ':'),
match.group('out'),
)
def __init__(self, downloader, formats):
super().__init__(downloader, [self.to_action(f) for f in formats])
# Deprecated
class MetadataFromTitlePP(MetadataParserPP):
def __init__(self, downloader, titleformat):
super().__init__(downloader, [(self.Actions.INTERPRET, 'title', titleformat)])
self.deprecation_warning(
'yt_dlp.postprocessor.MetadataFromTitlePP is deprecated '
'and may be removed in a future version. Use yt_dlp.postprocessor.MetadataFromFieldPP instead')
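
To see format_to_regex in action: '%(title)s - %(artist)s' becomes a pattern with one named group per field, and interpretter() copies the resulting groupdict back into the info dict. A self-contained rerun of the conversion (the sample strings are made up):

import re

fmt = '%(title)s - %(artist)s'
lastpos, regex = 0, ''
for match in re.finditer(r'%\((\w+)\)s', fmt):
    regex += re.escape(fmt[lastpos:match.start()])  # escape the literal parts
    regex += rf'(?P<{match.group(1)}>.+)'           # turn each field into a group
    lastpos = match.end()
regex += re.escape(fmt[lastpos:])

mobj = re.search(regex, 'Never Gonna Give You Up - Rick Astley')
print(mobj.groupdict())
# {'title': 'Never Gonna Give You Up', 'artist': 'Rick Astley'}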

yt_dlp/postprocessor/modify_chapters.py (View File)

@@ -0,0 +1,338 @@
import copy
import heapq
import os
from .common import PostProcessor
from .ffmpeg import (
FFmpegPostProcessor,
FFmpegSubtitlesConvertorPP
)
from .sponsorblock import SponsorBlockPP
from ..utils import (
orderedSet,
PostProcessingError,
prepend_extension,
)
_TINY_CHAPTER_DURATION = 1
DEFAULT_SPONSORBLOCK_CHAPTER_TITLE = '[SponsorBlock]: %(category_names)l'
class ModifyChaptersPP(FFmpegPostProcessor):
def __init__(self, downloader, remove_chapters_patterns=None, remove_sponsor_segments=None, remove_ranges=None,
*, sponsorblock_chapter_title=DEFAULT_SPONSORBLOCK_CHAPTER_TITLE, force_keyframes=False):
FFmpegPostProcessor.__init__(self, downloader)
self._remove_chapters_patterns = set(remove_chapters_patterns or [])
self._remove_sponsor_segments = set(remove_sponsor_segments or []) - set(SponsorBlockPP.POI_CATEGORIES.keys())
self._ranges_to_remove = set(remove_ranges or [])
self._sponsorblock_chapter_title = sponsorblock_chapter_title
self._force_keyframes = force_keyframes
@PostProcessor._restrict_to(images=False)
def run(self, info):
# Chapters must be preserved intact when downloading multiple formats of the same video.
chapters, sponsor_chapters = self._mark_chapters_to_remove(
copy.deepcopy(info.get('chapters')) or [],
copy.deepcopy(info.get('sponsorblock_chapters')) or [])
if not chapters and not sponsor_chapters:
return [], info
real_duration = self._get_real_video_duration(info['filepath'])
if not chapters:
chapters = [{'start_time': 0, 'end_time': real_duration, 'title': info['title']}]
info['chapters'], cuts = self._remove_marked_arrange_sponsors(chapters + sponsor_chapters)
if not cuts:
return [], info
if self._duration_mismatch(real_duration, info.get('duration')):
if not self._duration_mismatch(real_duration, info['chapters'][-1]['end_time']):
self.to_screen(f'Skipping {self.pp_key()} since the video appears to be already cut')
return [], info
if not info.get('__real_download'):
raise PostProcessingError('Cannot cut video since the real and expected durations mismatch. '
'Different chapters may have already been removed')
else:
self.write_debug('Expected and actual durations mismatch')
concat_opts = self._make_concat_opts(cuts, real_duration)
self.write_debug('Concat spec = %s' % ', '.join(f'{c.get("inpoint", 0.0)}-{c.get("outpoint", "inf")}' for c in concat_opts))
def remove_chapters(file, is_sub):
return file, self.remove_chapters(file, cuts, concat_opts, self._force_keyframes and not is_sub)
in_out_files = [remove_chapters(info['filepath'], False)]
in_out_files.extend(remove_chapters(in_file, True) for in_file in self._get_supported_subs(info))
# Renaming should only happen after all files are processed
files_to_remove = []
for in_file, out_file in in_out_files:
uncut_file = prepend_extension(in_file, 'uncut')
os.replace(in_file, uncut_file)
os.replace(out_file, in_file)
files_to_remove.append(uncut_file)
return files_to_remove, info
def _mark_chapters_to_remove(self, chapters, sponsor_chapters):
if self._remove_chapters_patterns:
warn_no_chapter_to_remove = True
if not chapters:
self.to_screen('Chapter information is unavailable')
warn_no_chapter_to_remove = False
for c in chapters:
if any(regex.search(c['title']) for regex in self._remove_chapters_patterns):
c['remove'] = True
warn_no_chapter_to_remove = False
if warn_no_chapter_to_remove:
self.to_screen('There are no chapters matching the regex')
if self._remove_sponsor_segments:
warn_no_chapter_to_remove = True
if not sponsor_chapters:
self.to_screen('SponsorBlock information is unavailable')
warn_no_chapter_to_remove = False
for c in sponsor_chapters:
if c['category'] in self._remove_sponsor_segments:
c['remove'] = True
warn_no_chapter_to_remove = False
if warn_no_chapter_to_remove:
self.to_screen('There are no matching SponsorBlock chapters')
sponsor_chapters.extend({
'start_time': start,
'end_time': end,
'category': 'manually_removed',
'_categories': [('manually_removed', start, end)],
'remove': True,
} for start, end in self._ranges_to_remove)
return chapters, sponsor_chapters
def _get_supported_subs(self, info):
for sub in (info.get('requested_subtitles') or {}).values():
sub_file = sub.get('filepath')
# The file might have been removed by --embed-subs
if not sub_file or not os.path.exists(sub_file):
continue
ext = sub['ext']
if ext not in FFmpegSubtitlesConvertorPP.SUPPORTED_EXTS:
self.report_warning(f'Cannot remove chapters from external {ext} subtitles; "{sub_file}" is now out of sync')
continue
# TODO: create __real_download for subs?
yield sub_file
def _remove_marked_arrange_sponsors(self, chapters):
# Store cuts separately, since adjacent and overlapping cuts must be merged.
cuts = []
def append_cut(c):
assert 'remove' in c, 'Not a cut is appended to cuts'
last_to_cut = cuts[-1] if cuts else None
if last_to_cut and last_to_cut['end_time'] >= c['start_time']:
last_to_cut['end_time'] = max(last_to_cut['end_time'], c['end_time'])
else:
cuts.append(c)
return len(cuts) - 1
def excess_duration(c):
# Cuts that are completely within the chapter reduce chapters' duration.
# Since cuts can overlap, excess duration may be less than the sum of cuts' durations.
# To avoid that, the chapter stores the index of the first cut within the chapter,
# instead of storing excess duration. append_cut ensures that subsequent cuts (if any)
# will be merged with previous ones (if necessary).
cut_idx, excess = c.pop('cut_idx', len(cuts)), 0
while cut_idx < len(cuts):
cut = cuts[cut_idx]
if cut['start_time'] >= c['end_time']:
break
if cut['end_time'] > c['start_time']:
excess += min(cut['end_time'], c['end_time'])
excess -= max(cut['start_time'], c['start_time'])
cut_idx += 1
return excess
new_chapters = []
def append_chapter(c):
assert 'remove' not in c, 'Cut is appended to chapters'
length = c['end_time'] - c['start_time'] - excess_duration(c)
# Chapter is completely covered by cuts or sponsors.
if length <= 0:
return
start = new_chapters[-1]['end_time'] if new_chapters else 0
c.update(start_time=start, end_time=start + length)
new_chapters.append(c)
# Turn into a priority queue, index is a tie breaker.
# Plain stack sorted by start_time is not enough: after splitting the chapter,
# the part returned to the stack is not guaranteed to have start_time
# less than or equal to that of the stack's head.
chapters = [(c['start_time'], i, c) for i, c in enumerate(chapters)]
heapq.heapify(chapters)
_, cur_i, cur_chapter = heapq.heappop(chapters)
while chapters:
_, i, c = heapq.heappop(chapters)
# Non-overlapping chapters or cuts can be appended directly. However,
# adjacent non-overlapping cuts must be merged, which is handled by append_cut.
if cur_chapter['end_time'] <= c['start_time']:
(append_chapter if 'remove' not in cur_chapter else append_cut)(cur_chapter)
cur_i, cur_chapter = i, c
continue
# Eight possibilities for overlapping chapters: (cut, cut), (cut, sponsor),
# (cut, normal), (sponsor, cut), (normal, cut), (sponsor, sponsor),
# (sponsor, normal), and (normal, sponsor). There is no (normal, normal):
# normal chapters are assumed not to overlap.
if 'remove' in cur_chapter:
# (cut, cut): adjust end_time.
if 'remove' in c:
cur_chapter['end_time'] = max(cur_chapter['end_time'], c['end_time'])
# (cut, sponsor/normal): chop the beginning of the later chapter
# (if it's not completely hidden by the cut). Push to the priority queue
# to restore sorting by start_time: with beginning chopped, c may actually
# start later than the remaining chapters from the queue.
elif cur_chapter['end_time'] < c['end_time']:
c['start_time'] = cur_chapter['end_time']
c['_was_cut'] = True
heapq.heappush(chapters, (c['start_time'], i, c))
# (sponsor/normal, cut).
elif 'remove' in c:
cur_chapter['_was_cut'] = True
# Chop the end of the current chapter if the cut is not contained within it.
# Chopping the end doesn't break start_time sorting, no PQ push is necessary.
if cur_chapter['end_time'] <= c['end_time']:
cur_chapter['end_time'] = c['start_time']
append_chapter(cur_chapter)
cur_i, cur_chapter = i, c
continue
# Current chapter contains the cut within it. If the current chapter is
# a sponsor chapter, check whether the categories before and after the cut differ.
if '_categories' in cur_chapter:
after_c = dict(cur_chapter, start_time=c['end_time'], _categories=[])
cur_cats = []
for cat_start_end in cur_chapter['_categories']:
if cat_start_end[1] < c['start_time']:
cur_cats.append(cat_start_end)
if cat_start_end[2] > c['end_time']:
after_c['_categories'].append(cat_start_end)
cur_chapter['_categories'] = cur_cats
if cur_chapter['_categories'] != after_c['_categories']:
# Categories before and after the cut differ: push the after part to PQ.
heapq.heappush(chapters, (after_c['start_time'], cur_i, after_c))
cur_chapter['end_time'] = c['start_time']
append_chapter(cur_chapter)
cur_i, cur_chapter = i, c
continue
# Either sponsor categories before and after the cut are the same or
# we're dealing with a normal chapter. Just register an outstanding cut:
# subsequent append_chapter will reduce the duration.
cur_chapter.setdefault('cut_idx', append_cut(c))
# (sponsor, normal): if a normal chapter is not completely overlapped,
# chop the beginning of it and push it to PQ.
elif '_categories' in cur_chapter and '_categories' not in c:
if cur_chapter['end_time'] < c['end_time']:
c['start_time'] = cur_chapter['end_time']
c['_was_cut'] = True
heapq.heappush(chapters, (c['start_time'], i, c))
# (normal, sponsor) and (sponsor, sponsor)
else:
assert '_categories' in c, 'Normal chapters overlap'
cur_chapter['_was_cut'] = True
c['_was_cut'] = True
# Push the part after the sponsor to PQ.
if cur_chapter['end_time'] > c['end_time']:
# deepcopy to make categories in after_c and cur_chapter/c refer to different lists.
after_c = dict(copy.deepcopy(cur_chapter), start_time=c['end_time'])
heapq.heappush(chapters, (after_c['start_time'], cur_i, after_c))
# Push the part after the overlap to PQ.
elif c['end_time'] > cur_chapter['end_time']:
after_cur = dict(copy.deepcopy(c), start_time=cur_chapter['end_time'])
heapq.heappush(chapters, (after_cur['start_time'], cur_i, after_cur))
c['end_time'] = cur_chapter['end_time']
# (sponsor, sponsor): merge categories in the overlap.
if '_categories' in cur_chapter:
c['_categories'] = cur_chapter['_categories'] + c['_categories']
# Inherit the cuts that the current chapter has accumulated within it.
if 'cut_idx' in cur_chapter:
c['cut_idx'] = cur_chapter['cut_idx']
cur_chapter['end_time'] = c['start_time']
append_chapter(cur_chapter)
cur_i, cur_chapter = i, c
(append_chapter if 'remove' not in cur_chapter else append_cut)(cur_chapter)
return self._remove_tiny_rename_sponsors(new_chapters), cuts
def _remove_tiny_rename_sponsors(self, chapters):
new_chapters = []
for i, c in enumerate(chapters):
# Merge with the previous/next if the chapter is tiny.
# Only tiny chapters resulting from a cut can be skipped.
# Chapters that were already tiny in the original list will be preserved.
if (('_was_cut' in c or '_categories' in c)
and c['end_time'] - c['start_time'] < _TINY_CHAPTER_DURATION):
if not new_chapters:
# Prepend tiny chapter to the next one if possible.
if i < len(chapters) - 1:
chapters[i + 1]['start_time'] = c['start_time']
continue
else:
old_c = new_chapters[-1]
if i < len(chapters) - 1:
next_c = chapters[i + 1]
# Not a typo: key names in old_c and next_c are really different.
prev_is_sponsor = 'categories' in old_c
next_is_sponsor = '_categories' in next_c
# Preferentially prepend tiny normals to normals and sponsors to sponsors.
if (('_categories' not in c and prev_is_sponsor and not next_is_sponsor)
or ('_categories' in c and not prev_is_sponsor and next_is_sponsor)):
next_c['start_time'] = c['start_time']
continue
old_c['end_time'] = c['end_time']
continue
c.pop('_was_cut', None)
cats = c.pop('_categories', None)
if cats:
category = min(cats, key=lambda c: c[2] - c[1])[0]
cats = orderedSet(x[0] for x in cats)
c.update({
'category': category,
'categories': cats,
'name': SponsorBlockPP.CATEGORIES[category],
'category_names': [SponsorBlockPP.CATEGORIES[c] for c in cats]
})
c['title'] = self._downloader.evaluate_outtmpl(self._sponsorblock_chapter_title, c.copy())
# Merge identically named sponsors.
if (new_chapters and 'categories' in new_chapters[-1]
and new_chapters[-1]['title'] == c['title']):
new_chapters[-1]['end_time'] = c['end_time']
continue
new_chapters.append(c)
return new_chapters
def remove_chapters(self, filename, ranges_to_cut, concat_opts, force_keyframes=False):
in_file = filename
out_file = prepend_extension(in_file, 'temp')
if force_keyframes:
in_file = self.force_keyframes(in_file, (t for c in ranges_to_cut for t in (c['start_time'], c['end_time'])))
self.to_screen(f'Removing chapters from {filename}')
self.concat_files([in_file] * len(concat_opts), out_file, concat_opts)
if in_file != filename:
os.remove(in_file)
return out_file
@staticmethod
def _make_concat_opts(chapters_to_remove, duration):
opts = [{}]
for s in chapters_to_remove:
# Do not create 0 duration chunk at the beginning.
if s['start_time'] == 0:
opts[-1]['inpoint'] = f'{s["end_time"]:.6f}'
continue
opts[-1]['outpoint'] = f'{s["start_time"]:.6f}'
# Do not create 0 duration chunk at the end.
if s['end_time'] < duration:
opts.append({'inpoint': f'{s["end_time"]:.6f}'})
return opts
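
A worked example of _make_concat_opts: removing 10-20 s and 50-60 s from a 100 s file yields three ffmpeg concat-demuxer entries whose inpoint/outpoint values mark the kept ranges. The function below is a standalone copy of the static method above:

def make_concat_opts(chapters_to_remove, duration):
    opts = [{}]
    for s in chapters_to_remove:
        if s['start_time'] == 0:  # no 0-duration chunk at the beginning
            opts[-1]['inpoint'] = f'{s["end_time"]:.6f}'
            continue
        opts[-1]['outpoint'] = f'{s["start_time"]:.6f}'
        if s['end_time'] < duration:  # no 0-duration chunk at the end
            opts.append({'inpoint': f'{s["end_time"]:.6f}'})
    return opts

cuts = [{'start_time': 10.0, 'end_time': 20.0}, {'start_time': 50.0, 'end_time': 60.0}]
print(make_concat_opts(cuts, 100))
# [{'outpoint': '10.000000'}, {'inpoint': '20.000000', 'outpoint': '50.000000'}, {'inpoint': '60.000000'}]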

yt_dlp/postprocessor/movefilesafterdownload.py (View File)

@@ -0,0 +1,54 @@
from __future__ import unicode_literals
import os
import shutil
from .common import PostProcessor
from ..utils import (
decodeFilename,
encodeFilename,
make_dir,
PostProcessingError,
)
class MoveFilesAfterDownloadPP(PostProcessor):
def __init__(self, downloader=None, downloaded=True):
PostProcessor.__init__(self, downloader)
self._downloaded = downloaded
@classmethod
def pp_key(cls):
return 'MoveFiles'
def run(self, info):
dl_path, dl_name = os.path.split(encodeFilename(info['filepath']))
finaldir = info.get('__finaldir', dl_path)
finalpath = os.path.join(finaldir, dl_name)
if self._downloaded:
info['__files_to_move'][info['filepath']] = decodeFilename(finalpath)
make_newfilename = lambda old: decodeFilename(os.path.join(finaldir, os.path.basename(encodeFilename(old))))
for oldfile, newfile in info['__files_to_move'].items():
if not newfile:
newfile = make_newfilename(oldfile)
if os.path.abspath(encodeFilename(oldfile)) == os.path.abspath(encodeFilename(newfile)):
continue
if not os.path.exists(encodeFilename(oldfile)):
self.report_warning('File "%s" cannot be found' % oldfile)
continue
if os.path.exists(encodeFilename(newfile)):
if self.get_param('overwrites', True):
self.report_warning('Replacing existing file "%s"' % newfile)
os.remove(encodeFilename(newfile))
else:
self.report_warning(
'Cannot move file "%s" out of temporary directory since "%s" already exists. '
% (oldfile, newfile))
continue
make_dir(newfile, PostProcessingError)
self.to_screen('Moving file "%s" to "%s"' % (oldfile, newfile))
shutil.move(oldfile, newfile) # os.rename cannot move between volumes
info['filepath'] = finalpath
return [], info
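
The shutil.move at the end is deliberate: os.rename raises OSError (EXDEV) when the temporary directory and the final directory are on different filesystems, while shutil.move falls back to copy-then-delete. A minimal illustration of that pattern (paths are hypothetical):

import os
import shutil

src, dst = '/tmp/yt-dlp/clip.mp4', '/mnt/media/clip.mp4'
os.makedirs(os.path.dirname(dst), exist_ok=True)
try:
    os.rename(src, dst)    # fast, but fails across filesystems
except OSError:
    shutil.move(src, dst)  # copies the file, then removes the source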

yt_dlp/postprocessor/sponskrub.py (View File)

@@ -0,0 +1,101 @@
from __future__ import unicode_literals
import os
import subprocess
from .common import PostProcessor
from ..compat import compat_shlex_split
from ..utils import (
check_executable,
cli_option,
encodeArgument,
encodeFilename,
shell_quote,
str_or_none,
Popen,
PostProcessingError,
prepend_extension,
)
# Deprecated in favor of the native implementation
class SponSkrubPP(PostProcessor):
_temp_ext = 'spons'
_exe_name = 'sponskrub'
def __init__(self, downloader, path='', args=None, ignoreerror=False, cut=False, force=False, _from_cli=False):
PostProcessor.__init__(self, downloader)
self.force = force
self.cutout = cut
self.args = str_or_none(args) or '' # For backward compatibility
self.path = self.get_exe(path)
if not _from_cli:
self.deprecation_warning(
'yt_dlp.postprocessor.SponSkrubPP support is deprecated and may be removed in a future version. '
'Use yt_dlp.postprocessor.SponsorBlock and yt_dlp.postprocessor.ModifyChaptersPP instead')
if not ignoreerror and self.path is None:
if path:
raise PostProcessingError('sponskrub not found in "%s"' % path)
else:
raise PostProcessingError('sponskrub not found. Please install or provide the path using --sponskrub-path')
def get_exe(self, path=''):
if not path or not check_executable(path, ['-h']):
path = os.path.join(path, self._exe_name)
if not check_executable(path, ['-h']):
return None
return path
@PostProcessor._restrict_to(images=False)
def run(self, information):
if self.path is None:
return [], information
filename = information['filepath']
if not os.path.exists(encodeFilename(filename)): # no download
return [], information
if information['extractor_key'].lower() != 'youtube':
self.to_screen('Skipping sponskrub since it is not a YouTube video')
return [], information
if self.cutout and not self.force and not information.get('__real_download', False):
self.report_warning(
'Skipping sponskrub since the video was already downloaded. '
'Use --sponskrub-force to run sponskrub anyway')
return [], information
self.to_screen('Trying to %s sponsor sections' % ('remove' if self.cutout else 'mark'))
if self.cutout:
self.report_warning('Cutting out sponsor segments will cause the subtitles to go out of sync.')
if not information.get('__real_download', False):
self.report_warning('If sponskrub is run multiple times, unintended parts of the video could be cut out.')
temp_filename = prepend_extension(filename, self._temp_ext)
if os.path.exists(encodeFilename(temp_filename)):
os.remove(encodeFilename(temp_filename))
cmd = [self.path]
if not self.cutout:
cmd += ['-chapter']
cmd += cli_option(self._downloader.params, '-proxy', 'proxy')
cmd += compat_shlex_split(self.args) # For backward compatibility
cmd += self._configuration_args(self._exe_name, use_compat=False)
cmd += ['--', information['id'], filename, temp_filename]
cmd = [encodeArgument(i) for i in cmd]
self.write_debug('sponskrub command line: %s' % shell_quote(cmd))
pipe = None if self.get_param('verbose') else subprocess.PIPE
p = Popen(cmd, stdout=pipe)
stdout = p.communicate_or_kill()[0]
if p.returncode == 0:
os.replace(temp_filename, filename)
self.to_screen('Sponsor sections have been %s' % ('removed' if self.cutout else 'marked'))
elif p.returncode == 3:
self.to_screen('No segments in the SponsorBlock database')
else:
msg = stdout.decode('utf-8', 'replace').strip() if stdout else ''
msg = msg.split('\n')[0 if msg.lower().startswith('unrecognised') else -1]
raise PostProcessingError(msg if msg else 'sponskrub failed with error code %s' % p.returncode)
return [], information
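
get_exe above relies on check_executable, which simply tries to launch the binary with a harmless flag. A simplified stand-in for that probe, not the real yt_dlp.utils implementation:

import subprocess

def find_executable(candidates, probe_args=('-h',)):
    for exe in candidates:
        try:
            subprocess.run([exe, *probe_args], stdout=subprocess.DEVNULL,
                           stderr=subprocess.DEVNULL)
            return exe  # it launched, so it exists and is executable
        except OSError:
            continue
    return None

print(find_executable(['sponskrub']))  # name if found, else None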

yt_dlp/postprocessor/sponsorblock.py (View File)

@@ -0,0 +1,117 @@
from hashlib import sha256
import itertools
import json
import re
import time
from .ffmpeg import FFmpegPostProcessor
from ..compat import compat_urllib_parse_urlencode, compat_HTTPError
from ..utils import PostProcessingError, network_exceptions, sanitized_Request
class SponsorBlockPP(FFmpegPostProcessor):
# https://wiki.sponsor.ajay.app/w/Types
EXTRACTORS = {
'Youtube': 'YouTube',
}
POI_CATEGORIES = {
'poi_highlight': 'Highlight',
}
CATEGORIES = {
'sponsor': 'Sponsor',
'intro': 'Intermission/Intro Animation',
'outro': 'Endcards/Credits',
'selfpromo': 'Unpaid/Self Promotion',
'preview': 'Preview/Recap',
'filler': 'Filler Tangent',
'interaction': 'Interaction Reminder',
'music_offtopic': 'Non-Music Section',
**POI_CATEGORIES,
}
def __init__(self, downloader, categories=None, api='https://sponsor.ajay.app'):
FFmpegPostProcessor.__init__(self, downloader)
self._categories = tuple(categories or self.CATEGORIES.keys())
self._API_URL = api if re.match('^https?://', api) else 'https://' + api
def run(self, info):
extractor = info['extractor_key']
if extractor not in self.EXTRACTORS:
self.to_screen(f'SponsorBlock is not supported for {extractor}')
return [], info
self.to_screen('Fetching SponsorBlock segments')
info['sponsorblock_chapters'] = self._get_sponsor_chapters(info, info['duration'])
return [], info
def _get_sponsor_chapters(self, info, duration):
segments = self._get_sponsor_segments(info['id'], self.EXTRACTORS[info['extractor_key']])
def duration_filter(s):
start_end = s['segment']
# Ignore milliseconds difference at the start.
if start_end[0] <= 1:
start_end[0] = 0
# Make POI chapters 1 sec so that we can properly mark them
if s['category'] in self.POI_CATEGORIES.keys():
start_end[1] += 1
# Ignore milliseconds difference at the end.
# Never allow the segment to exceed the video.
if duration and duration - start_end[1] <= 1:
start_end[1] = duration
# SponsorBlock duration may be absent or it may deviate from the real one.
return s['videoDuration'] == 0 or not duration or abs(duration - s['videoDuration']) <= 1
duration_match = [s for s in segments if duration_filter(s)]
if len(duration_match) != len(segments):
self.report_warning('Some SponsorBlock segments are from a video of different duration, maybe from an old version of this video')
def to_chapter(s):
(start, end), cat = s['segment'], s['category']
return {
'start_time': start,
'end_time': end,
'category': cat,
'title': self.CATEGORIES[cat],
'_categories': [(cat, start, end)]
}
sponsor_chapters = [to_chapter(s) for s in duration_match]
if not sponsor_chapters:
self.to_screen('No segments were found in the SponsorBlock database')
else:
self.to_screen(f'Found {len(sponsor_chapters)} segments in the SponsorBlock database')
return sponsor_chapters
def _get_sponsor_segments(self, video_id, service):
hash = sha256(video_id.encode('ascii')).hexdigest()
# SponsorBlock API recommends using first 4 hash characters.
url = f'{self._API_URL}/api/skipSegments/{hash[:4]}?' + compat_urllib_parse_urlencode({
'service': service,
'categories': json.dumps(self._categories),
})
self.write_debug(f'SponsorBlock query: {url}')
for d in self._get_json(url):
if d['videoID'] == video_id:
return d['segments']
return []
def _get_json(self, url):
# While this is not an extractor, it behaves similarly to one,
# so it obeys extractor_retries and sleep_interval_requests
max_retries = self.get_param('extractor_retries', 3)
sleep_interval = self.get_param('sleep_interval_requests') or 0
for retries in itertools.count():
try:
rsp = self._downloader.urlopen(sanitized_Request(url))
return json.loads(rsp.read().decode(rsp.info().get_param('charset') or 'utf-8'))
except network_exceptions as e:
if isinstance(e, compat_HTTPError) and e.code == 404:
return []
if retries < max_retries:
self.report_warning(f'{e}. Retrying...')
if sleep_interval > 0:
self.to_screen(f'Sleeping {sleep_interval} seconds ...')
time.sleep(sleep_interval)
continue
raise PostProcessingError(f'Unable to communicate with SponsorBlock API: {e}')
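
The hash-prefix lookup in _get_sponsor_segments is SponsorBlock's privacy feature: the client sends only the first 4 hex characters of sha256(video_id), receives segments for every video matching that prefix, and filters locally. A standalone sketch using only the stdlib (the video ID is just an example; a 404 response means no segments exist for the prefix):

import json
from hashlib import sha256
from urllib.parse import urlencode
from urllib.request import urlopen

video_id = 'dQw4w9WgXcQ'
prefix = sha256(video_id.encode('ascii')).hexdigest()[:4]
url = 'https://sponsor.ajay.app/api/skipSegments/%s?%s' % (
    prefix, urlencode({'service': 'YouTube', 'categories': json.dumps(['sponsor'])}))
with urlopen(url) as rsp:
    data = json.load(rsp)
# Only the entry whose full videoID matches is ours; the rest is chaff.
segments = next((d['segments'] for d in data if d['videoID'] == video_id), [])
print(segments)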

yt_dlp/postprocessor/xattrpp.py (View File)

@@ -0,0 +1,78 @@
from __future__ import unicode_literals
from .common import PostProcessor
from ..compat import compat_os_name
from ..utils import (
hyphenate_date,
write_xattr,
PostProcessingError,
XAttrMetadataError,
XAttrUnavailableError,
)
class XAttrMetadataPP(PostProcessor):
#
# More info about extended attributes for media:
# http://freedesktop.org/wiki/CommonExtendedAttributes/
# http://www.freedesktop.org/wiki/PhreedomDraft/
# http://dublincore.org/documents/usageguide/elements.shtml
#
# TODO:
# * capture youtube keywords and put them in 'user.dublincore.subject' (comma-separated)
# * figure out which xattrs can be used for 'duration', 'thumbnail', 'resolution'
#
def run(self, info):
""" Set extended attributes on downloaded file (if xattr support is found). """
# Write the metadata to the file's xattrs
self.to_screen('Writing metadata to file\'s xattrs')
filename = info['filepath']
try:
xattr_mapping = {
'user.xdg.referrer.url': 'webpage_url',
# 'user.xdg.comment': 'description',
'user.dublincore.title': 'title',
'user.dublincore.date': 'upload_date',
'user.dublincore.description': 'description',
'user.dublincore.contributor': 'uploader',
'user.dublincore.format': 'format',
}
num_written = 0
for xattrname, infoname in xattr_mapping.items():
value = info.get(infoname)
if value:
if infoname == 'upload_date':
value = hyphenate_date(value)
byte_value = value.encode('utf-8')
write_xattr(filename, xattrname, byte_value)
num_written += 1
return [], info
except XAttrUnavailableError as e:
raise PostProcessingError(str(e))
except XAttrMetadataError as e:
if e.reason == 'NO_SPACE':
self.report_warning(
'There\'s no disk space left, disk quota exceeded or filesystem xattr limit exceeded. '
+ (('Some ' if num_written else '') + 'extended attributes are not written.').capitalize())
elif e.reason == 'VALUE_TOO_LONG':
self.report_warning(
'Unable to write extended attributes due to too long values.')
else:
msg = 'This filesystem doesn\'t support extended attributes. '
if compat_os_name == 'nt':
msg += 'You need to use NTFS.'
else:
msg += '(You may have to enable them in your /etc/fstab)'
raise PostProcessingError(f'{msg} ({e})')
return [], info
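
To verify what this post-processor wrote, the attributes can be read back with the stdlib (os.listxattr/os.getxattr are Linux-only; the path is hypothetical and the filesystem must support xattrs):

import os

path = 'video.mp4'
for name in os.listxattr(path):
    value = os.getxattr(path, name)
    print(name, '=', value.decode('utf-8', 'replace'))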