v1.0 first upload

This commit is contained in:
zotify 2023-03-24 23:57:10 +13:00
commit 2a96a12a2a
16 changed files with 1954 additions and 0 deletions

184
zotify/__init__.py Normal file
View file

@ -0,0 +1,184 @@
from pathlib import Path
from librespot.audio.decoders import VorbisOnlyAudioQuality
from librespot.core import (
ApiClient,
PlayableContentFeeder,
Session as LibrespotSession,
)
from librespot.metadata import EpisodeId, PlayableId, TrackId
from pwinput import pwinput
from requests import HTTPError, get
from zotify.playable import Episode, Track
from zotify.utils import (
API_URL,
Quality,
)
class Api(ApiClient):
def __init__(self, session: LibrespotSession, language: str = "en"):
super(Api, self).__init__(session)
self.__session = session
self.__language = language
def __get_token(self) -> str:
"""Returns user's API token"""
return (
self.__session.tokens()
.get_token(
"playlist-read-private", # Private playlists
"user-follow-read", # Followed artists
"user-library-read", # Liked tracks/episodes/etc.
"user-read-private", # Country
)
.access_token
)
def invoke_url(
self,
url: str,
params: dict = {},
limit: int | None = None,
offset: int | None = None,
) -> dict:
"""
Requests data from api
Args:
url: API url and to get data from
params: parameters to be sent in the request
limit: The maximum number of items in the response
offset: The offset of the items returned
Returns:
Dictionary representation of json response
"""
headers = {
"Authorization": f"Bearer {self.__get_token()}",
"Accept": "application/json",
"Accept-Language": self.__language,
"app-platform": "WebPlayer",
}
if limit:
params["limit"] = limit
if offset:
params["offset"] = offset
response = get(url, headers=headers, params=params)
data = response.json()
try:
raise HTTPError(
f"{url}\nAPI Error {data['error']['status']}: {data['error']['message']}"
)
except KeyError:
return data
class Session:
__api: Api
__country: str
__is_premium: bool
__session: LibrespotSession
def __init__(
self,
cred_file: Path | None = None,
username: str | None = None,
password: str | None = None,
save: bool | None = False,
language: str = "en",
) -> None:
"""
Authenticates user, saves credentials to a file
and generates api token
Args:
cred_file: Path to the credentials file
username: Account username
password: Account password
save: Save given credentials to a file
"""
# Find an existing credentials file
if cred_file is not None and cred_file.is_file():
conf = (
LibrespotSession.Configuration.Builder()
.set_store_credentials(False)
.build()
)
self.__session = (
LibrespotSession.Builder(conf).stored_file(str(cred_file)).create()
)
# Otherwise get new credentials
else:
username = input("Username: ") if username is None else username
password = (
pwinput(prompt="Password: ", mask="*") if password is None else password
)
# Save credentials to file
if save and cred_file:
cred_file.parent.mkdir(parents=True, exist_ok=True)
conf = (
LibrespotSession.Configuration.Builder()
.set_stored_credential_file(str(cred_file))
.build()
)
else:
conf = (
LibrespotSession.Configuration.Builder()
.set_store_credentials(False)
.build()
)
self.__session = (
LibrespotSession.Builder(conf).user_pass(username, password).create()
)
self.__api = Api(self.__session, language)
def __get_playable(
self, playable_id: PlayableId, quality: Quality
) -> PlayableContentFeeder.LoadedStream:
if quality.value is None:
quality = Quality.VERY_HIGH if self.is_premium() else Quality.HIGH
return self.__session.content_feeder().load(
playable_id,
VorbisOnlyAudioQuality(quality.value),
False,
None,
)
def get_track(self, track_id: TrackId, quality: Quality = Quality.AUTO) -> Track:
"""
Gets track/episode data and audio stream
Args:
track_id: Base62 ID of track
quality: Audio quality of track when downloaded
Returns:
Track object
"""
return Track(self.__get_playable(track_id, quality), self.api())
def get_episode(self, episode_id: EpisodeId) -> Episode:
"""
Gets track/episode data and audio stream
Args:
episode: Base62 ID of episode
Returns:
Episode object
"""
return Episode(self.__get_playable(episode_id, Quality.NORMAL), self.api())
def api(self) -> ApiClient:
"""Returns API Client"""
return self.__api
def country(self) -> str:
"""Returns two letter country code of user's account"""
try:
return self.__country
except AttributeError:
self.__country = self.api().invoke_url(API_URL + "me")["country"]
return self.__country
def is_premium(self) -> bool:
"""Returns users premium account status"""
return self.__session.get_user_attribute("type") == "premium"

135
zotify/__main__.py Normal file
View file

