todo: add global config support

This commit is contained in:
logykk 2022-02-02 21:56:57 +13:00
parent 1eef9756fd
commit 70da426463
15 changed files with 197 additions and 157 deletions

0
zotify/__init__.py Normal file
View file

View file

@ -7,10 +7,10 @@ It's like youtube-dl, but for that other music platform.
import argparse
from app import client
from config import CONFIG_VALUES
from zotify.app import client
from zotify.config import CONFIG_VALUES
if __name__ == '__main__':
def main():
parser = argparse.ArgumentParser(prog='zotify',
description='A music and podcast downloader needing only a python interpreter and ffmpeg.')
parser.add_argument('-ns', '--no-splash',
@ -51,3 +51,6 @@ if __name__ == '__main__':
args = parser.parse_args()
args.func(args)
if __name__ == '__main__':
main()

View file

@ -1,8 +1,8 @@
from const import ITEMS, ARTISTS, NAME, ID
from termoutput import Printer
from track import download_track
from utils import fix_filename
from zotify import Zotify
from zotify.const import ITEMS, ARTISTS, NAME, ID
from zotify.termoutput import Printer
from zotify.track import download_track
from zotify.utils import fix_filename
from zotify.zotify import Zotify
ALBUM_URL = 'https://api.spotify.com/v1/albums'
ARTIST_URL = 'https://api.spotify.com/v1/artists'

View file

@ -1,16 +1,17 @@
from librespot.audio.decoders import AudioQuality
from tabulate import tabulate
import os
#import os
from pathlib import Path
from album import download_album, download_artist_albums
from const import TRACK, NAME, ID, ARTIST, ARTISTS, ITEMS, TRACKS, EXPLICIT, ALBUM, ALBUMS, \
OWNER, PLAYLIST, PLAYLISTS, DISPLAY_NAME
from playlist import get_playlist_songs, get_playlist_info, download_from_user_playlist, download_playlist
from podcast import download_episode, get_show_episodes
from termoutput import Printer, PrintChannel
from track import download_track, get_saved_tracks
from utils import splash, split_input, regex_input_for_urls
from zotify import Zotify
from zotify.album import download_album, download_artist_albums
from zotify.const import TRACK, NAME, ID, ARTIST, ARTISTS, ITEMS, TRACKS, EXPLICIT, ALBUM, ALBUMS, \
OWNER, PLAYLIST, PLAYLISTS, DISPLAY_NAME, TYPE
from zotify.playlist import get_playlist_songs, get_playlist_info, download_from_user_playlist, download_playlist
from zotify.podcast import download_episode, get_show_episodes
from zotify.termoutput import Printer, PrintChannel
from zotify.track import download_track, get_saved_tracks
from zotify.utils import splash, split_input, regex_input_for_urls
from zotify.zotify import Zotify
SEARCH_URL = 'https://api.spotify.com/v1/search'
@ -31,7 +32,7 @@ def client(args) -> None:
if args.download:
urls = []
filename = args.download
if os.path.exists(filename):
if Path(filename).exists():
with open(filename, 'r', encoding='utf-8') as file:
urls.extend([line.strip() for line in file.readlines()])
@ -88,14 +89,17 @@ def download_from_urls(urls: list[str]) -> bool:
if not song[TRACK][NAME] or not song[TRACK][ID]:
Printer.print(PrintChannel.SKIPS, '### SKIPPING: SONG DOES NOT EXIST ANYMORE ###' + "\n")
else:
download_track('playlist', song[TRACK][ID], extra_keys=
{
'playlist_song_name': song[TRACK][NAME],
'playlist': name,
'playlist_num': str(enum).zfill(char_num),
'playlist_id': playlist_id,
'playlist_track_id': song[TRACK][ID]
})
if song[TRACK][TYPE] == "episode": # Playlist item is a podcast episode
download_episode(song[TRACK][ID])
else:
download_track('playlist', song[TRACK][ID], extra_keys=
{
'playlist_song_name': song[TRACK][NAME],
'playlist': name,
'playlist_num': str(enum).zfill(char_num),
'playlist_id': playlist_id,
'playlist_track_id': song[TRACK][ID]
})
enum += 1
elif episode_id is not None:
download = True

View file

@ -1,8 +1,9 @@
import json
import os
# import os
from pathlib import Path, PurePath
from typing import Any
CONFIG_FILE_PATH = '../zconfig.json'
CONFIG_FILE_PATH = './zconfig.json'
ROOT_PATH = 'ROOT_PATH'
ROOT_PODCAST_PATH = 'ROOT_PODCAST_PATH'
@ -34,34 +35,34 @@ PRINT_WARNINGS = 'PRINT_WARNINGS'
RETRY_ATTEMPTS = 'RETRY_ATTEMPTS'
CONFIG_VALUES = {
ROOT_PATH: { 'default': '../Zotify Music/', 'type': str, 'arg': '--root-path' },
ROOT_PODCAST_PATH: { 'default': '../Zotify Podcasts/', 'type': str, 'arg': '--root-podcast-path' },
SKIP_EXISTING_FILES: { 'default': 'True', 'type': bool, 'arg': '--skip-existing-files' },
SKIP_PREVIOUSLY_DOWNLOADED: { 'default': 'False', 'type': bool, 'arg': '--skip-previously-downloaded' },
RETRY_ATTEMPTS: { 'default': '5', 'type': int, 'arg': '--retry-attemps' },
DOWNLOAD_FORMAT: { 'default': 'ogg', 'type': str, 'arg': '--download-format' },
FORCE_PREMIUM: { 'default': 'False', 'type': bool, 'arg': '--force-premium' },
ANTI_BAN_WAIT_TIME: { 'default': '1', 'type': int, 'arg': '--anti-ban-wait-time' },
OVERRIDE_AUTO_WAIT: { 'default': 'False', 'type': bool, 'arg': '--override-auto-wait' },
CHUNK_SIZE: { 'default': '50000', 'type': int, 'arg': '--chunk-size' },
SPLIT_ALBUM_DISCS: { 'default': 'False', 'type': bool, 'arg': '--split-album-discs' },
DOWNLOAD_REAL_TIME: { 'default': 'False', 'type': bool, 'arg': '--download-real-time' },
LANGUAGE: { 'default': 'en', 'type': str, 'arg': '--language' },
BITRATE: { 'default': '', 'type': str, 'arg': '--bitrate' },
SONG_ARCHIVE: { 'default': '.song_archive', 'type': str, 'arg': '--song-archive' },
CREDENTIALS_LOCATION: { 'default': 'credentials.json', 'type': str, 'arg': '--credentials-location' },
OUTPUT: { 'default': '', 'type': str, 'arg': '--output' },
PRINT_SPLASH: { 'default': 'False', 'type': bool, 'arg': '--print-splash' },
PRINT_SKIPS: { 'default': 'True', 'type': bool, 'arg': '--print-skips' },
PRINT_DOWNLOAD_PROGRESS: { 'default': 'True', 'type': bool, 'arg': '--print-download-progress' },
PRINT_ERRORS: { 'default': 'True', 'type': bool, 'arg': '--print-errors' },
PRINT_DOWNLOADS: { 'default': 'False', 'type': bool, 'arg': '--print-downloads' },
PRINT_API_ERRORS: { 'default': 'False', 'type': bool, 'arg': '--print-api-errors' },
PRINT_PROGRESS_INFO: { 'default': 'True', 'type': bool, 'arg': '--print-progress-info' },
PRINT_WARNINGS: { 'default': 'True', 'type': bool, 'arg': '--print-warnings' },
MD_ALLGENRES: { 'default': 'False', 'type': bool, 'arg': '--md-allgenres' },
MD_GENREDELIMITER: { 'default': ';', 'type': str, 'arg': '--md-genredelimiter' },
TEMP_DOWNLOAD_DIR: { 'default': '', 'type': str, 'arg': '--temp-download-dir' }
ROOT_PATH: { 'default': './Zotify Music/', 'type': str, 'arg': '--root-path' },
ROOT_PODCAST_PATH: { 'default': './Zotify Podcasts/', 'type': str, 'arg': '--root-podcast-path' },
SKIP_EXISTING_FILES: { 'default': 'True', 'type': bool, 'arg': '--skip-existing-files' },
SKIP_PREVIOUSLY_DOWNLOADED: { 'default': 'False', 'type': bool, 'arg': '--skip-previously-downloaded' },
RETRY_ATTEMPTS: { 'default': '5', 'type': int, 'arg': '--retry-attemps' },
DOWNLOAD_FORMAT: { 'default': 'ogg', 'type': str, 'arg': '--download-format' },
FORCE_PREMIUM: { 'default': 'False', 'type': bool, 'arg': '--force-premium' },
ANTI_BAN_WAIT_TIME: { 'default': '1', 'type': int, 'arg': '--anti-ban-wait-time' },
OVERRIDE_AUTO_WAIT: { 'default': 'False', 'type': bool, 'arg': '--override-auto-wait' },
CHUNK_SIZE: { 'default': '50000', 'type': int, 'arg': '--chunk-size' },
SPLIT_ALBUM_DISCS: { 'default': 'False', 'type': bool, 'arg': '--split-album-discs' },
DOWNLOAD_REAL_TIME: { 'default': 'False', 'type': bool, 'arg': '--download-real-time' },
LANGUAGE: { 'default': 'en', 'type': str, 'arg': '--language' },
BITRATE: { 'default': '', 'type': str, 'arg': '--bitrate' },
SONG_ARCHIVE: { 'default': '.song_archive', 'type': str, 'arg': '--song-archive' },
CREDENTIALS_LOCATION: { 'default': 'credentials.json', 'type': str, 'arg': '--credentials-location' },
OUTPUT: { 'default': '', 'type': str, 'arg': '--output' },
PRINT_SPLASH: { 'default': 'False', 'type': bool, 'arg': '--print-splash' },
PRINT_SKIPS: { 'default': 'True', 'type': bool, 'arg': '--print-skips' },
PRINT_DOWNLOAD_PROGRESS: { 'default': 'True', 'type': bool, 'arg': '--print-download-progress' },
PRINT_ERRORS: { 'default': 'True', 'type': bool, 'arg': '--print-errors' },
PRINT_DOWNLOADS: { 'default': 'False', 'type': bool, 'arg': '--print-downloads' },
PRINT_API_ERRORS: { 'default': 'False', 'type': bool, 'arg': '--print-api-errors' },
PRINT_PROGRESS_INFO: { 'default': 'True', 'type': bool, 'arg': '--print-progress-info' },
PRINT_WARNINGS: { 'default': 'True', 'type': bool, 'arg': '--print-warnings' },
MD_ALLGENRES: { 'default': 'False', 'type': bool, 'arg': '--md-allgenres' },
MD_GENREDELIMITER: { 'default': ';', 'type': str, 'arg': '--md-genredelimiter' },
TEMP_DOWNLOAD_DIR: { 'default': '', 'type': str, 'arg': '--temp-download-dir' }
}
OUTPUT_DEFAULT_PLAYLIST = '{playlist}/{artist} - {song_name}.{ext}'
@ -76,17 +77,18 @@ class Config:
@classmethod
def load(cls, args) -> None:
app_dir = os.path.dirname(__file__)
#app_dir = PurePath(__file__).parent
app_dir = Path.cwd()
config_fp = CONFIG_FILE_PATH
if args.config_location:
config_fp = args.config_location
true_config_file_path = os.path.join(app_dir, config_fp)
true_config_file_path = PurePath(app_dir).joinpath(config_fp)
# Load config from zconfig.json
if not os.path.exists(true_config_file_path):
if not Path(true_config_file_path).exists():
with open(true_config_file_path, 'w', encoding='utf-8') as config_file:
json.dump(cls.get_default_json(), config_file, indent=4)
cls.Values = cls.get_default_json()
@ -142,11 +144,11 @@ class Config:
@classmethod
def get_root_path(cls) -> str:
return os.path.join(os.path.dirname(__file__), cls.get(ROOT_PATH))
return PurePath(Path.cwd()).joinpath(cls.get(ROOT_PATH))
@classmethod
def get_root_podcast_path(cls) -> str:
return os.path.join(os.path.dirname(__file__), cls.get(ROOT_PODCAST_PATH))
return PurePath(Path.cwd()).joinpath(cls.get(ROOT_PODCAST_PATH))
@classmethod
def get_skip_existing_files(cls) -> bool:
@ -194,17 +196,17 @@ class Config:
@classmethod
def get_song_archive(cls) -> str:
return os.path.join(cls.get_root_path(), cls.get(SONG_ARCHIVE))
return PurePath(cls.get_root_path()).joinpath(cls.get(SONG_ARCHIVE))
@classmethod
def get_credentials_location(cls) -> str:
return os.path.join(os.getcwd(), cls.get(CREDENTIALS_LOCATION))
return PurePath(Path.cwd()).joinpath(cls.get(CREDENTIALS_LOCATION))
@classmethod
def get_temp_download_dir(cls) -> str:
if cls.get(TEMP_DOWNLOAD_DIR) == '':
return ''
return os.path.join(cls.get_root_path(), cls.get(TEMP_DOWNLOAD_DIR))
return PurePath(cls.get_root_path()).joinpath(cls.get(TEMP_DOWNLOAD_DIR))
@classmethod
def get_all_genres(cls) -> bool:
@ -221,28 +223,38 @@ class Config:
return v
if mode == 'playlist':
if cls.get_split_album_discs():
split = os.path.split(OUTPUT_DEFAULT_PLAYLIST)
return os.path.join(split[0], 'Disc {disc_number}', split[0])
# split = os.path.split(OUTPUT_DEFAULT_PLAYLIST)
# return os.path.join(split[0], 'Disc {disc_number}', split[0])
split = PurePath(OUTPUT_DEFAULT_PLAYLIST).parent
return PurePath(split).joinpath('Disc {disc_number}').joinpath(split)
return OUTPUT_DEFAULT_PLAYLIST
if mode == 'extplaylist':
if cls.get_split_album_discs():
split = os.path.split(OUTPUT_DEFAULT_PLAYLIST_EXT)
return os.path.join(split[0], 'Disc {disc_number}', split[0])
# split = os.path.split(OUTPUT_DEFAULT_PLAYLIST_EXT)
# return os.path.join(split[0], 'Disc {disc_number}', split[0])
split = PurePath(OUTPUT_DEFAULT_PLAYLIST_EXT).parent
return PurePath(split).joinpath('Disc {disc_number}').joinpath(split)
return OUTPUT_DEFAULT_PLAYLIST_EXT
if mode == 'liked':
if cls.get_split_album_discs():
split = os.path.split(OUTPUT_DEFAULT_LIKED_SONGS)
return os.path.join(split[0], 'Disc {disc_number}', split[0])
# split = os.path.split(OUTPUT_DEFAULT_LIKED_SONGS)
# return os.path.join(split[0], 'Disc {disc_number}', split[0])
split = PurePath(OUTPUT_DEFAULT_LIKED_SONGS).parent
return PurePath(split).joinpath('Disc {disc_number}').joinpath(split)
return OUTPUT_DEFAULT_LIKED_SONGS
if mode == 'single':
if cls.get_split_album_discs():
split = os.path.split(OUTPUT_DEFAULT_SINGLE)
return os.path.join(split[0], 'Disc {disc_number}', split[0])
# split = os.path.split(OUTPUT_DEFAULT_SINGLE)
# return os.path.join(split[0], 'Disc {disc_number}', split[0])
split = PurePath(OUTPUT_DEFAULT_SINGLE).parent
return PurePath(split).joinpath('Disc {disc_number}').joinpath(split)
return OUTPUT_DEFAULT_SINGLE
if mode == 'album':
if cls.get_split_album_discs():
split = os.path.split(OUTPUT_DEFAULT_ALBUM)
return os.path.join(split[0], 'Disc {disc_number}', split[0])
# split = os.path.split(OUTPUT_DEFAULT_ALBUM)
# return os.path.join(split[0], 'Disc {disc_number}', split[0])
split = PurePath(OUTPUT_DEFAULT_ALBUM).parent
return PurePath(split).joinpath('Disc {disc_number}').joinpath(split)
return OUTPUT_DEFAULT_ALBUM
raise ValueError()

View file

@ -7,7 +7,7 @@ from shutil import get_terminal_size
from threading import Thread
from time import sleep
from termoutput import Printer
from zotify.termoutput import Printer
class Loader:

View file

@ -1,8 +1,8 @@
from const import ITEMS, ID, TRACK, NAME
from termoutput import Printer
from track import download_track
from utils import split_input
from zotify import Zotify
from zotify.const import ITEMS, ID, TRACK, NAME
from zotify.termoutput import Printer
from zotify.track import download_track
from zotify.utils import split_input
from zotify.zotify import Zotify
MY_PLAYLISTS_URL = 'https://api.spotify.com/v1/me/playlists'
PLAYLISTS_URL = 'https://api.spotify.com/v1/playlists'

View file

@ -1,14 +1,15 @@
import os
# import os
from pathlib import PurePath, Path
import time
from typing import Optional, Tuple
from librespot.metadata import EpisodeId
from const import ERROR, ID, ITEMS, NAME, SHOW, DURATION_MS
from termoutput import PrintChannel, Printer
from utils import create_download_directory, fix_filename
from zotify import Zotify
from loader import Loader
from zotify.const import ERROR, ID, ITEMS, NAME, SHOW, DURATION_MS
from zotify.termoutput import PrintChannel, Printer
from zotify.utils import create_download_directory, fix_filename
from zotify.zotify import Zotify
from zotify.loader import Loader
EPISODE_INFO_URL = 'https://api.spotify.com/v1/episodes'
@ -46,7 +47,7 @@ def get_show_episodes(show_id_str) -> list:
def download_podcast_directly(url, filename):
import functools
import pathlib
# import pathlib
import shutil
import requests
from tqdm.auto import tqdm
@ -58,7 +59,8 @@ def download_podcast_directly(url, filename):
f"Request to {url} returned status code {r.status_code}")
file_size = int(r.headers.get('Content-Length', 0))
path = pathlib.Path(filename).expanduser().resolve()
# path = pathlib.Path(filename).expanduser().resolve()
path = Path(filename).expanduser().resolve()
path.parent.mkdir(parents=True, exist_ok=True)
desc = "(Unknown total file size)" if file_size == 0 else ""
@ -86,8 +88,8 @@ def download_episode(episode_id) -> None:
direct_download_url = Zotify.invoke_url(
'https://api-partner.spotify.com/pathfinder/v1/query?operationName=getEpisode&variables={"uri":"spotify:episode:' + episode_id + '"}&extensions={"persistedQuery":{"version":1,"sha256Hash":"224ba0fd89fcfdfb3a15fa2d82a6112d3f4e2ac88fba5c6713de04d1b72cf482"}}')[1]["data"]["episode"]["audio"]["items"][-1]["url"]
download_directory = os.path.join(Zotify.CONFIG.get_root_podcast_path(), extra_paths)
download_directory = os.path.realpath(download_directory)
download_directory = PurePath(Zotify.CONFIG.get_root_podcast_path()).joinpath(extra_paths)
# download_directory = os.path.realpath(download_directory)
create_download_directory(download_directory)
if "anon-podcast.scdn.co" in direct_download_url:
@ -97,10 +99,10 @@ def download_episode(episode_id) -> None:
total_size = stream.input_stream.size
filepath = os.path.join(download_directory, f"{filename}.ogg")
filepath = PurePath(download_directory).joinpath(f"{filename}.ogg")
if (
os.path.isfile(filepath)
and os.path.getsize(filepath) == total_size
Path(filepath).isfile()
and Path(filepath).stat().st_size == total_size
and Zotify.CONFIG.get_skip_existing_files()
):
Printer.print(PrintChannel.SKIPS, "\n### SKIPPING: " + podcast_name + " - " + episode_name + " (EPISODE ALREADY EXISTS) ###")
@ -128,7 +130,7 @@ def download_episode(episode_id) -> None:
if delta_want > delta_real:
time.sleep(delta_want - delta_real)
else:
filepath = os.path.join(download_directory, f"{filename}.mp3")
filepath = PurePath(download_directory).joinpath(f"{filename}.mp3")
download_podcast_directly(direct_download_url, filepath)
prepare_download_loader.stop()

