mirror of
https://github.com/yt-dlp/yt-dlp
synced 2025-01-01 06:21:09 +01:00
add96eb9f8
Authored by: seproDev Reviewed-by: bashonly <88596187+bashonly@users.noreply.github.com> Reviewed-by: Simon Sawicki <contact@grub4k.xyz>
398 lines
11 KiB
Python
398 lines
11 KiB
Python
"""
|
|
A partial parser for WebVTT segments. Interprets enough of the WebVTT stream
|
|
to be able to assemble a single stand-alone subtitle file, suitably adjusting
|
|
timestamps on the way, while everything else is passed through unmodified.
|
|
|
|
Regular expressions based on the W3C WebVTT specification
|
|
<https://www.w3.org/TR/webvtt1/>. The X-TIMESTAMP-MAP extension is described
|
|
in RFC 8216 §3.5 <https://tools.ietf.org/html/rfc8216#section-3.5>.
|
|
"""
|
|
|
|
import io
|
|
import re
|
|
|
|
from .utils import int_or_none, timetuple_from_msec
|
|
|
|
|
|
class _MatchParser:
    """
    A cursor over a string: it remembers how far parsing has progressed
    and moves forward as syntax elements are recognised.
    """

    def __init__(self, string):
        self._data = string
        self._pos = 0

    def match(self, r):
        """
        Test r against the data at the current position without moving.

        A plain string yields its length when the data continues with it
        and None otherwise; a compiled pattern yields the result of its
        match() anchored at the current position. Any other type raises
        ValueError.
        """
        if isinstance(r, str):
            return len(r) if self._data.startswith(r, self._pos) else None
        if isinstance(r, re.Pattern):
            return r.match(self._data, self._pos)
        raise ValueError(r)

    def advance(self, by):
        """
        Move the position forward and hand back the argument unchanged.

        The distance is the length of the matched text for an re.Match,
        the length of a string, the value of an int, and zero for None.
        Any other type raises ValueError.
        """
        if isinstance(by, re.Match):
            step = by.end() - by.start()
        elif isinstance(by, str):
            step = len(by)
        elif isinstance(by, int):
            step = by
        elif by is None:
            step = 0
        else:
            raise ValueError(by)
        self._pos += step
        return by

    def consume(self, r):
        """
        match() followed by advance(): step over r if it is present and
        return whatever match() produced (None when it is absent).
        """
        outcome = self.match(r)
        return self.advance(outcome)

    def child(self):
        """
        Create a parser over the same data with its own position,
        starting where this one currently is.
        """
        return _MatchChildParser(self)
|
|
|
|
|
|
class _MatchChildParser(_MatchParser):
    """
    A parser state derived from a parent: it walks the same data but
    keeps a position of its own. Handy for speculative parsing, where
    we may want to backtrack simply by discarding the child.
    """

    def __init__(self, parent):
        super().__init__(parent._data)
        # start from wherever the parent currently stands
        self._pos = parent._pos
        self.__parent = parent

    def commit(self):
        """
        Copy this child's position into the parent and return the parent.
        """
        owner = self.__parent
        owner._pos = self._pos
        return owner
|
|
|
|
|
|
class ParseError(Exception):
    """
    Raised when the input cannot be parsed at the parser's position.

    The message reports that position together with up to 100 characters
    of the data that follows it.
    """

    def __init__(self, parser):
        start = parser._pos
        data = parser._data[start:start + 100]
        message = f'Parse error at position {parser._pos} (near {data!r})'
        super().__init__(message)