@ -0,0 +1,135 @@
#! /usr/bin/env python3
from argparse import ArgumentParser
from pathlib import Path
from zotify.app import client
from zotify.config import CONFIG_PATHS, CONFIG_VALUES
from zotify.utils import OptionalOrFalse
VERSION = "0.9.0"
def main():
parser = ArgumentParser(
prog="zotify",
description="A fast and customizable music and podcast downloader",
)
parser.add_argument(
"-v",
"--version",
action="store_true",
help="Print version and exit",
)
parser.add_argument(
"--config",
type=Path,
default=CONFIG_PATHS["conf"],
help="Specify the config.json location",
)
parser.add_argument(
"-l",
"--library",
type=Path,
help="Specify a path to the root of a music/podcast library",
)
parser.add_argument(
"-o", "--output", type=str, help="Specify the output location/format"
)
parser.add_argument(
"-c",
"--category",
type=str,
choices=["album", "artist", "playlist", "track", "show", "episode"],
default=["album", "artist", "playlist", "track", "show", "episode"],
nargs="+",
help="Searches for only this type",
)
parser.add_argument("--username", type=str, help="Account username")
parser.add_argument("--password", type=str, help="Account password")
group = parser.add_mutually_exclusive_group(required=False)
group.add_argument(
"urls",
type=str,
default="",
nargs="*",
help="Downloads the track, album, playlist, podcast, episode or artist from a URL or URI. Accepts multiple options.",
)
group.add_argument(
"-d",
"--download",
type=str,
help="Downloads tracks, playlists and albums from the URLs written in the file passed.",
)
group.add_argument(
"-f",
"--followed",
action="store_true",
help="Download all songs from your followed artists.",
)
group.add_argument(
"-lt",
"--liked-tracks",
action="store_true",
help="Download all of your liked songs.",
)
group.add_argument(
"-le",
"--liked-episodes",
action="store_true",
help="Download all of your liked episodes.",
)
group.add_argument(
"-p",
"--playlist",
action="store_true",
help="Download a saved playlists from your account.",
)
group.add_argument(
"-s",
"--search",
type=str,
nargs="+",
help="Search for a specific track, album, playlist, artist or podcast",
)
for k, v in CONFIG_VALUES.items():
if v["type"] == bool:
parser.add_argument(
v["arg"],
action=OptionalOrFalse,
default=v["default"],
help=v["help"],
)
else:
try:
parser.add_argument(
v["arg"],
type=v["type"],
choices=v["choices"],
default=None,
help=v["help"],
)
except KeyError:
parser.add_argument(
v["arg"],
type=v["type"],
default=None,
help=v["help"],
)
parser.set_defaults(func=client)
args = parser.parse_args()
if args.version:
print(VERSION)
return
args.func(args)
return
try:
args.func(args)
except Exception as e:
print(f"Fatal Error: {e}")
if __name__ == "__main__":
main()

336
zotify/app.py Normal file
View file

