More accurate search results

This commit is contained in:
zotify 2023-05-29 23:58:06 +12:00
parent 2908dadc5b
commit 30721125ef
11 changed files with 269 additions and 226 deletions

View file

@ -1,20 +1,16 @@
from __future__ import annotations
from pathlib import Path
from librespot.audio.decoders import VorbisOnlyAudioQuality
from librespot.core import (
ApiClient,
PlayableContentFeeder,
Session as LibrespotSession,
)
from librespot.core import ApiClient, PlayableContentFeeder
from librespot.core import Session as LibrespotSession
from librespot.metadata import EpisodeId, PlayableId, TrackId
from pwinput import pwinput
from requests import HTTPError, get
from zotify.playable import Episode, Track
from zotify.utils import (
API_URL,
Quality,
)
from zotify.utils import API_URL, Quality
class Api(ApiClient):
@ -40,8 +36,8 @@ class Api(ApiClient):
self,
url: str,
params: dict = {},
limit: int | None = None,
offset: int | None = None,
limit: int = 20,
offset: int = 0,
) -> dict:
"""
Requests data from api
@ -59,10 +55,8 @@ class Api(ApiClient):
"Accept-Language": self.__language,
"app-platform": "WebPlayer",
}
if limit:
params["limit"] = limit
if offset:
params["offset"] = offset
params["limit"] = limit
params["offset"] = offset
response = get(url, headers=headers, params=params)
data = response.json()
@ -78,61 +72,82 @@ class Api(ApiClient):
class Session:
__api: Api
__country: str
__is_premium: bool
__language: str
__session: LibrespotSession
__session_builder: LibrespotSession.Builder
def __init__(
self,
cred_file: Path | None = None,
username: str | None = None,
password: str | None = None,
save: bool | None = False,
session_builder: LibrespotSession.Builder,
language: str = "en",
) -> None:
"""
Authenticates user, saves credentials to a file
and generates api token
Authenticates user, saves credentials to a file and generates api token.
Args:
session_builder: An instance of the Librespot Session.Builder
langauge: ISO 639-1 language code
"""
self.__session_builder = session_builder
self.__session = self.__session_builder.create()
self.__language = language
self.__api = Api(self.__session, language)
@staticmethod
def from_file(cred_file: Path, langauge: str = "en") -> Session:
"""
Creates session using saved credentials file
Args:
cred_file: Path to credentials file
langauge: ISO 639-1 language code
Returns:
Zotify session
"""
conf = (
LibrespotSession.Configuration.Builder()
.set_store_credentials(False)
.build()
)
return Session(
LibrespotSession.Builder(conf).stored_file(str(cred_file)), langauge
)
@staticmethod
def from_userpass(
username: str = "",
password: str = "",
save_file: Path | None = None,
language: str = "en",
) -> Session:
"""
Creates session using username & password
Args:
cred_file: Path to the credentials file
username: Account username
password: Account password
save: Save given credentials to a file
save_file: Path to save login credentials to, optional.
langauge: ISO 639-1 language code
Returns:
Zotify session
"""
# Find an existing credentials file
if cred_file is not None and cred_file.is_file():
username = input("Username: ") if username == "" else username
password = (
pwinput(prompt="Password: ", mask="*") if password == "" else password
)
if save_file:
save_file.parent.mkdir(parents=True, exist_ok=True)
conf = (
LibrespotSession.Configuration.Builder()
.set_stored_credential_file(str(save_file))
.build()
)
else:
conf = (
LibrespotSession.Configuration.Builder()
.set_store_credentials(False)
.build()
)
self.__session = (
LibrespotSession.Builder(conf).stored_file(str(cred_file)).create()
)
# Otherwise get new credentials
else:
username = input("Username: ") if username is None else username
password = (
pwinput(prompt="Password: ", mask="*") if password is None else password
)
# Save credentials to file
if save and cred_file:
cred_file.parent.mkdir(parents=True, exist_ok=True)
conf = (
LibrespotSession.Configuration.Builder()
.set_stored_credential_file(str(cred_file))
.build()
)
else:
conf = (
LibrespotSession.Configuration.Builder()
.set_store_credentials(False)
.build()
)
self.__session = (
LibrespotSession.Builder(conf).user_pass(username, password).create()
)
self.__api = Api(self.__session, language)
return Session(
LibrespotSession.Builder(conf).user_pass(username, password), language
)
def __get_playable(
self, playable_id: PlayableId, quality: Quality
@ -182,3 +197,7 @@ class Session:
def is_premium(self) -> bool:
"""Returns users premium account status"""
return self.__session.get_user_attribute("type") == "premium"
def clone(self) -> Session:
"""Creates a copy of the session for use in a parallel thread"""
return Session(session_builder=self.__session_builder, language=self.__language)

View file

@ -7,7 +7,7 @@ from zotify.app import App
from zotify.config import CONFIG_PATHS, CONFIG_VALUES
from zotify.utils import OptionalOrFalse
VERSION = "0.9.1"
VERSION = "0.9.2"
def main():
@ -21,6 +21,11 @@ def main():
action="store_true",
help="Print version and exit",
)
parser.add_argument(
"--debug",
action="store_true",
help="Don't hide tracebacks",
)
parser.add_argument(
"--config",
type=Path,
@ -31,7 +36,7 @@ def main():
"-l",
"--library",
type=Path,
help="Specify a path to the root of a music/podcast library",
help="Specify a path to the root of a music/playlist/podcast library",
)
parser.add_argument(
"-o", "--output", type=str, help="Specify the output location/format"
@ -45,8 +50,8 @@ def main():
nargs="+",
help="Searches for only this type",
)
parser.add_argument("--username", type=str, help="Account username")
parser.add_argument("--password", type=str, help="Account password")
parser.add_argument("--username", type=str, default="", help="Account username")
parser.add_argument("--password", type=str, default="", help="Account password")
group = parser.add_mutually_exclusive_group(required=False)
group.add_argument(
"urls",
@ -123,12 +128,15 @@ def main():
if args.version:
print(VERSION)
return
args.func(args)
return
try:
if args.debug:
args.func(args)
except Exception as e:
print(f"Fatal Error: {e}")
else:
try:
args.func(args)
except Exception:
from traceback import format_exc
print(format_exc().splitlines()[-1])
if __name__ == "__main__":

View file

@ -18,8 +18,7 @@ from zotify import Session
from zotify.config import Config
from zotify.file import TranscodingError
from zotify.loader import Loader
from zotify.playable import Track
from zotify.printer import Printer, PrintChannel
from zotify.printer import PrintChannel, Printer
from zotify.utils import API_URL, AudioFormat, b62_to_hex
@ -174,39 +173,46 @@ class App:
__session: Session
__playable_list: list[PlayableData] = []
def __init__(
self,
args: Namespace,
):
def __init__(self, args: Namespace):
self.__config = Config(args)
Printer(self.__config)
if self.__config.audio_format == AudioFormat.VORBIS and (self.__config.ffmpeg_args != "" or self.__config.ffmpeg_path != ""):
Printer.print(PrintChannel.WARNINGS, "FFmpeg options will be ignored since no transcoding is required")
if self.__config.audio_format == AudioFormat.VORBIS and (
self.__config.ffmpeg_args != "" or self.__config.ffmpeg_path != ""
):
Printer.print(
PrintChannel.WARNINGS,
"FFmpeg options will be ignored since no transcoding is required",
)
with Loader("Logging in..."):
if self.__config.credentials is False:
self.__session = Session()
if (
args.username != "" and args.password != ""
) or not self.__config.credentials.is_file():
self.__session = Session.from_userpass(
args.username,
args.password,
self.__config.credentials,
self.__config.language,
)
else:
self.__session = Session(
cred_file=self.__config.credentials,
save=True,
language=self.__config.language,
self.__session = Session.from_file(
self.__config.credentials, self.__config.language
)
ids = self.get_selection(args)
with Loader("Parsing input..."):
try:
self.parse(ids)
except (IndexError, TypeError) as e:
except ParsingError as e:
Printer.print(PrintChannel.ERRORS, str(e))
self.download()
self.download_all()
def get_selection(self, args: Namespace) -> list[str]:
selection = Selection(self.__session)
try:
if args.search:
return selection.search(args.search, args.category)
return selection.search(" ".join(args.search), args.category)
elif args.playlist:
return selection.get("playlists", "items")
elif args.followed:
@ -222,7 +228,7 @@ class App:
return ids
elif args.urls:
return args.urls
except (FileNotFoundError, ValueError):
except (FileNotFoundError, ValueError, KeyboardInterrupt):
pass
Printer.print(PrintChannel.WARNINGS, "there is nothing to do")
exit()
@ -340,63 +346,56 @@ class App:
"""Returns list of Playable items"""
return self.__playable_list
def download(self) -> None:
def download_all(self) -> None:
"""Downloads playable to local file"""
for playable in self.__playable_list:
if playable.type == PlayableType.TRACK:
with Loader("Fetching track..."):
track = self.__session.get_track(
playable.id, self.__config.download_quality
)
elif playable.type == PlayableType.EPISODE:
with Loader("Fetching episode..."):
track = self.__session.get_episode(playable.id)
else:
Printer.print(
PrintChannel.SKIPS,
f'Download Error: Unknown playable content "{playable.type}"',
self.__download(playable)
def __download(self, playable: PlayableData) -> None:
if playable.type == PlayableType.TRACK:
with Loader("Fetching track..."):
track = self.__session.get_track(
playable.id, self.__config.download_quality
)
continue
try:
output = track.create_output(playable.library, playable.output)
except FileExistsError as e:
Printer.print(PrintChannel.SKIPS, str(e))
continue
file = track.write_audio_stream(
output,
self.__config.chunk_size,
elif playable.type == PlayableType.EPISODE:
with Loader("Fetching episode..."):
track = self.__session.get_episode(playable.id)
else:
Printer.print(
PrintChannel.SKIPS,
f'Download Error: Unknown playable content "{playable.type}"',
)
if self.__config.save_lyrics and isinstance(track, Track):
with Loader("Fetching lyrics..."):
try:
track.get_lyrics().save(output)
except FileNotFoundError as e:
Printer.print(PrintChannel.SKIPS, str(e))
return
Printer.print(PrintChannel.DOWNLOADS, f"\nDownloaded {track.name}")
output = track.create_output(playable.library, playable.output)
file = track.write_audio_stream(
output,
self.__config.chunk_size,
)
if self.__config.audio_format != AudioFormat.VORBIS:
if self.__config.save_lyrics and playable.type == PlayableType.TRACK:
with Loader("Fetching lyrics..."):
try:
with Loader(PrintChannel.PROGRESS, "Converting audio..."):
file.transcode(
self.__config.audio_format,
self.__config.transcode_bitrate
if self.__config.transcode_bitrate > 0
else None,
True,
self.__config.ffmpeg_path
if self.__config.ffmpeg_path != ""
else "ffmpeg",
self.__config.ffmpeg_args.split(),
)
except TranscodingError as e:
Printer.print(PrintChannel.ERRORS, str(e))
track.get_lyrics().save(output)
except FileNotFoundError as e:
Printer.print(PrintChannel.SKIPS, str(e))
if self.__config.save_metadata:
with Loader("Writing metadata..."):
file.write_metadata(track.metadata)
file.write_cover_art(
track.get_cover_art(self.__config.artwork_size)
Printer.print(PrintChannel.DOWNLOADS, f"\nDownloaded {track.name}")
if self.__config.audio_format != AudioFormat.VORBIS:
try:
with Loader(PrintChannel.PROGRESS, "Converting audio..."):
file.transcode(
self.__config.audio_format,
self.__config.transcode_bitrate,
True,
self.__config.ffmpeg_path,
self.__config.ffmpeg_args.split(),
)
except TranscodingError as e:
Printer.print(PrintChannel.ERRORS, str(e))
if self.__config.save_metadata:
with Loader("Writing metadata..."):
file.write_metadata(track.metadata)
file.write_cover_art(track.get_cover_art(self.__config.artwork_size))

View file

@ -6,7 +6,6 @@ from typing import Any
from zotify.utils import AudioFormat, ImageSize, Quality
ALL_ARTISTS = "all_artists"
ARTWORK_SIZE = "artwork_size"
AUDIO_FORMAT = "audio_format"
@ -60,9 +59,9 @@ CONFIG_PATHS = {
}
OUTPUT_PATHS = {
"album": "{album_artist}/{album}/{track_number}. {artist} - {title}",
"album": "{album_artist}/{album}/{track_number}. {artists} - {title}",
"podcast": "{podcast}/{episode_number} - {title}",
"playlist_track": "{playlist}/{playlist_number}. {artist} - {title}",
"playlist_track": "{playlist}/{playlist_number}. {artists} - {title}",
"playlist_episode": "{playlist}/{playlist_number}. {episode_number} - {title}",
}
@ -170,7 +169,7 @@ CONFIG_VALUES = {
"default": "en",
"type": str,
"arg": "--language",
"help": "Language for metadata"
"help": "Language for metadata",
},
SAVE_LYRICS: {
"default": True,
@ -239,7 +238,7 @@ CONFIG_VALUES = {
"help": "Show progress bars",
},
PRINT_SKIPS: {
"default": True,
"default": False,
"type": bool,
"arg": "--print-skips",
"help": "Show messages if a song is being skipped",

View file

@ -1,6 +1,6 @@
from errno import ENOENT
from pathlib import Path
from subprocess import Popen, PIPE
from subprocess import PIPE, Popen
from typing import Any
from music_tag import load_file
@ -9,12 +9,8 @@ from mutagen.oggvorbis import OggVorbisHeaderError
from zotify.utils import AudioFormat
# fmt: off
class TranscodingError(RuntimeError): ...
class TargetExistsError(FileExistsError, TranscodingError): ...
class FFmpegNotFoundError(FileNotFoundError, TranscodingError): ...
class FFmpegExecutionError(OSError, TranscodingError): ...
# fmt: on
class TranscodingError(RuntimeError):
...
class LocalFile:
@ -22,19 +18,18 @@ class LocalFile:
self,
path: Path,
audio_format: AudioFormat | None = None,
bitrate: int | None = None,
bitrate: int = -1,
):
self.__path = path
self.__audio_format = audio_format
self.__bitrate = bitrate
if audio_format:
self.__audio_format = audio_format
def transcode(
self,
audio_format: AudioFormat | None = None,
bitrate: int | None = None,
bitrate: int = -1,
replace: bool = False,
ffmpeg: str = "ffmpeg",
ffmpeg: str = "",
opt_args: list[str] = [],
) -> None:
"""
@ -46,12 +41,15 @@ class LocalFile:
ffmpeg: Location of FFmpeg binary
opt_args: Additional arguments to pass to ffmpeg
"""
if audio_format is not None:
new_ext = audio_format.value.ext
if not audio_format:
audio_format = self.__audio_format
if audio_format:
ext = audio_format.value.ext
else:
new_ext = self.__audio_format.value.ext
ext = self.__path.suffix[1:]
cmd = [
ffmpeg,
ffmpeg if ffmpeg != "" else "ffmpeg",
"-y",
"-hide_banner",
"-loglevel",
@ -59,38 +57,35 @@ class LocalFile:
"-i",
str(self.__path),
]
newpath = self.__path.parent.joinpath(
self.__path.name.rsplit(".", 1)[0] + new_ext
)
if self.__path == newpath:
raise TargetExistsError(
f"Transcoding Error: Cannot overwrite source, target file is already a {self.__audio_format} file."
path = self.__path.parent.joinpath(self.__path.name.rsplit(".", 1)[0] + ext)
if self.__path == path:
raise TranscodingError(
f"Cannot overwrite source, target file {path} already exists."
)
cmd.extend(["-b:a", str(bitrate) + "k"]) if bitrate else None
cmd.extend(["-b:a", str(bitrate) + "k"]) if bitrate > 0 else None
cmd.extend(["-c:a", audio_format.value.name]) if audio_format else None
cmd.extend(opt_args)
cmd.append(str(newpath))
cmd.append(str(path))
try:
process = Popen(cmd, stdin=PIPE)
process.wait()
except OSError as e:
if e.errno == ENOENT:
raise FFmpegNotFoundError("Transcoding Error: FFmpeg was not found")
raise TranscodingError("FFmpeg was not found")
else:
raise
if process.returncode != 0:
raise FFmpegExecutionError(
f'Transcoding Error: `{" ".join(cmd)}` failed with error code {process.returncode}'
raise TranscodingError(
f'`{" ".join(cmd)}` failed with error code {process.returncode}'
)
if replace:
self.__path.unlink()
self.__path = newpath
self.__path = path
self.__audio_format = audio_format
self.__bitrate = bitrate
if audio_format:
self.__audio_format = audio_format
def write_metadata(self, metadata: dict[str, Any]) -> None:
"""
@ -121,4 +116,4 @@ class LocalFile:
try:
f.save()
except OggVorbisHeaderError:
pass
pass # Thrown when using untranscoded file, nothing breaks.

View file

@ -3,8 +3,8 @@ from pathlib import Path
from typing import Any
from librespot.core import PlayableContentFeeder
from librespot.util import bytes_to_hex
from librespot.structure import GeneralAudioStream
from librespot.util import bytes_to_hex
from requests import get
from zotify.file import LocalFile
@ -69,7 +69,7 @@ class Playable:
"""
for k, v in self.metadata.items():
output = output.replace(
"{" + k + "}", fix_filename(str(v).replace("\0", ","))
"{" + k + "}", fix_filename(str(v).replace("\0", ", "))
)
file_path = library.joinpath(output).expanduser()
if file_path.exists() and not replace:

View file

@ -1,14 +1,15 @@
from enum import Enum
from sys import stderr
from tqdm import tqdm
from zotify.config import (
Config,
PRINT_SKIPS,
PRINT_PROGRESS,
PRINT_ERRORS,
PRINT_WARNINGS,
PRINT_DOWNLOADS,
PRINT_ERRORS,
PRINT_PROGRESS,
PRINT_SKIPS,
PRINT_WARNINGS,
Config,
)