|
|
|
|
|
|
# While the specification <https://www.w3.org/TR/webvtt1/#webvtt-timestamp>
# prescribes that hours must be *2 or more* digits, timestamps with a single
# digit for the hour part have been seen in the wild.
# See https://github.com/yt-dlp/yt-dlp/issues/921
#
# Capture groups, in order: hours (the whole "HH:" prefix is optional),
# minutes, seconds, milliseconds. Note that the dot after the seconds is
# mandatory while the millisecond digits themselves are optional.
_REGEX_TS = re.compile(r'''(?x)
    (?:([0-9]{1,}):)?
    ([0-9]{2}):
    ([0-9]{2})\.
    ([0-9]{3})?
''')
# Matches (with zero width) only at the very end of the input.
_REGEX_EOF = re.compile(r'\Z')
# A single line terminator; the `$` alternative yields an empty match when
# the input ends without one.
_REGEX_NL = re.compile(r'(?:\r\n|[\r\n]|$)')
# One or more consecutive line terminators, i.e. a run of blank lines.
_REGEX_BLANK = re.compile(r'(?:\r\n|[\r\n])+')
# Any amount (possibly none) of spaces and tabs; always matches.
_REGEX_OPTIONAL_WHITESPACE = re.compile(r'[ \t]*')
|
|
|
|
|
|
def _parse_ts(ts):
    """
    Turn a parsed WebVTT timestamp (the re.Match produced by _REGEX_TS)
    into an MPEG PES timestamp, i.e. a count of 90 kHz ticks.

    Groups that did not participate in the match count as zero.
    """
    # milliseconds contributed by one unit of hours, minutes, seconds, msec
    weights = (3600_000, 60_000, 1000, 1)
    msec = 0
    for field, weight in zip(ts.groups(), weights):
        msec += int(field or 0) * weight
    # 90 ticks per millisecond
    return 90 * msec