@ -0,0 +1,336 @@
from argparse import Namespace
from enum import Enum
from pathlib import Path
from typing import Any, NamedTuple
from librespot.metadata import (
AlbumId,
ArtistId,
EpisodeId,
PlayableId,
PlaylistId,
ShowId,
TrackId,
)
from librespot.util import bytes_to_hex
from zotify import Session
from zotify.config import Config
from zotify.file import TranscodingError
from zotify.loader import Loader
from zotify.printer import Printer, PrintChannel
from zotify.utils import API_URL, AudioFormat, b62_to_hex
def client(args: Namespace) -> None:
config = Config(args)
Printer(config)
with Loader("Logging in..."):
if config.credentials is False:
session = Session()
else:
session = Session(
cred_file=config.credentials, save=True, language=config.language
)
selection = Selection(session)
try:
if args.search:
ids = selection.search(args.search, args.category)
elif args.playlist:
ids = selection.get("playlists", "items")
elif args.followed:
ids = selection.get("following?type=artist", "artists")
elif args.liked_tracks:
ids = selection.get("tracks", "items")
elif args.liked_episodes:
ids = selection.get("episodes", "items")
elif args.download:
ids = []
for x in args.download:
ids.extend(selection.from_file(x))
elif args.urls:
ids = args.urls
except (FileNotFoundError, ValueError):
Printer.print(PrintChannel.WARNINGS, "there is nothing to do")
return
app = App(config, session)
with Loader("Parsing input..."):
try:
app.parse(ids)
except (IndexError, TypeError) as e:
Printer.print(PrintChannel.ERRORS, str(e))
app.download()
class Selection:
def __init__(self, session: Session):
self.__session = session
def search(
self,
search_text: str,
category: list = [
"track",
"album",
"artist",
"playlist",
"show",
"episode",
],
) -> list[str]:
categories = ",".join(category)
resp = self.__session.api().invoke_url(
API_URL + "search",
{
"q": search_text,
"type": categories,
"include_external": "audio",
"market": self.__session.country(),
},
limit=10,
offset=0,
)
count = 0
links = []
for c in categories.split(","):
label = c + "s"
if len(resp[label]["items"]) > 0:
print(f"\n### {label.capitalize()} ###")
for item in resp[label]["items"]:
links.append(item)
self.__print(count + 1, item)
count += 1
return self.__get_selection(links)
def get(self, item: str, suffix: str) -> list[str]:
resp = self.__session.api().invoke_url(f"{API_URL}me/{item}", limit=50)[suffix]
for i in range(len(resp)):
self.__print(i + 1, resp[i])
return self.__get_selection(resp)
@staticmethod
def from_file(file_path: Path) -> list[str]:
with open(file_path, "r", encoding="utf-8") as f:
return [line.strip() for line in f.readlines()]
@staticmethod
def __get_selection(items: list[dict[str, Any]]) -> list[str]:
print("\nResults to save (eg: 1,2,3 1-3)")
selection = ""
while len(selection) == 0:
selection = input("==> ")
ids = []
selections = selection.split(",")
for i in selections:
if "-" in i:
split = i.split("-")
for x in range(int(split[0]), int(split[1]) + 1):
ids.append(items[x - 1]["uri"])
else:
ids.append(items[int(i) - 1]["uri"])
return ids
@staticmethod
def __print(i: int, item: dict[str, Any]) -> None:
print("{:<2} {:<77}".format(i, item["name"]))
class PlayableType(Enum):
TRACK = "track"
EPISODE = "episode"
class PlayableData(NamedTuple):
type: PlayableType
id: PlayableId
library: Path
output: str
class App:
__playable_list: list[PlayableData]
def __init__(
self,
config: Config,
session: Session,
):
self.__config = config
self.__session = session
self.__playable_list = []
def __parse_album(self, hex_id: str) -> None:
album = self.__session.api().get_metadata_4_album(AlbumId.from_hex(hex_id))
for disc in album.disc:
for track in disc.track:
self.__playable_list.append(
PlayableData(
PlayableType.TRACK,
bytes_to_hex(track.gid),
self.__config.music_library,
self.__config.output_album,
)
)
def __parse_artist(self, hex_id: str) -> None:
artist = self.__session.api().get_metadata_4_artist(ArtistId.from_hex(hex_id))
for album in artist.album_group + artist.single_group:
album = self.__session.api().get_metadata_4_album(
AlbumId.from_hex(album.gid)
)
for disc in album.disc:
for track in disc.track:
self.__playable_list.append(
PlayableData(
PlayableType.TRACK,
bytes_to_hex(track.gid),
self.__config.music_library,
self.__config.output_album,
)
)
def __parse_playlist(self, b62_id: str) -> None:
playlist = self.__session.api().get_playlist(PlaylistId(b62_id))
for item in playlist.contents.items:
split = item.uri.split(":")
playable_type = PlayableType(split[1])
id_map = {PlayableType.TRACK: TrackId, PlayableType.EPISODE: EpisodeId}
playable_id = id_map[playable_type].from_base62(split[2])
self.__playable_list.append(
PlayableData(
playable_type,
playable_id,
self.__config.playlist_library,
self.__config.get(f"output_playlist_{playable_type.value}"),
)
)
def __parse_show(self, hex_id: str) -> None:
show = self.__session.api().get_metadata_4_show(ShowId.from_hex(hex_id))
for episode in show.episode:
self.__playable_list.append(
PlayableData(
PlayableType.EPISODE,
bytes_to_hex(episode.gid),
self.__config.podcast_library,
self.__config.output_podcast,
)
)
def __parse_track(self, hex_id: str) -> None:
self.__playable_list.append(
PlayableData(
PlayableType.TRACK,
TrackId.from_hex(hex_id),
self.__config.music_library,
self.__config.output_album,
)
)
def __parse_episode(self, hex_id: str) -> None:
self.__playable_list.append(
PlayableData(
PlayableType.EPISODE,
EpisodeId.from_hex(hex_id),
self.__config.podcast_library,
self.__config.output_podcast,
)
)
def parse(self, links: list[str]) -> None:
"""
Parses list of selected tracks/playlists/shows/etc...
Args:
links: List of links
"""
for link in links:
link = link.rsplit("?", 1)[0]
try:
split = link.split(link[-23])
_id = split[-1]
id_type = split[-2]
except IndexError:
raise IndexError(f'Parsing Error: Could not parse "{link}"')
if id_type == "album":
self.__parse_album(b62_to_hex(_id))
elif id_type == "artist":
self.__parse_artist(b62_to_hex(_id))
elif id_type == "playlist":
self.__parse_playlist(_id)
elif id_type == "show":
self.__parse_show(b62_to_hex(_id))
elif id_type == "track":
self.__parse_track(b62_to_hex(_id))
elif id_type == "episode":
self.__parse_episode(b62_to_hex(_id))
else:
raise TypeError(f'Parsing Error: Unknown type "{id_type}"')
def get_playable_list(self) -> list[PlayableData]:
"""Returns list of Playable items"""
return self.__playable_list
def download(self) -> None:
"""Downloads playable to local file"""
for playable in self.__playable_list:
if playable.type == PlayableType.TRACK:
with Loader("Fetching track..."):
track = self.__session.get_track(
playable.id, self.__config.download_quality
)
elif playable.type == PlayableType.EPISODE:
with Loader("Fetching episode..."):
track = self.__session.get_episode(playable.id)
else:
Printer.print(
PrintChannel.SKIPS,
f'Download Error: Unknown playable content "{playable.type}"',
)
continue
try:
output = track.create_output(playable.library, playable.output)
except FileExistsError as e:
Printer.print(PrintChannel.SKIPS, str(e))
continue
file = track.write_audio_stream(
output,
self.__config.chunk_size,
)
if self.__config.save_lyrics:
with Loader("Fetching lyrics..."):
try:
track.get_lyrics().save(output)
except FileNotFoundError as e:
Printer.print(PrintChannel.SKIPS, str(e))
Printer.print(PrintChannel.DOWNLOADS, f"\nDownloaded {track.name}")
if self.__config.audio_format != AudioFormat.VORBIS:
try:
with Loader(PrintChannel.PROGRESS, "Converting audio..."):
file.transcode(
self.__config.audio_format,
self.__config.transcode_bitrate
if self.__config.transcode_bitrate > 0
else None,
True,
self.__config.ffmpeg_path
if self.__config.ffmpeg_path != ""
else "ffmpeg",
self.__config.ffmpeg_args.split(),
)
except TranscodingError as e:
Printer.print(PrintChannel.ERRORS, str(e))
if self.__config.save_metadata:
with Loader("Writing metadata..."):
file.write_metadata(track.metadata)
file.write_cover_art(
track.get_cover_art(self.__config.artwork_size)
)

354
zotify/config.py Normal file
View file