View file

@ -2,8 +2,8 @@ import sys
from enum import Enum
from tqdm import tqdm
from config import *
from zotify import Zotify
from zotify.config import *
from zotify.zotify import Zotify
class PrintChannel(Enum):

View file

@ -1,4 +1,5 @@
import os
# import os
from pathlib import Path, PurePath
import re
import time
import uuid
@ -8,14 +9,14 @@ from librespot.audio.decoders import AudioQuality
from librespot.metadata import TrackId
from ffmpy import FFmpeg
from const import TRACKS, ALBUM, GENRES, NAME, ITEMS, DISC_NUMBER, TRACK_NUMBER, IS_PLAYABLE, ARTISTS, IMAGES, URL, \
from zotify.const import TRACKS, ALBUM, GENRES, NAME, ITEMS, DISC_NUMBER, TRACK_NUMBER, IS_PLAYABLE, ARTISTS, IMAGES, URL, \
RELEASE_DATE, ID, TRACKS_URL, SAVED_TRACKS_URL, TRACK_STATS_URL, CODEC_MAP, EXT_MAP, DURATION_MS, HREF
from termoutput import Printer, PrintChannel
from utils import fix_filename, set_audio_tags, set_music_thumbnail, create_download_directory, \
from zotify.termoutput import Printer, PrintChannel
from zotify.utils import fix_filename, set_audio_tags, set_music_thumbnail, create_download_directory, \
get_directory_song_ids, add_to_directory_song_ids, get_previously_downloaded, add_to_archive, fmt_seconds
from zotify import Zotify
from zotify.zotify import Zotify
import traceback
from loader import Loader
from zotify.loader import Loader
def get_saved_tracks() -> list:
@ -136,25 +137,27 @@ def download_track(mode: str, track_id: str, extra_keys=None, disable_progressba
output_template = output_template.replace("{track_id}", fix_filename(track_id))
output_template = output_template.replace("{ext}", ext)
filename = os.path.join(Zotify.CONFIG.get_root_path(), output_template)
filedir = os.path.dirname(filename)
filename = PurePath(Zotify.CONFIG.get_root_path()).joinpath(output_template)
filedir = PurePath(filename).parent
filename_temp = filename
if Zotify.CONFIG.get_temp_download_dir() != '':
filename_temp = os.path.join(Zotify.CONFIG.get_temp_download_dir(), f'zotify_{str(uuid.uuid4())}_{track_id}.{ext}')
filename_temp = PurePath(Zotify.CONFIG.get_temp_download_dir()).joinpath(f'zotify_{str(uuid.uuid4())}_{track_id}.{ext}')
check_name = os.path.isfile(filename) and os.path.getsize(filename)
check_name = Path(filename).is_file() and Path(filename).stat().st_size
check_id = scraped_song_id in get_directory_song_ids(filedir)
check_all_time = scraped_song_id in get_previously_downloaded()
# a song with the same name is installed
if not check_id and check_name:
c = len([file for file in os.listdir(filedir) if re.search(f'^{filename}_', str(file))]) + 1
c = len([file for file in Path(filedir).iterdir() if re.search(f'^{filename}_', str(file))]) + 1
fname = os.path.splitext(os.path.basename(filename))[0]
ext = os.path.splitext(os.path.basename(filename))[1]
# fname = os.path.splitext(os.path.basename(filename))[0]
# ext = os.path.splitext(os.path.basename(filename))[1]
fname = PurePath(PurePath(filename).name).parent
ext = PurePath(PurePath(filename).name).suffix
filename = os.path.join(filedir, f'{fname}_{c}{ext}')
filename = PurePath(filedir).joinpath(f'{fname}_{c}{ext}')
except Exception as e:
Printer.print(PrintChannel.ERRORS, '### SKIPPING SONG - FAILED TO QUERY METADATA ###')
@ -218,18 +221,18 @@ def download_track(mode: str, track_id: str, extra_keys=None, disable_progressba
set_music_thumbnail(filename_temp, image_url)
if filename_temp != filename:
os.rename(filename_temp, filename)
Path(filename_temp).rename(filename)
time_finished = time.time()
Printer.print(PrintChannel.DOWNLOADS, f'### Downloaded "{song_name}" to "{os.path.relpath(filename, Zotify.CONFIG.get_root_path())}" in {fmt_seconds(time_downloaded - time_start)} (plus {fmt_seconds(time_finished - time_downloaded)} converting) ###' + "\n")
Printer.print(PrintChannel.DOWNLOADS, f'### Downloaded "{song_name}" to "{Path(filename).relative_to(Zotify.CONFIG.get_root_path())}" in {fmt_seconds(time_downloaded - time_start)} (plus {fmt_seconds(time_finished - time_downloaded)} converting) ###' + "\n")
# add song id to archive file
if Zotify.CONFIG.get_skip_previously_downloaded():
add_to_archive(scraped_song_id, os.path.basename(filename), artists[0], name)
add_to_archive(scraped_song_id, PurePath(filename).name, artists[0], name)
# add song id to download directory's .song_ids file
if not check_id:
add_to_directory_song_ids(filedir, scraped_song_id, os.path.basename(filename), artists[0], name)
add_to_directory_song_ids(filedir, scraped_song_id, PurePath(filename).name, artists[0], name)
if not Zotify.CONFIG.get_anti_ban_wait_time():
time.sleep(Zotify.CONFIG.get_anti_ban_wait_time())
@ -241,16 +244,17 @@ def download_track(mode: str, track_id: str, extra_keys=None, disable_progressba
Printer.print(PrintChannel.ERRORS, "\n")
Printer.print(PrintChannel.ERRORS, str(e) + "\n")
Printer.print(PrintChannel.ERRORS, "".join(traceback.TracebackException.from_exception(e).format()) + "\n")
if os.path.exists(filename_temp):
os.remove(filename_temp)
if Path(filename_temp).exists():
Path(filename_temp).unlink()
prepare_download_loader.stop()
def convert_audio_format(filename) -> None:
""" Converts raw audio into playable file """
temp_filename = f'{os.path.splitext(filename)[0]}.tmp'
os.replace(filename, temp_filename)
# temp_filename = f'{os.path.splitext(filename)[0]}.tmp'
temp_filename = f'{PurePath(filename).parent}.tmp'
Path(filename).replace(temp_filename)
download_format = Zotify.CONFIG.get_download_format().lower()
file_codec = CODEC_MAP.get(download_format, 'copy')
@ -277,5 +281,5 @@ def convert_audio_format(filename) -> None:
with Loader(PrintChannel.PROGRESS_INFO, "Converting file..."):
ff_m.run()
if os.path.exists(temp_filename):
os.remove(temp_filename)
if Path(temp_filename).exists():
Path(temp_filename).unlink()

View file

@ -5,14 +5,15 @@ import platform
import re
import subprocess
from enum import Enum
from pathlib import Path, PurePath
from typing import List, Tuple
import music_tag
import requests
from const import ARTIST, GENRE, TRACKTITLE, ALBUM, YEAR, DISCNUMBER, TRACKNUMBER, ARTWORK, \
from zotify.const import ARTIST, GENRE, TRACKTITLE, ALBUM, YEAR, DISCNUMBER, TRACKNUMBER, ARTWORK, \
WINDOWS_SYSTEM, ALBUMARTIST
from zotify import Zotify
from zotify.zotify import Zotify
class MusicFormat(str, Enum):
@ -22,11 +23,12 @@ class MusicFormat(str, Enum):
def create_download_directory(download_path: str) -> None:
""" Create directory and add a hidden file with song ids """
os.makedirs(download_path, exist_ok=True)
# os.makedirs(download_path, exist_ok=True)
Path(download_path).mkdir(parents=True, exist_ok=True)
# add hidden file with song ids
hidden_file_path = os.path.join(download_path, '.song_ids')
if not os.path.isfile(hidden_file_path):
hidden_file_path = PurePath(download_path).joinpath('.song_ids')
if not Path(hidden_file_path).is_file():
with open(hidden_file_path, 'w', encoding='utf-8') as f:
pass
@ -37,7 +39,7 @@ def get_previously_downloaded() -> List[str]:
ids = []
archive_path = Zotify.CONFIG.get_song_archive()
if os.path.exists(archive_path):
if Path(archive_path).exists():
with open(archive_path, 'r', encoding='utf-8') as f:
ids = [line.strip().split('\t')[0] for line in f.readlines()]
@ -49,7 +51,7 @@ def add_to_archive(song_id: str, filename: str, author_name: str, song_name: str
archive_path = Zotify.CONFIG.get_song_archive()
if os.path.exists(archive_path):
if Path(archive_path).exists():
with open(archive_path, 'a', encoding='utf-8') as file:
file.write(f'{song_id}\t{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}\t{author_name}\t{song_name}\t{filename}\n')
else:
@ -62,8 +64,8 @@ def get_directory_song_ids(download_path: str) -> List[str]:
song_ids = []
hidden_file_path = os.path.join(download_path, '.song_ids')
if os.path.isfile(hidden_file_path):
hidden_file_path = PurePath(download_path).joinpath('.song_ids')
if Path(hidden_file_path).is_file():
with open(hidden_file_path, 'r', encoding='utf-8') as file:
song_ids.extend([line.strip().split('\t')[0] for line in file.readlines()])
@ -73,7 +75,7 @@ def get_directory_song_ids(download_path: str) -> List[str]:
def add_to_directory_song_ids(download_path: str, song_id: str, filename: str, author_name: str, song_name: str) -> None:
""" Appends song_id to .song_ids file in directory """
hidden_file_path = os.path.join(download_path, '.song_ids')
hidden_file_path = PurePath(download_path).joinpath('.song_ids')
# not checking if file exists because we need an exception
# to be raised if something is wrong
with open(hidden_file_path, 'a', encoding='utf-8') as file:

View file

@ -1,15 +1,16 @@
import os
import os.path
from pathlib import Path
from getpass import getpass
import time
import requests
from librespot.audio.decoders import VorbisOnlyAudioQuality
from librespot.core import Session
from const import TYPE, \
from zotify.const import TYPE, \
PREMIUM, USER_READ_EMAIL, OFFSET, LIMIT, \
PLAYLIST_READ_PRIVATE, USER_LIBRARY_READ
from config import Config
from zotify.config import Config
class Zotify:
SESSION: Session = None
@ -26,7 +27,7 @@ class Zotify:
cred_location = Config.get_credentials_location()
if os.path.isfile(cred_location):
if Path(cred_location).is_file():
try:
cls.SESSION = Session.Builder().stored_file(cred_location).create()
return
@ -75,7 +76,7 @@ class Zotify:
@classmethod
def invoke_url(cls, url, tryCount=0):
# we need to import that here, otherwise we will get circular imports!
from termoutput import Printer, PrintChannel
from zotify.termoutput import Printer, PrintChannel
headers = cls.get_auth_header()
response = requests.get(url, headers=headers)
responsetext = response.text