|
|
|
|
|
|
def _format_ts(ts):
    """
    Render an MPEG PES timestamp (90 kHz ticks) as a WebVTT timestamp.

    Precision below one millisecond is lost in the process.
    """
    # Adding half a millisecond (45 ticks) before the floor division
    # rounds to the nearest millisecond rather than truncating.
    msec = int((ts + 45) // 90)
    return '%02u:%02u:%02u.%03u' % timetuple_from_msec(msec)
|
|
|
|
|
|
class Block:
    """
    Base class for all WebVTT blocks.

    Subclasses relying on the default parse() must provide a compiled
    pattern in the class attribute _REGEX.
    """

    def __init__(self, **kwargs):
        # every keyword argument becomes an attribute of the same name
        for name in kwargs:
            setattr(self, name, kwargs[name])

    @classmethod
    def parse(cls, parser):
        """
        Try to match cls._REGEX at the parser's position.

        On success the parser is advanced past the match and an instance
        holding the matched text in .raw is returned; otherwise the
        parser is left alone and None is returned.
        """
        found = parser.match(cls._REGEX)
        if found:
            parser.advance(found)
            return cls(raw=found.group(0))
        return None

    def write_into(self, stream):
        """
        Write the block's raw text to stream, unmodified.
        """
        stream.write(self.raw)
|
|
|
|
|
|
class HeaderBlock(Block):
    """
    Base class for WebVTT blocks that are only allowed in the header
    part of the file, that is, ahead of the first cue block.
    """
|
|
|
|
|
|
class Magic(HeaderBlock):
    """
    The file signature (the "WEBVTT" line) together with the header lines
    that immediately follow it.

    Instances created by parse() carry these attributes:
      extra   -- whatever followed "WEBVTT" on the signature line, or None
      local   -- X-TIMESTAMP-MAP LOCAL value as a 90 kHz tick count, or None
      mpegts  -- X-TIMESTAMP-MAP MPEGTS value as an int, or None
      meta    -- the raw metadata header lines, concatenated ('' if none)
    """

    _REGEX = re.compile(r'\ufeff?WEBVTT([ \t][^\r\n]*)?(?:\r\n|[\r\n])')

    # XXX: The X-TIMESTAMP-MAP extension is described in RFC 8216 §3.5
    # <https://tools.ietf.org/html/rfc8216#section-3.5>, but the RFC
    # doesn't specify the exact grammar nor where in the WebVTT
    # syntax it should be placed; the below has been devised based
    # on usage in the wild
    #
    # And strictly speaking, the presence of this extension violates
    # the W3C WebVTT spec. Oh well.

    _REGEX_TSMAP = re.compile(r'X-TIMESTAMP-MAP=')
    _REGEX_TSMAP_LOCAL = re.compile(r'LOCAL:')
    _REGEX_TSMAP_MPEGTS = re.compile(r'MPEGTS:([0-9]+)')
    _REGEX_TSMAP_SEP = re.compile(r'[ \t]*,[ \t]*')

    # This was removed from the spec in the 2017 revision;
    # the last spec draft to describe this syntax element is
    # <https://www.w3.org/TR/2015/WD-webvtt1-20151208/#webvtt-metadata-header>.
    # Nevertheless, YouTube keeps serving those
    _REGEX_META = re.compile(r'(?:(?!-->)[^\r\n])+:(?:(?!-->)[^\r\n])+(?:\r\n|[\r\n])')

    @classmethod
    def __parse_tsmap(cls, parser):
        """
        Parse the value part of an X-TIMESTAMP-MAP header, i.e. what
        follows "X-TIMESTAMP-MAP=" up to and including the end of line.

        The value is a comma-separated list of LOCAL:<timestamp> and
        MPEGTS:<integer> components in any order. Returns the tuple
        (local, mpegts), where local is a 90 kHz tick count and mpegts
        an int; a component that does not occur is reported as None.

        Raises ParseError on malformed input. The caller's parser is only
        advanced when parsing succeeds.
        """
        parser = parser.child()

        # Either component may be missing from the header, so both need
        # a defined value before we start; otherwise the return statement
        # would hit an unbound local for e.g. "LOCAL:00:00:00.000" alone.
        local, mpegts = None, None

        while True:
            if parser.consume(cls._REGEX_TSMAP_LOCAL):
                m = parser.consume(_REGEX_TS)
                if m is None:
                    raise ParseError(parser)
                local = _parse_ts(m)
            else:
                m = parser.consume(cls._REGEX_TSMAP_MPEGTS)
                if m is None:
                    raise ParseError(parser)
                mpegts = int_or_none(m.group(1))
                if mpegts is None:
                    raise ParseError(parser)
            # a separator announces another component; a line end (or
            # the end of input) finishes the header
            if parser.consume(cls._REGEX_TSMAP_SEP):
                continue
            if parser.consume(_REGEX_NL):
                break
            raise ParseError(parser)

        parser.commit()
        return local, mpegts

    @classmethod
    def parse(cls, parser):
        """
        Parse the signature line and the header lines following it, up to
        the first empty line or the end of input.

        Unlike Block.parse(), failure is not reported by returning None:
        a missing signature or an unrecognised header line raises
        ParseError. The caller's parser is only advanced on success.
        """
        parser = parser.child()

        m = parser.consume(cls._REGEX)
        if not m:
            raise ParseError(parser)

        extra = m.group(1)
        local, mpegts, meta = None, None, ''
        # _REGEX_NL also matches (with zero width) at the end of input,
        # which terminates the loop there as well
        while not parser.consume(_REGEX_NL):
            if parser.consume(cls._REGEX_TSMAP):
                local, mpegts = cls.__parse_tsmap(parser)
                continue
            m = parser.consume(cls._REGEX_META)
            if m:
                meta += m.group(0)
                continue
            raise ParseError(parser)
        parser.commit()
        return cls(extra=extra, mpegts=mpegts, local=local, meta=meta)

    def write_into(self, stream):
        """
        Write the signature line, an X-TIMESTAMP-MAP header (only when
        local or mpegts is set and non-zero; a missing half is written
        as zero), the metadata lines, and a terminating empty line.
        """
        stream.write('WEBVTT')
        if self.extra is not None:
            stream.write(self.extra)
        stream.write('\n')
        if self.local or self.mpegts:
            stream.write('X-TIMESTAMP-MAP=LOCAL:')
            stream.write(_format_ts(self.local if self.local is not None else 0))
            stream.write(',MPEGTS:')
            stream.write(str(self.mpegts if self.mpegts is not None else 0))
            stream.write('\n')
        if self.meta:
            stream.write(self.meta)
        stream.write('\n')
|
|
|
|
|
|
class StyleBlock(HeaderBlock):
    """
    A STYLE header block. It is matched as a whole by _REGEX and, through
    the parse()/write_into() inherited from Block, kept as raw text.
    """

    # A line consisting of "STYLE" and optional trailing spaces/tabs,
    # then any number of non-empty lines that do not contain "-->",
    # then a terminating empty line.
    _REGEX = re.compile(r'''(?x)
        STYLE[\ \t]*(?:\r\n|[\r\n])
        ((?:(?!-->)[^\r\n])+(?:\r\n|[\r\n]))*
        (?:\r\n|[\r\n])
    ''')
|
|
|
|
|
|
class RegionBlock(HeaderBlock):
    """
    A REGION header block. It is matched as a whole by _REGEX and, through
    the parse()/write_into() inherited from Block, kept as raw text.
    """

    # "REGION" and optional trailing spaces/tabs, then any number of
    # non-empty lines that do not contain "-->", then a terminating
    # empty line.
    #
    # The line terminator directly after "REGION" is what the W3C spec
    # prescribes (settings go on the following lines). Without it, only
    # "REGION\n" was matched for a conformant block and the settings
    # lines were left behind to trip up the cue parser. It is optional
    # so that the non-standard form with settings on the "REGION" line
    # itself, which was accepted before, keeps matching.
    _REGEX = re.compile(r'''(?x)
        REGION[\ \t]*(?:\r\n|[\r\n])?
        ((?:(?!-->)[^\r\n])+(?:\r\n|[\r\n]))*
        (?:\r\n|[\r\n])
    ''')
|
|
|
|
|
|
class CommentBlock(Block):
    """
    A NOTE (comment) block. It is matched as a whole by _REGEX and,
    through the parse()/write_into() inherited from Block, kept as raw
    text.
    """

    # "NOTE" followed by a space, a tab or a line terminator, then any
    # number of non-empty lines that do not contain "-->", then a
    # terminating empty line.
    _REGEX = re.compile(r'''(?x)
        NOTE(?:\r\n|[\ \t\r\n])
        ((?:(?!-->)[^\r\n])+(?:\r\n|[\r\n]))*
        (?:\r\n|[\r\n])
    ''')
|
|
|
|
|
|
class CueBlock(Block):
    """
    A cue block. The payload is not interpreted.

    Instances carry these attributes:
      id        -- the cue identifier line (without line end), or None
      start     -- start time as a 90 kHz tick count (see _parse_ts)
      end       -- end time as a 90 kHz tick count
      settings  -- the cue settings following the timings, or None
      text      -- the payload lines, including their line terminators
    """

    _REGEX_ID = re.compile(r'((?:(?!-->)[^\r\n])+)(?:\r\n|[\r\n])')
    _REGEX_ARROW = re.compile(r'[ \t]+-->[ \t]+')
    _REGEX_SETTINGS = re.compile(r'[ \t]+((?:(?!-->)[^\r\n])+)')
    _REGEX_PAYLOAD = re.compile(r'[^\r\n]+(?:\r\n|[\r\n])?')

    @classmethod
    def parse(cls, parser):
        """
        Try to parse a cue block at the parser's position.

        Returns a CueBlock and advances the parser past the cue on
        success; returns None and leaves the parser untouched when the
        input at this position is not a cue.
        """
        parser = parser.child()

        # The identifier line is optional. A timing line can never be
        # mistaken for it, because _REGEX_ID refuses lines with "-->".
        id_ = None
        m = parser.consume(cls._REGEX_ID)
        if m:
            id_ = m.group(1)

        m0 = parser.consume(_REGEX_TS)
        if not m0:
            return None
        if not parser.consume(cls._REGEX_ARROW):
            return None
        m1 = parser.consume(_REGEX_TS)
        if not m1:
            return None
        m2 = parser.consume(cls._REGEX_SETTINGS)
        parser.consume(_REGEX_OPTIONAL_WHITESPACE)
        if not parser.consume(_REGEX_NL):
            return None

        start = _parse_ts(m0)
        end = _parse_ts(m1)
        settings = m2.group(1) if m2 is not None else None

        # The payload extends up to the first empty line (or the end of
        # input): _REGEX_PAYLOAD needs at least one non-newline character.
        text = io.StringIO()
        while True:
            m = parser.consume(cls._REGEX_PAYLOAD)
            if not m:
                break
            text.write(m.group(0))

        parser.commit()
        return cls(
            id=id_,
            start=start, end=end, settings=settings,
            text=text.getvalue(),
        )

    def write_into(self, stream):
        """
        Write the cue to stream: identifier line (if any), timing line
        with settings (if any), payload, and a terminating newline.
        """
        if self.id is not None:
            stream.write(self.id)
            stream.write('\n')
        stream.write(_format_ts(self.start))
        stream.write(' --> ')
        stream.write(_format_ts(self.end))
        if self.settings is not None:
            stream.write(' ')
            stream.write(self.settings)
        stream.write('\n')
        stream.write(self.text)
        stream.write('\n')

    @property
    def as_json(self):
        """
        The cue's attributes as a plain dict; from_json() is the inverse.
        """
        return {
            'id': self.id,
            'start': self.start,
            'end': self.end,
            'text': self.text,
            'settings': self.settings,
        }

    def __eq__(self, other):
        """
        Two cues are equal when all of their attributes are equal.
        """
        # Comparing against something that is not a cue (None, say) must
        # not blow up with AttributeError; NotImplemented lets Python
        # fall back to its default handling, so `==` yields False.
        if not isinstance(other, CueBlock):
            return NotImplemented
        return self.as_json == other.as_json

    @classmethod
    def from_json(cls, json):
        """
        Build a cue from a dict shaped like the one as_json produces.
        """
        return cls(
            id=json['id'],
            start=json['start'],
            end=json['end'],
            text=json['text'],
            settings=json['settings'],
        )

    def hinges(self, other):
        """
        Tell whether other directly continues this cue: same text, same
        settings, and other starting exactly where this cue ends (with
        neither cue ending before it starts).
        """
        if self.text != other.text:
            return False
        if self.settings != other.settings:
            return False
        return self.start <= self.end == other.start <= other.end
|
|
|
|
|
|
def parse_fragment(frag_content):
    """
    Generator producing the (partially) parsed blocks of a WebVTT file,
    given its raw contents as a bytes object.

    The first block yielded is always the Magic block. ParseError is
    raised when the input cannot be parsed.
    """
    state = _MatchParser(frag_content.decode())

    yield Magic.parse(state)

    # Header part: blocks of these kinds, tried in this order, until
    # something else turns up.
    in_header = True
    while in_header and not state.match(_REGEX_EOF):
        if state.consume(_REGEX_BLANK):
            continue
        for kind in (RegionBlock, StyleBlock, CommentBlock):
            parsed = kind.parse(state)
            if parsed:
                yield parsed  # XXX: or skip (for comments)
                break
        else:
            in_header = False

    # Cue part: only comments and cues are acceptable from here on.
    while not state.match(_REGEX_EOF):
        if state.consume(_REGEX_BLANK):
            continue
        for kind in (CommentBlock, CueBlock):
            parsed = kind.parse(state)
            if parsed:
                yield parsed  # XXX: or skip (for comments)
                break
        else:
            raise ParseError(state)
|