@ -0,0 +1,354 @@
from argparse import Namespace
from json import dump, load
from pathlib import Path
from sys import platform as PLATFORM
from typing import Any
from zotify.utils import AudioFormat, ImageSize, Quality
ALL_ARTISTS = "all_artists"
ARTWORK_SIZE = "artwork_size"
AUDIO_FORMAT = "audio_format"
CHUNK_SIZE = "chunk_size"
CREATE_PLAYLIST_FILE = "create_playlist_file"
CREDENTIALS = "credentials"
DOWNLOAD_QUALITY = "download_quality"
FFMPEG_ARGS = "ffmpeg_args"
FFMPEG_PATH = "ffmpeg_path"
LANGUAGE = "language"
LYRICS_ONLY = "lyrics_only"
MUSIC_LIBRARY = "music_library"
OUTPUT = "output"
OUTPUT_ALBUM = "output_album"
OUTPUT_PLAYLIST_TRACK = "output_playlist_track"
OUTPUT_PLAYLIST_EPISODE = "output_playlist_episode"
OUTPUT_PODCAST = "output_podcast"
OUTPUT_SINGLE = "output_single"
PATH_ARCHIVE = "path_archive"
PLAYLIST_LIBRARY = "playlist_library"
PODCAST_LIBRARY = "podcast_library"
PRINT_DOWNLOADS = "print_downloads"
PRINT_ERRORS = "print_errors"
PRINT_PROGRESS = "print_progress"
PRINT_SKIPS = "print_skips"
PRINT_WARNINGS = "print_warnings"
REPLACE_EXISTING = "replace_existing"
SAVE_LYRICS = "save_lyrics"
SAVE_METADATA = "save_metadata"
SAVE_SUBTITLES = "save_subtitles"
SKIP_DUPLICATES = "skip_duplicates"
SKIP_PREVIOUS = "skip_previous"
TRANSCODE_BITRATE = "transcode_bitrate"
SYSTEM_PATHS = {
"win32": Path.home().joinpath("AppData/Roaming/Zotify"),
"linux": Path.home().joinpath(".config/zotify"),
"darwin": Path.home().joinpath("Library/Application Support/Zotify"),
}
LIBRARY_PATHS = {
"music": Path.home().joinpath("Music/Zotify Music"),
"podcast": Path.home().joinpath("Music/Zotify Podcasts"),
"playlist": Path.home().joinpath("Music/Zotify Playlists"),
}
CONFIG_PATHS = {
"conf": SYSTEM_PATHS[PLATFORM].joinpath("config.json"),
"creds": SYSTEM_PATHS[PLATFORM].joinpath("credentials.json"),
"archive": SYSTEM_PATHS[PLATFORM].joinpath("track_archive"),
}
OUTPUT_PATHS = {
"album": "{album_artist}/{album}/{track_number}. {artist} - {title}",
"podcast": "{podcast}/{episode_number} - {title}",
"playlist_track": "{playlist}/{playlist_number}. {artist} - {title}",
"playlist_episode": "{playlist}/{playlist_number}. {episode_number} - {title}",
}
CONFIG_VALUES = {
CREDENTIALS: {
"default": CONFIG_PATHS["creds"],
"type": Path,
"arg": "--credentials",
"help": "Path to credentials file",
},
PATH_ARCHIVE: {
"default": CONFIG_PATHS["archive"],
"type": Path,
"arg": "--archive",
"help": "Path to track archive file",
},
MUSIC_LIBRARY: {
"default": LIBRARY_PATHS["music"],
"type": Path,
"arg": "--music-library",
"help": "Path to root of music library",
},
PODCAST_LIBRARY: {
"default": LIBRARY_PATHS["podcast"],
"type": Path,
"arg": "--podcast-library",
"help": "Path to root of podcast library",
},
PLAYLIST_LIBRARY: {
"default": LIBRARY_PATHS["playlist"],
"type": Path,
"arg": "--playlist-library",
"help": "Path to root of playlist library",
},
OUTPUT_ALBUM: {
"default": OUTPUT_PATHS["album"],
"type": str,
"arg": "--output-album",
"help": "File layout for saved albums",
},
OUTPUT_PLAYLIST_TRACK: {
"default": OUTPUT_PATHS["playlist_track"],
"type": str,
"arg": "--output-playlist-track",
"help": "File layout for tracks in a playlist",
},
OUTPUT_PLAYLIST_EPISODE: {
"default": OUTPUT_PATHS["playlist_episode"],
"type": str,
"arg": "--output-playlist-episode",
"help": "File layout for episodes in a playlist",
},
OUTPUT_PODCAST: {
"default": OUTPUT_PATHS["podcast"],
"type": str,
"arg": "--output-podcast",
"help": "File layout for saved podcasts",
},
DOWNLOAD_QUALITY: {
"default": "auto",
"type": Quality.from_string,
"choices": list(Quality),
"arg": "--download-quality",
"help": "Audio download quality (auto for highest available)",
},
ARTWORK_SIZE: {
"default": "large",
"type": ImageSize.from_string,
"choices": list(ImageSize),
"arg": "--artwork-size",
"help": "Image size of track's cover art",
},
AUDIO_FORMAT: {
"default": "vorbis",
"type": AudioFormat,
"choices": [n.value for n in AudioFormat],
"arg": "--audio-format",
"help": "Audio format of final track output",
},
TRANSCODE_BITRATE: {
"default": -1,
"type": int,
"arg": "--bitrate",
"help": "Transcoding bitrate (-1 to use download rate)",
},
FFMPEG_PATH: {
"default": "",
"type": str,
"arg": "--ffmpeg-path",
"help": "Path to ffmpeg binary",
},
FFMPEG_ARGS: {
"default": "",
"type": str,
"arg": "--ffmpeg-args",
"help": "Additional ffmpeg arguments when transcoding",
},
SAVE_SUBTITLES: {
"default": False,
"type": bool,
"arg": "--save-subtitles",
"help": "Save subtitles from podcasts to a .srt file",
},
LANGUAGE: {
"default": "en",
"type": str,
"arg": "--language",
"help": "Language for metadata"
},
SAVE_LYRICS: {
"default": True,
"type": bool,
"arg": "--save-lyrics",
"help": "Save lyrics to a file",
},
LYRICS_ONLY: {
"default": False,
"type": bool,
"arg": "--lyrics-only",
"help": "Only download lyrics and not actual audio",
},
CREATE_PLAYLIST_FILE: {
"default": True,
"type": bool,
"arg": "--playlist-file",
"help": "Save playlist information to an m3u8 file",
},
SAVE_METADATA: {
"default": True,
"type": bool,
"arg": "--save-metadata",
"help": "Save metadata, required for other metadata options",
},
ALL_ARTISTS: {
"default": True,
"type": bool,
"arg": "--all-artists",
"help": "Add all track artists to artist tag in metadata",
},
REPLACE_EXISTING: {
"default": False,
"type": bool,
"arg": "--replace-existing",
"help": "Overwrite existing files with the same name",
},
SKIP_PREVIOUS: {
"default": True,
"type": bool,
"arg": "--skip-previous",
"help": "Skip previously downloaded songs",
},
SKIP_DUPLICATES: {
"default": True,
"type": bool,
"arg": "--skip-duplicates",
"help": "Skip downloading existing track to different album",
},
CHUNK_SIZE: {
"default": 131072,
"type": int,
"arg": "--chunk-size",
"help": "Number of bytes read at a time during download",
},
PRINT_DOWNLOADS: {
"default": False,
"type": bool,
"arg": "--print-downloads",
"help": "Print messages when a song is finished downloading",
},
PRINT_PROGRESS: {
"default": True,
"type": bool,
"arg": "--print-progress",
"help": "Show progress bars",
},
PRINT_SKIPS: {
"default": True,
"type": bool,
"arg": "--print-skips",
"help": "Show messages if a song is being skipped",
},
PRINT_WARNINGS: {
"default": True,
"type": bool,
"arg": "--print-warnings",
"help": "Show warnings",
},
PRINT_ERRORS: {
"default": True,
"type": bool,
"arg": "--print-errors",
"help": "Show errors",
},
}
class Config:
__config_file: Path | None
artwork_size: ImageSize
audio_format: AudioFormat
chunk_size: int
credentials: Path
download_quality: Quality
ffmpeg_args: str
ffmpeg_path: str
music_library: Path
language: str
output_album: str
output_liked: str
output_podcast: str
output_playlist_track: str
output_playlist_episode: str
playlist_library: Path
podcast_library: Path
print_progress: bool
save_lyrics: bool
save_metadata: bool
transcode_bitrate: int
def __init__(self, args: Namespace = Namespace()):
jsonvalues = {}
if args.config:
self.__config_file = Path(args.config)
# Valid config file found
if self.__config_file.exists():
with open(self.__config_file, "r", encoding="utf-8") as conf:
jsonvalues = load(conf)
# Remove config file and make a new one
else:
self.__config_file.parent.mkdir(parents=True, exist_ok=True)
jsonvalues = {}
for key in CONFIG_VALUES:
if CONFIG_VALUES[key]["type"] in [str, int, bool]:
jsonvalues[key] = CONFIG_VALUES[key]["default"]
else:
jsonvalues[key] = str(CONFIG_VALUES[key]["default"])
with open(self.__config_file, "w+", encoding="utf-8") as conf:
dump(jsonvalues, conf, indent=4)
for key in CONFIG_VALUES:
# Override config with commandline arguments
if key in vars(args) and vars(args)[key] is not None:
setattr(self, key, self.__parse_arg_value(key, vars(args)[key]))
# If no command option specified use config
elif key in jsonvalues:
setattr(self, key, self.__parse_arg_value(key, jsonvalues[key]))
# Use default values for missing keys
else:
setattr(
self,
key,
self.__parse_arg_value(key, CONFIG_VALUES[key]["default"]),
)
else:
self.__config_file = None
# Make "output" arg override all output_* options
if args.output:
self.output_album = args.output
self.output_liked = args.output
self.output_podcast = args.output
self.output_playlist_track = args.output
self.output_playlist_episode = args.output
@staticmethod
def __parse_arg_value(key: str, value: Any) -> Any:
config_type = CONFIG_VALUES[key]["type"]
if type(value) == config_type:
return value
elif config_type == Path:
return Path(value).expanduser()
elif config_type == AudioFormat:
return AudioFormat(value)
elif config_type == ImageSize.from_string:
return ImageSize.from_string(value)
elif config_type == Quality.from_string:
return Quality.from_string(value)
else:
raise TypeError("Invalid Type: " + value)
def get(self, key: str) -> Any:
"""
Gets a value from config
Args:
key: config attribute to return value of
Returns:
Value of key
"""
return getattr(self, key)

