diff --git a/yt_dlp/downloader/fragment.py b/yt_dlp/downloader/fragment.py index b4b680dae1..b4f003d37f 100644 --- a/yt_dlp/downloader/fragment.py +++ b/yt_dlp/downloader/fragment.py @@ -14,6 +14,7 @@ from ..networking import Request from ..networking.exceptions import HTTPError, IncompleteRead from ..utils import DownloadError, RetryManager, encodeFilename, traverse_obj from ..utils.networking import HTTPHeaderDict +from ..utils.progress import ProgressCalculator class HttpQuietDownloader(HttpFD): @@ -226,8 +227,7 @@ class FragmentFD(FileDownloader): resume_len = ctx['complete_frags_downloaded_bytes'] total_frags = ctx['total_frags'] ctx_id = ctx.get('ctx_id') - # This dict stores the download progress, it's updated by the progress - # hook + # Stores the download progress, updated by the progress hook state = { 'status': 'downloading', 'downloaded_bytes': resume_len, @@ -237,14 +237,8 @@ class FragmentFD(FileDownloader): 'tmpfilename': ctx['tmpfilename'], } - start = time.time() - ctx.update({ - 'started': start, - 'fragment_started': start, - # Amount of fragment's bytes downloaded by the time of the previous - # frag progress hook invocation - 'prev_frag_downloaded_bytes': 0, - }) + ctx['started'] = time.time() + progress = ProgressCalculator(resume_len) def frag_progress_hook(s): if s['status'] not in ('downloading', 'finished'): @@ -259,38 +253,35 @@ class FragmentFD(FileDownloader): state['max_progress'] = ctx.get('max_progress') state['progress_idx'] = ctx.get('progress_idx') - time_now = time.time() - state['elapsed'] = time_now - start + state['elapsed'] = progress.elapsed frag_total_bytes = s.get('total_bytes') or 0 s['fragment_info_dict'] = s.pop('info_dict', {}) + + # XXX: Fragment resume is not accounted for here if not ctx['live']: estimated_size = ( (ctx['complete_frags_downloaded_bytes'] + frag_total_bytes) / (state['fragment_index'] + 1) * total_frags) - state['total_bytes_estimate'] = estimated_size + progress.total = estimated_size + progress.update(s.get('downloaded_bytes')) + state['total_bytes_estimate'] = progress.total + else: + progress.update(s.get('downloaded_bytes')) if s['status'] == 'finished': state['fragment_index'] += 1 ctx['fragment_index'] = state['fragment_index'] - state['downloaded_bytes'] += frag_total_bytes - ctx['prev_frag_downloaded_bytes'] - ctx['complete_frags_downloaded_bytes'] = state['downloaded_bytes'] - ctx['speed'] = state['speed'] = self.calc_speed( - ctx['fragment_started'], time_now, frag_total_bytes) - ctx['fragment_started'] = time.time() - ctx['prev_frag_downloaded_bytes'] = 0 - else: - frag_downloaded_bytes = s['downloaded_bytes'] - state['downloaded_bytes'] += frag_downloaded_bytes - ctx['prev_frag_downloaded_bytes'] - ctx['speed'] = state['speed'] = self.calc_speed( - ctx['fragment_started'], time_now, frag_downloaded_bytes - ctx.get('frag_resume_len', 0)) - if not ctx['live']: - state['eta'] = self.calc_eta(state['speed'], estimated_size - state['downloaded_bytes']) - ctx['prev_frag_downloaded_bytes'] = frag_downloaded_bytes + progress.thread_reset() + + state['downloaded_bytes'] = ctx['complete_frags_downloaded_bytes'] = progress.downloaded + state['speed'] = ctx['speed'] = progress.speed.smooth + state['eta'] = progress.eta.smooth + self._hook_progress(state, info_dict) ctx['dl'].add_progress_hook(frag_progress_hook) - return start + return ctx['started'] def _finish_frag_download(self, ctx, info_dict): ctx['dest_stream'].close() @@ -500,7 +491,6 @@ class FragmentFD(FileDownloader): download_fragment(fragment, ctx_copy) return fragment, fragment['frag_index'], ctx_copy.get('fragment_filename_sanitized') - self.report_warning('The download speed shown is only of one thread. This is a known issue') with tpe or concurrent.futures.ThreadPoolExecutor(max_workers) as pool: try: for fragment, frag_index, frag_filename in pool.map(_download_fragment, fragments): diff --git a/yt_dlp/utils/progress.py b/yt_dlp/utils/progress.py new file mode 100644 index 0000000000..f254a3887e --- /dev/null +++ b/yt_dlp/utils/progress.py @@ -0,0 +1,109 @@ +from __future__ import annotations + +import bisect +import threading +import time + + +class ProgressCalculator: + # Time to calculate the speed over (seconds) + SAMPLING_WINDOW = 3 + # Minimum timeframe before to sample next downloaded bytes (seconds) + SAMPLING_RATE = 0.05 + # Time before showing eta (seconds) + GRACE_PERIOD = 1 + + def __init__(self, initial: int): + self._initial = initial or 0 + self.downloaded = self._initial + + self.elapsed: float = 0 + self.speed = SmoothValue(0, smoothing=0.7) + self.eta = SmoothValue(None, smoothing=0.9) + + self._total = 0 + self._start_time = time.monotonic() + self._last_update = self._start_time + + self._lock = threading.Lock() + self._thread_sizes: dict[int, int] = {} + + self._times = [self._start_time] + self._downloaded = [self.downloaded] + + @property + def total(self): + return self._total + + @total.setter + def total(self, value: int | None): + with self._lock: + if value is not None and value < self.downloaded: + value = self.downloaded + + self._total = value + + def thread_reset(self): + current_thread = threading.get_ident() + with self._lock: + self._thread_sizes[current_thread] = 0 + + def update(self, size: int | None): + if not size: + return + + current_thread = threading.get_ident() + + with self._lock: + last_size = self._thread_sizes.get(current_thread, 0) + self._thread_sizes[current_thread] = size + self._update(size - last_size) + + def _update(self, size: int): + current_time = time.monotonic() + + self.downloaded += size + self.elapsed = current_time - self._start_time + if self.total is not None and self.downloaded > self.total: + self._total = self.downloaded + + if self._last_update + self.SAMPLING_RATE > current_time: + return + self._last_update = current_time + + self._times.append(current_time) + self._downloaded.append(self.downloaded) + + offset = bisect.bisect_left(self._times, current_time - self.SAMPLING_WINDOW) + del self._times[:offset] + del self._downloaded[:offset] + if len(self._times) < 2: + self.speed.reset() + self.eta.reset() + return + + download_time = current_time - self._times[0] + if not download_time: + return + + self.speed.set((self.downloaded - self._downloaded[0]) / download_time) + if self.total and self.speed.value and self.elapsed > self.GRACE_PERIOD: + self.eta.set((self.total - self.downloaded) / self.speed.value) + else: + self.eta.reset() + + +class SmoothValue: + def __init__(self, initial: float | None, smoothing: float): + self.value = self.smooth = self._initial = initial + self._smoothing = smoothing + + def set(self, value: float): + self.value = value + if self.smooth is None: + self.smooth = self.value + else: + self.smooth = (1 - self._smoothing) * value + self._smoothing * self.smooth + + def reset(self): + self.value = self.smooth = self._initial