From 83dedc83691ab82c625131082b45f03bb3704b3d Mon Sep 17 00:00:00 2001 From: kclauhk Date: Tue, 17 Sep 2024 03:29:56 +0800 Subject: [PATCH] [ie/extrememusic] Add extractor --- yt_dlp/extractor/_extractors.py | 5 + yt_dlp/extractor/extrememusic.py | 344 +++++++++++++++++++++++++++++++ 2 files changed, 349 insertions(+) create mode 100644 yt_dlp/extractor/extrememusic.py diff --git a/yt_dlp/extractor/_extractors.py b/yt_dlp/extractor/_extractors.py index d8abf0b5d3..a6773869bf 100644 --- a/yt_dlp/extractor/_extractors.py +++ b/yt_dlp/extractor/_extractors.py @@ -620,6 +620,11 @@ from .europeantour import EuropeanTourIE from .eurosport import EurosportIE from .euscreen import EUScreenIE from .expressen import ExpressenIE +from .extrememusic import ( + ExtremeMusicAIE, + ExtremeMusicIE, + ExtremeMusicPIE, +) from .eyedotv import EyedoTVIE from .facebook import ( FacebookAdsIE, diff --git a/yt_dlp/extractor/extrememusic.py b/yt_dlp/extractor/extrememusic.py new file mode 100644 index 0000000000..5523a520ff --- /dev/null +++ b/yt_dlp/extractor/extrememusic.py @@ -0,0 +1,344 @@ +import itertools +import re + +from .common import InfoExtractor +from ..utils import ( + determine_ext, + int_or_none, + join_nonempty, + merge_dicts, + str_or_none, + traverse_obj, + unified_strdate, + url_or_none, +) + + +class ExtremeMusicBaseIE(InfoExtractor): + _API_URL = 'https://snapi.extrememusic.com' + _REQUEST_HEADERS = None + + def _process_version_arg(self, arg): + self._version_requested = arg + + def _set_request_headers(self, video_id, country=None): + if not self._REQUEST_HEADERS: + # the site serves different versions of the same playlist id according to ISO country code, + # so use user's own country code or user-provided country code (extractor argument "country") + if not country: + country = [self._download_webpage('https://ipapi.co/country_code', video_id)] + env = self._download_json('https://www.extrememusic.com/env', video_id) + self._REQUEST_HEADERS = { + 'X-API-Auth': env['token'], + 'X-Viewer-Country': country[0], + } + return self._REQUEST_HEADERS + + def _get_album_data(self, album_id, video_id): + self._process_version_arg(self._configuration_arg('ver') or self._configuration_arg('version')) + headers = self._set_request_headers(video_id) + album = self._download_json(f'{self._API_URL}/albums/{album_id}', video_id, + note='Downloading album data', headers=headers) + if video_id == album_id: + bio = self._download_json(f'{self._API_URL}/albums/{album_id}/bio', video_id, fatal=False, + note='Downloading album data', headers=headers) + return merge_dicts(album, bio or {}) + else: + return album + + def _extract_track(self, album_data, track_id=None, version_id=None): + album_data = album_data or {} + if 'tracks' in album_data and 'track_sounds' in album_data: + if not track_id and version_id: + track_id = traverse_obj(album_data['track_sounds'], + (lambda _, v: v['id'] == int(version_id), 'track_id', {int}), get_all=False) + track = traverse_obj(album_data['tracks'], + (lambda _, v: v['id'] == int(track_id), {dict}), get_all=False) + info = {**traverse_obj(track, { + 'track': ('title', {str}), + 'track_number': ('sort_order', {lambda v: v + 1}, {int}), + 'track_id': ('track_no', {str}), + 'description': ('description', {lambda v: str_or_none(v) or None}), + 'artists': ('artists', {lambda v: v or traverse_obj(album_data, ('album', 'artist'))}, + {lambda v: (v if isinstance(v, list) else [v]) if v else None}), + 'composers': ('composers', ..., 'name'), + 'genres': (('genre', 'subgenre'), ..., 'label'), + 'tag': ('keywords', ..., 'label'), + 'album': ('album_title', {lambda v: str_or_none(v) or None}), + }), **traverse_obj(album_data, { + 'album_artists': ('album', 'artist', {lambda v: [v] if v else None}), + 'upload_date': ('album', 'created', {unified_strdate}), + })} + entries, thumbnails = [], [] + for image in traverse_obj(track, ('images', 'default')): + thumbnails.append(traverse_obj(image, { + 'url': ('url', {url_or_none}), + 'width': ('width', {int_or_none}), + 'height': ('height', {int_or_none}), + })) + for idx, sound_id in enumerate([version_id] if version_id else track['track_sound_ids']): + if sound := traverse_obj(album_data['track_sounds'], + (lambda _, v: v['id'] == int(sound_id) and v['track_id'] == int(track_id), + {dict}), get_all=False): + if (version_id + or (not version_id and ((not self._version_requested and idx == 0) + or 'all' in self._version_requested + or sound['version_type'].lower() in self._version_requested))): + formats = [] + for audio_url in traverse_obj(sound, ('assets', 'audio', ('preview_url', + 'preview_url_hls'))): + if determine_ext(audio_url) == 'm3u8': + m3u8_url = re.sub(r'\.m3u8\?.*', '/HLS/128_v4.m3u8', audio_url) + for f in self._extract_m3u8_formats(m3u8_url, sound_id, 'mpeg', fatal=False): + formats.append({ + **f, + 'vcodec': 'none', + 'perference': -2, + }) + else: + formats.append({ + 'url': audio_url, + 'vcodec': 'none', + }) + entries.append({ + 'id': str(sound_id), + 'title': join_nonempty('title', 'version_type', from_dict=sound, delim=' - '), + 'alt_title': sound['version_type'], + **info, + 'thumbnails': thumbnails, + 'duration': sound.get('duration'), + 'formats': formats, + 'webpage_url': f"https://www.extrememusic.com/albums/{track['album_id']}?item={track_id}&ver={sound_id}", + }) + + if len(entries) > 1: + return { + 'id': track_id, + **info, + 'entries': entries, + '_type': 'playlist', + } + elif len(entries) == 1: + return entries[0] + return [] + + +class ExtremeMusicIE(ExtremeMusicBaseIE): + _VALID_URL = r'https?://(?:www\.)?extrememusic\.com/albums/(?P\d+)\?(.*item=(?P\d+))?(.*ver=(?P\d+))?' + _TESTS = [{ + 'url': 'https://www.extrememusic.com/albums/15875?item=263381&ver=1265009&sharedTrack=dHJ1ZQ==', + 'info_dict': { + 'id': '1265009', + 'ext': 'mp3', + 'title': 'FOLLOW - Instrumental', + 'alt_title': 'Instrumental', + 'track': 'FOLLOW', + 'track_number': 5, + 'track_id': 'HPE316_05', + 'artists': ['PRAERS'], + 'composers': ['Joseph Andrew Banfi', 'Thomas Louis James White'], + 'genres': ['POP', 'DREAM', 'INDIE'], + 'tag': 'count:7', + 'album': 'AVALON', + 'album_artists': ['PRAERS'], + 'upload_date': '20240729', + 'thumbnail': 'https://d2oet5a29f64lj.cloudfront.net/img-data/w/2480/album/600/HPE316.jpg', + 'duration': 246, + }, + }, { + 'url': 'https://www.extrememusic.com/albums/15823?ver=1262087', + 'info_dict': { + 'id': '1262087', + 'ext': 'mp3', + 'title': 'MAGICAL HIGHWAY - VOCALS', + 'alt_title': 'VOCALS', + 'track': 'MAGICAL HIGHWAY', + 'track_number': 2, + 'track_id': 'ASM0002_02', + 'description': 'Full version - a fun, happy and upbeat pop track with a medium - fast tempo - electronic, bouncy, bright', + 'composers': ['ENB'], + 'genres': ['POP', 'ELECTRO', 'JPOP'], + 'tag': 'count:8', + 'album': 'TOKYO POPPIN\'', + 'upload_date': '20240709', + 'thumbnail': 'https://d2oet5a29f64lj.cloudfront.net/img-data/w/2480/album/600/ASM0002.jpg', + 'duration': 265, + }, + }, { + 'url': 'https://www.extrememusic.com/albums/15064?item=254704', + 'info_dict': { + 'id': '1178851', + 'ext': 'mp3', + 'title': 'SWEET TOOTH - Full Version', + 'alt_title': 'Full Version', + 'track': 'SWEET TOOTH', + 'track_number': 2, + 'track_id': 'HPE263_02', + 'artists': ['PILOT PAISLEY-ROSE'], + 'composers': ['PILOT PAISLEY ROSE SARACENO', 'SAMUEL JAMES BRANDT'], + 'genres': ['POP', 'ELECTRO', 'ROCK'], + 'tag': 'count:7', + 'album': 'ADDICTED', + 'album_artists': ['PILOT PAISLEY-ROSE'], + 'upload_date': '20230629', + 'thumbnail': 'https://d2oet5a29f64lj.cloudfront.net/img-data/w/2480/album/600/HPE263.jpg', + 'duration': 161, + }, + }, { + 'url': 'https://www.extrememusic.com/albums/1315?item=24795', + 'info_dict': { + 'id': '61003', + 'ext': 'mp3', + 'title': 'JOY TO THE WORLD (INST) - Instrumental', + 'alt_title': 'Instrumental', + 'track': 'JOY TO THE WORLD (INST)', + 'track_number': 6, + 'track_id': 'XEL016_06', + 'composers': ['TRADITIONAL'], + 'genres': ['HOLIDAY', 'CHRISTMAS'], + 'tag': 'count:5', + 'album': 'CHRISTMAS SPARKLE', + 'upload_date': '20041001', + 'thumbnail': 'https://d2oet5a29f64lj.cloudfront.net/img-data/w/2480/album/600/XEL016.jpg', + 'duration': 132, + }, + }] + + def _real_extract(self, url): + album_id, track_id, version_id = self._match_valid_url(url).group('album', 'id', 'ver') + album_data = self._get_album_data(album_id, track_id or version_id) + if result := self._extract_track(album_data, track_id, version_id): + return result + else: + self.raise_no_formats('No formats were found') + + +class ExtremeMusicAIE(ExtremeMusicBaseIE): + IE_NAME = 'ExtremeMusic:album' + _VALID_URL = r'https?://(?:www\.)?extrememusic\.com/albums/(?P\d+)(?!.*(item|ver)=)' + _TESTS = [{ + 'url': 'https://www.extrememusic.com/albums/6778', + 'info_dict': { + 'id': '6778', + 'album': 'Ethereal Voices', + }, + 'playlist_count': 11, + }, { + 'url': 'https://www.extrememusic.com/albums/15835', + 'info_dict': { + 'id': '15835', + 'album': 'BIGGEST BANG', + 'description': 'Minus Aura, a minimalist duo who create deep drama and emotion to put you under their spell.', + 'artists': ['MINUS AURA'], + 'genres': ['ELECTRONICA', 'POP', 'SYNTH'], + 'tag': ['ELECTRONIC', 'STRUGGLE'], + }, + 'playlist_count': 4, + }] + + def _real_extract(self, url): + album_id = self._match_id(url) + album_data = self._get_album_data(album_id, album_id) + + entries = [] + for track_id in traverse_obj(album_data, ('tracks', ..., 'id')): + if track := self._extract_track(album_data, track_id=track_id): + if track.get('entries'): + entries.extend(track['entries']) + else: + entries.append(track) + + subgenres = traverse_obj(album_data, ('album', 'subgenres', {str_or_none})) + if entries: + return merge_dicts(traverse_obj(album_data.get('album'), { + 'id': ('id', {lambda v: str(v)}), + 'album': ('title', {str_or_none}), + 'description': ('description', {lambda v: str_or_none(v) or None}), + 'artists': ('artist', {lambda v: [v] if v else None}), + 'genres': ('genres', {str_or_none}, {lambda v: join_nonempty(v, subgenres, delim=', ')}, + {lambda v: v.split(', ') if v else None}), + 'tag': ('keywords', {lambda v: v.split(', ') if v else None}), + }), { + 'description': traverse_obj(album_data, ('bio', 'description', {lambda v: str_or_none(v) or None})), + 'entries': entries, + '_type': 'playlist', + }) + else: + self.raise_no_formats('No formats were found') + + +class ExtremeMusicPIE(ExtremeMusicBaseIE): + IE_NAME = 'ExtremeMusic:playlist' + _VALID_URL = r'https?://(?:www\.)?extrememusic\.com/playlists/(?P[^?]+)' + _TESTS = [{ + 'url': 'https://www.extrememusic.com/playlists/Kf3fAppAKK2UpAUUp7KK1pBDBMrC62c_Kf8UKAAppUUKppK2UAp92K7Appp8xMx', + 'info_dict': { + 'id': 'Kf3fAppAKK2UpAUUp7KK1pBDBMrC62c_Kf8UKAAppUUKppK2UAp92K7Appp8xMx', + 'title': 'NICE', + 'thumbnail': 'https://d2oet5a29f64lj.cloudfront.net/img-data/w/2480/featureditem/square/thumbnail_PLAYLIST_Nice-square-(formerly ChristmasTraditional).jpg', + }, + 'playlist_mincount': 35, + }, { + 'url': 'https://www.extrememusic.com/playlists/fUKKU5KAfK61pAAKp4U4KpKUxsRk2ki_fU117KpUUAAUKAUfpA6UAfAKK8Ul5ji', + 'info_dict': { + 'id': 'fUKKU5KAfK61pAAKp4U4KpKUxsRk2ki_fU117KpUUAAUKAUfpA6UAfAKK8Ul5ji', + 'title': 'NEO CLASSICAL', + 'thumbnail': 'https://d2oet5a29f64lj.cloudfront.net/img-data/w/2480/featureditem/square/NeoClassical.jpg', + }, + 'playlist_mincount': 50, + }] + + def _real_extract(self, url): + playlist_id = self._match_id(url) + headers = self._set_request_headers(playlist_id, self._configuration_arg('country')) + + def playlist_query(playlist_id, offset, limit): + # playlist api: https://snapi.extrememusic.com/playlists?id={playlist_id}&range={offset}%2C{limit}' + return self._download_json( + 'https://snapi.extrememusic.com/playlists', playlist_id, + note=f'Downloading item {offset + 1}-{offset + limit}', query={ + 'id': playlist_id, + 'range': f'{offset},{limit}', + }, headers=headers) + + thumbnails, entries = [], [] + album_data, track_done, limit = {}, [], 50 + for i in itertools.count(): + playlist = playlist_query(playlist_id, i * limit, limit) + if len(playlist['playlist_items']) == 0: + break + else: + track_ids = traverse_obj(playlist, ('playlist_items', ..., 'track_id')) + for track_id in list(dict.fromkeys(track_ids)): + if track_id not in track_done: + album_id = traverse_obj(playlist, + ('tracks', lambda _, v: v['id'] == track_id, 'album_id', {int}), get_all=False) + if album_id not in album_data: + album_data[album_id] = self._get_album_data(album_id, track_id) + playlist['album'] = traverse_obj(album_data, (album_id, 'album', {dict})) + if track := self._extract_track(playlist, track_id=track_id): + if track.get('entries'): + entries.extend(track['entries']) + else: + entries.append(track) + track_done.append(track_id) + + if entries: + for image in traverse_obj(playlist['playlist'], ('images', 'square')): + thumbnails.append(traverse_obj(image, { + 'url': ('url', {url_or_none}), + 'width': ('width', {int_or_none}), + 'height': ('height', {int_or_none}), + })) + + return {k: v for k, v in { + 'id': playlist['playlist']['id'], + 'title': playlist['playlist']['title'], + 'thumbnail': traverse_obj(thumbnails, (0, 'url', {url_or_none})), + 'thumbnails': thumbnails, + 'uploader': playlist['playlist']['owner_name'], + 'entries': entries, + '_type': 'playlist', + }.items() if v} + else: + self.raise_no_formats('No formats were found')