126
zotify/file.py Normal file
View file

@ -0,0 +1,126 @@
from errno import ENOENT
from pathlib import Path
from subprocess import Popen, PIPE
from typing import Any
from music_tag import load_file
from mutagen.oggvorbis import OggVorbisHeaderError
from zotify.utils import AudioFormat, ExtMap
# fmt: off
class TranscodingError(RuntimeError): ...
class TargetExistsError(FileExistsError, TranscodingError): ...
class FFmpegNotFoundError(FileNotFoundError, TranscodingError): ...
class FFmpegExecutionError(OSError, TranscodingError): ...
# fmt: on
class LocalFile:
audio_format: AudioFormat
def __init__(
self,
path: Path,
audio_format: AudioFormat | None = None,
bitrate: int | None = None,
):
self.path = path
self.bitrate = bitrate
if audio_format:
self.audio_format = audio_format
def transcode(
self,
audio_format: AudioFormat | None = None,
bitrate: int | None = None,
replace: bool = False,
ffmpeg: str = "ffmpeg",
opt_args: list[str] = [],
) -> None:
"""
Use ffmpeg to transcode a saved audio file
Args:
audio_format: Audio format to transcode file to
bitrate: Bitrate to transcode file to in kbps
replace: Replace existing file
ffmpeg: Location of FFmpeg binary
opt_args: Additional arguments to pass to ffmpeg
"""
if audio_format:
new_ext = ExtMap[audio_format.value]
else:
new_ext = ExtMap[self.audio_format.value]
cmd = [
ffmpeg,
"-y",
"-hide_banner",
"-loglevel",
"error",
"-i",
str(self.path),
]
newpath = self.path.parent.joinpath(
self.path.name.rsplit(".", 1)[0] + new_ext.value
)
if self.path == newpath:
raise TargetExistsError(
f"Transcoding Error: Cannot overwrite source, target file is already a {self.audio_format} file."
)
cmd.extend(["-b:a", str(bitrate) + "k"]) if bitrate else None
cmd.extend(["-c:a", audio_format.value]) if audio_format else None
cmd.extend(opt_args)
cmd.append(str(newpath))
try:
process = Popen(cmd, stdin=PIPE)
process.wait()
except OSError as e:
if e.errno == ENOENT:
raise FFmpegNotFoundError("Transcoding Error: FFmpeg was not found")
else:
raise
if process.returncode != 0:
raise FFmpegExecutionError(
f'Transcoding Error: `{" ".join(cmd)}` failed with error code {process.returncode}'
)
if replace:
Path(self.path).unlink()
self.path = newpath
self.bitrate = bitrate
if audio_format:
self.audio_format = audio_format
def write_metadata(self, metadata: dict[str, Any]) -> None:
"""
Write metadata to file
Args:
metadata: key-value metadata dictionary
"""
f = load_file(self.path)
f.save()
for k, v in metadata.items():
try:
f[k] = str(v)
except KeyError:
pass
try:
f.save()
except OggVorbisHeaderError:
pass # Thrown when using untranscoded file, nothing breaks.
def write_cover_art(self, image: bytes) -> None:
"""
Write cover artwork to file
Args:
image: raw image data
"""
f = load_file(self.path)
f["artwork"] = image
try:
f.save()
except OggVorbisHeaderError:
pass

69
zotify/loader.py Normal file
View file

@ -0,0 +1,69 @@
# load symbol from:
# https://stackoverflow.com/questions/22029562/python-how-to-make-simple-animated-loading-while-process-is-running
from __future__ import annotations
from itertools import cycle
from shutil import get_terminal_size
from sys import platform
from threading import Thread
from time import sleep
from zotify.printer import Printer
class Loader:
"""
Busy symbol.
Can be called inside a context:
with Loader("This take some Time..."):
# do something
pass
"""
def __init__(self, desc="Loading...", end="", timeout=0.1, mode="std3") -> None:
"""
A loader-like context manager
Args:
desc (str, optional): The loader's description. Defaults to "Loading...".
end (str, optional): Final print. Defaults to "".
timeout (float, optional): Sleep time between prints. Defaults to 0.1.
"""
self.desc = desc
self.end = end
self.timeout = timeout
self.__thread = Thread(target=self.__animate, daemon=True)
if platform == "win32":
self.steps = ["/", "-", "\\", "|"]
else:
self.steps = ["", "", "", "", "", "", "", ""]
self.done = False
def start(self) -> Loader:
self.__thread.start()
return self
def __animate(self) -> None:
for c in cycle(self.steps):
if self.done:
break
Printer.print_loader(f"\r {c} {self.desc} ")
sleep(self.timeout)
def __enter__(self) -> None:
self.start()
def stop(self) -> None:
self.done = True
cols = get_terminal_size((80, 20)).columns
Printer.print_loader("\r" + " " * cols)
if self.end != "":
Printer.print_loader(f"\r{self.end}")
def __exit__(self, exc_type, exc_value, tb) -> None:
# handle exceptions with those variables ^
self.stop()

235
zotify/playable.py Normal file
View file

@ -0,0 +1,235 @@
from math import floor
from pathlib import Path
from typing import Any
from librespot.core import PlayableContentFeeder
from librespot.metadata import AlbumId
from librespot.util import bytes_to_hex
from librespot.structure import GeneralAudioStream
from requests import get
from zotify.file import LocalFile
from zotify.printer import Printer
from zotify.utils import (
IMG_URL,
LYRICS_URL,
AudioFormat,
ImageSize,
bytes_to_base62,
fix_filename,
)
class Lyrics:
def __init__(self, lyrics: dict, **kwargs):
self.lines = []
self.sync_type = lyrics["syncType"]
for line in lyrics["lines"]:
self.lines.append(line["words"] + "\n")
if self.sync_type == "line_synced":
self.lines_synced = []
for line in lyrics["lines"]:
timestamp = int(line["start_time_ms"])
ts_minutes = str(floor(timestamp / 60000)).zfill(2)
ts_seconds = str(floor((timestamp % 60000) / 1000)).zfill(2)
ts_millis = str(floor(timestamp % 1000))[:2].zfill(2)
self.lines_synced.append(
f"[{ts_minutes}:{ts_seconds}.{ts_millis}]{line.words}\n"
)
def save(self, path: Path, prefer_synced: bool = True) -> None:
"""
Saves lyrics to file
Args:
location: path to target lyrics file
prefer_synced: Use line synced lyrics if available
"""
if self.sync_type == "line_synced" and prefer_synced:
with open(f"{path}.lrc", "w+", encoding="utf-8") as f:
f.writelines(self.lines_synced)
else:
with open(f"{path}.txt", "w+", encoding="utf-8") as f:
f.writelines(self.lines[:-1])
class Playable:
cover_images: list[Any]
metadata: dict[str, Any]
name: str
input_stream: GeneralAudioStream
def create_output(self, library: Path, output: str, replace: bool = False) -> Path:
"""
Creates save directory for the output file
Args:
library: Path to root content library
output: Template for the output filepath
replace: Replace existing files with same output
Returns:
File path for the track
"""
for k, v in self.metadata.items():
output = output.replace(
"{" + k + "}", fix_filename(str(v).replace("\0", ","))
)
file_path = library.joinpath(output).expanduser()
if file_path.exists() and not replace:
raise FileExistsError("Output Creation Error: File already downloaded")
else:
file_path.parent.mkdir(parents=True, exist_ok=True)
return file_path
def write_audio_stream(
self,
output: Path,
chunk_size: int = 128 * 1024,
) -> LocalFile:
"""
Writes audio stream to file
Args:
output: File path of saved audio stream
chunk_size: maximum number of bytes to read at a time
Returns:
LocalFile object
"""
file = f"{output}.ogg"
with open(file, "wb") as f, Printer.progress(
desc=self.name,
total=self.input_stream.size,
unit="B",
unit_scale=True,
unit_divisor=1024,
position=0,
leave=False,
) as p_bar:
chunk = None
while chunk != b"":
chunk = self.input_stream.stream().read(chunk_size)
p_bar.update(f.write(chunk))
return LocalFile(Path(file), AudioFormat.VORBIS)
def get_cover_art(self, size: ImageSize = ImageSize.LARGE) -> bytes:
"""
Returns image data of cover art
Args:
size: Size of cover art
Returns:
Image data of cover art
"""
return get(
IMG_URL + bytes_to_hex(self.cover_images[size.value].file_id)
).content
class Track(PlayableContentFeeder.LoadedStream, Playable):
lyrics: Lyrics
def __init__(self, track: PlayableContentFeeder.LoadedStream, api):
super(Track, self).__init__(
track.track,
track.input_stream,
track.normalization_data,
track.metrics,
)
self.__api = api
try:
isinstance(self.track.album.genre, str)
except AttributeError:
self.album = self.__api.get_metadata_4_album(
AlbumId.from_hex(bytes_to_hex(self.track.album.gid))
)
self.cover_images = self.album.cover_group.image
self.metadata = self.__default_metadata()
def __getattr__(self, name):
try:
return super().__getattribute__(name)
except AttributeError:
return super().__getattribute__("track").__getattribute__(name)
def __default_metadata(self) -> dict[str, Any]:
date = self.album.date
return {
"album": self.album.name,
"album_artist": "\0".join([a.name for a in self.album.artist]),
"artist": self.artist[0].name,
"artists": "\0".join([a.name for a in self.artist]),
"date": f"{date.year}-{date.month}-{date.day}",
"release_date": f"{date.year}-{date.month}-{date.day}",
"disc_number": self.disc_number,
"duration": self.duration,
"explicit": self.explicit,
"genre": self.album.genre,
"isrc": self.external_id[0].id,
"licensor": self.licensor,
"popularity": self.popularity,
"track_number": self.number,
"replaygain_track_gain": self.normalization_data.track_gain_db,
"replaygain_track_peak": self.normalization_data.track_peak,
"replaygain_album_gain": self.normalization_data.album_gain_db,
"replaygain_album_prak": self.normalization_data.album_peak,
"title": self.name,
"track_title": self.name,
# "year": self.album.date.year,
}
def get_lyrics(self) -> Lyrics:
"""
Fetch lyrics from track if available
Returns:
Instance of track lyrics
"""
if not self.track.has_lyrics:
raise FileNotFoundError(
f"No lyrics available for {self.track.artist[0].name} - {self.track.name}"
)
try:
return self.lyrics
except AttributeError:
self.lyrics = Lyrics(
self.__api.invoke_url(LYRICS_URL + bytes_to_base62(self.track.gid))[
"lyrics"
]
)
return self.lyrics
class Episode(PlayableContentFeeder.LoadedStream, Playable):
def __init__(self, episode: PlayableContentFeeder.LoadedStream, api):
super(Episode, self).__init__(
episode.episode,
episode.input_stream,
episode.normalization_data,
episode.metrics,
)
self.__api = api
self.cover_images = self.episode.cover_image.image
self.metadata = self.__default_metadata()
def __getattr__(self, name):
try:
return super().__getattribute__(name)
except AttributeError:
return super().__getattribute__("episode").__getattribute__(name)
def __default_metadata(self) -> dict[str, Any]:
return {
"description": self.description,
"duration": self.duration,
"episode_number": self.number,
"explicit": self.explicit,
"language": self.language,
"podcast": self.show.name,
"date": self.publish_time,
"title": self.name,
}
def can_download_direct(self) -> bool:
"""Returns true if episode can be downloaded from its original external source"""
return bool(self.episode.is_externally_hosted)
def download_direct(self) -> LocalFile:
"""Downloads episode from original source"""
if not self.can_download_directly():
raise RuntimeError("Podcast cannot be downloaded direct")
raise NotImplementedError()

80
zotify/printer.py Normal file
View file

@ -0,0 +1,80 @@
from enum import Enum
from sys import stderr
from tqdm import tqdm
from zotify.config import (
Config,
PRINT_SKIPS,
PRINT_PROGRESS,
PRINT_ERRORS,
PRINT_WARNINGS,
PRINT_DOWNLOADS,
)
class PrintChannel(Enum):
SKIPS = PRINT_SKIPS
PROGRESS = PRINT_PROGRESS
ERRORS = PRINT_ERRORS
WARNINGS = PRINT_WARNINGS
DOWNLOADS = PRINT_DOWNLOADS
class Printer:
__config: Config
@classmethod
def __init__(cls, config: Config):
cls.__config = config
@classmethod
def print(cls, channel: PrintChannel, msg: str) -> None:
"""
Prints a message to console if the print channel is enabled
Args:
channel: PrintChannel to print to
msg: Message to print
"""
if cls.__config.get(channel.value):
if channel == PrintChannel.ERRORS:
print(msg, file=stderr)
else:
print(msg)
@classmethod
def progress(
cls,
iterable=None,
desc=None,
total=None,
leave=False,
position=0,
unit="it",
unit_scale=False,
unit_divisor=1000,
) -> tqdm:
"""
Prints progress bar
Returns:
tqdm decorated iterable
"""
return tqdm(
iterable=iterable,
desc=desc,
total=total,
disable=False, # cls.__config.print_progress,
leave=leave,
position=position,
unit=unit,
unit_scale=unit_scale,
unit_divisor=unit_divisor,
)
@staticmethod
def print_loader(msg: str) -> None:
"""
Prints animated loading symbol
Args:
msg: Message to print
"""
print(msg, flush=True, end="")

166
zotify/utils.py Normal file
View file

@ -0,0 +1,166 @@
from argparse import Action, ArgumentError
from enum import Enum, IntEnum
from re import IGNORECASE, sub
from sys import platform as PLATFORM
from librespot.audio.decoders import AudioQuality
from librespot.util import Base62, bytes_to_hex
from requests import get
API_URL = "https://api.sp" + "otify.com/v1/"
IMG_URL = "https://i.s" + "cdn.co/image/"
LYRICS_URL = "https://sp" + "client.wg.sp" + "otify.com/color-lyrics/v2/track/"
BASE62 = Base62.create_instance_with_inverted_character_set()
class AudioFormat(Enum):
AAC = "aac"
FDK_AAC = "fdk_aac"
FLAC = "flac"
MP3 = "mp3"
OPUS = "opus"
VORBIS = "vorbis"
WAV = "wav"
WV = "wavpack"
class ExtMap(Enum):
AAC = "m4a"
FDK_AAC = "m4a"
FLAC = "flac"
MP3 = "mp3"
OPUS = "ogg"
VORBIS = "ogg"
WAV = "wav"
WAVPACK = "wv"
class Quality(Enum):
NORMAL = AudioQuality.NORMAL # ~96kbps
HIGH = AudioQuality.HIGH # ~160kbps
VERY_HIGH = AudioQuality.VERY_HIGH # ~320kbps
AUTO = None # Highest quality available for account
def __str__(self):
return self.name.lower()
def __repr__(self):
return str(self)
@staticmethod
def from_string(s):
try:
return Quality[s.upper()]
except Exception:
return s
class ImageSize(IntEnum):
SMALL = 0 # 64px
MEDIUM = 1 # 300px
LARGE = 2 # 640px
def __str__(self):
return self.name.lower()
def __repr__(self):
return str(self)
@staticmethod
def from_string(s):
try:
return ImageSize[s.upper()]
except Exception:
return s
class OptionalOrFalse(Action):
def __init__(
self,
option_strings,
dest,
nargs="?",
default=None,
type=None,
choices=None,
required=False,
help=None,
metavar=None,
):
_option_strings = []
for option_string in option_strings:
_option_strings.append(option_string)
if option_string.startswith("--"):
option_string = "--no-" + option_string[2:]
_option_strings.append(option_string)
super().__init__(
option_strings=_option_strings,
dest=dest,
nargs=nargs,
default=default,
type=type,
choices=choices,
required=required,
help=help,
metavar=metavar,
)
def __call__(self, parser, namespace, values, option_string=None):
if values is None and not option_string.startswith("--no-"):
raise ArgumentError(self, "expected 1 argument")
setattr(
namespace,
self.dest,
values if not option_string.startswith("--no-") else False,
)
def fix_filename(filename: str, substitute: str = "_", platform: str = PLATFORM) -> str:
"""
Replace invalid characters on Linux/Windows/MacOS with underscores.
Original list from https://stackoverflow.com/a/31976060/819417
Trailing spaces & periods are ignored on Windows.
Args:
filename: The name of the file to repair
platform: Host operating system
substitute: Replacement character for disallowed characters
Returns:
Filename with replaced characters
"""
if platform == "linux":
regex = r"[/\0]|^(?![^.])|[\s]$"
elif platform == "darwin":
regex = r"[/\0:]|^(?![^.])|[\s]$"
else:
regex = r"[/\\:|<>\"?*\0-\x1f]|^(AUX|COM[1-9]|CON|LPT[1-9]|NUL|PRN)(?![^.])|^\s|[\s.]$"
return sub(regex, substitute, str(filename), flags=IGNORECASE)
def download_cover_art(images: list, size: ImageSize) -> bytes:
"""
Returns image data of cover art
Args:
images: list of retrievable images
size: Desired size in pixels of cover art, can be 640, 300, or 64
Returns:
Image data of cover art
"""
return get(images[size.value]["url"]).content
def str_to_bool(value: str) -> bool:
if value.lower() in ["yes", "y", "true"]:
return True
if value.lower() in ["no", "n", "false"]:
return False
raise TypeError("Not a boolean: " + value)
def bytes_to_base62(id: bytes) -> str:
return BASE62.encode(id, 22).decode()
def b62_to_hex(base62: str) -> str:
return bytes_to_hex(BASE62.decode(base62.encode(), 16))