import re import shutil import platform import aiohttp import appdirs import zipfile import warnings import json from pathlib import Path from configparser import ConfigParser from aiopath import AsyncPath from worthless import constants from worthless.launcher import Launcher async def _download_file(file_url: str, file_name: str, file_path: Path | str, file_len: int = None, overwrite=False, chunks=8192): """ Download file name to temporary directory, :param file_url: :param file_name: :return: """ params = {} file_path = AsyncPath(file_path).joinpath(file_name) if overwrite: await file_path.unlink(missing_ok=True) if await file_path.exists(): cur_len = len(await file_path.read_bytes()) params |= { "Range": f"bytes={cur_len}-{file_len if file_len else ''}" } else: await file_path.touch() async with aiohttp.ClientSession() as session: rsp = await session.get(file_url, params=params, timeout=None) rsp.raise_for_status() while True: chunk = await rsp.content.read(chunks) if not chunk: break async with file_path.open("ab") as f: await f.write(chunk) class HDiffPatch: def __init__(self, git_url=None, data_dir=None): if not git_url: repo_url = constants.HDIFFPATCH_GIT_URL self._git_url = git_url if not data_dir: self._appdirs = appdirs.AppDirs(constants.APP_NAME, constants.APP_AUTHOR) self.temp_path = Path(self._appdirs.user_cache_dir).joinpath("HDiffPatch") self.data_path = Path(self._appdirs.user_data_dir).joinpath("Tools/HDiffPatch") else: if not isinstance(data_dir, Path): data_dir = Path(data_dir) self.data_path = Path(data_dir).joinpath("Tools/HDiffPatch") self.temp_path = data_dir.joinpath("Temp/HDiffPatch") self.temp_path.mkdir(parents=True, exist_ok=True) @staticmethod def _get_platform_arch(): match platform.system(): case "Windows": match platform.architecture()[0]: case "32bit": return "windows32" case "64bit": return "windows64" case "Linux": match platform.architecture()[0]: case "32bit": return "linux32" case "64bit": return "linux64" case "Darwin": return "macos" # Rip BSD they need to use Linux compatibility layer to run this (or use Wine if they prefer that) raise RuntimeError("Unsupported platform") def _get_hdiffpatch_exec(self, exec_name): if shutil.which(exec_name): return exec_name if not any(self.data_path.iterdir()): return None platform_arch_path = self.data_path.joinpath(self._get_platform_arch()) if platform_arch_path.joinpath(exec_name).exists(): return str(platform_arch_path.joinpath(exec_name)) return None def get_hpatchz_executable(self): return self._get_hdiffpatch_exec("hpatchz") def get_hdiffz_executable(self): return self._get_hdiffpatch_exec("hdiffz") async def _get_latest_release_info(self): async with aiohttp.ClientSession() as session: split = self._git_url.split("/") repo = split[-1] owner = split[-2] rsp = await session.get("https://api.github.com/repos/{}/{}/releases/latest".format(owner, repo), params={"Headers": "Accept: application/vnd.github.v3+json"}) rsp.raise_for_status() for asset in await rsp.json()["assets"]: if asset["name"].endswith(".zip") and not "linux" in asset["name"] and not "windows" in asset["name"] \ and not "macos" in asset["name"] and not "android" in asset["name"]: return asset async def get_latest_release_url(self): asset = await self._get_latest_release_info() return asset["browser_download_url"] async def get_latest_release_name(self): asset = await self._get_latest_release_info() return asset["name"] async def download_latest_release(self, extract=True): url = await self.get_latest_release_url() name = await self.get_latest_release_name() if not url: raise RuntimeError("Unable to find latest release") await _download_file(url, name, self.temp_path, overwrite=True) if not extract: return archive = zipfile.ZipFile(self.temp_path.joinpath(name)) archive.extractall(self.data_path) archive.close() class Installer: def _read_version_from_config(self): warnings.warn("This function is not reliable as upgrading game version from worthless\ doesn't write the config.", DeprecationWarning) if not self._config_file.exists(): raise FileNotFoundError(f"Config file {self._config_file} not found") cfg = ConfigParser() cfg.read(str(self._config_file)) return cfg.get("General", "game_version") @staticmethod def read_version_from_game_file(globalgamemanagers: Path | bytes): """ Reads the version from the globalgamemanagers file. (Data/globalgamemanagers) Uses `An Anime Game Launcher` method to read the version: https://gitlab.com/KRypt0n_/an-anime-game-launcher/-/blob/main/src/ts/Game.ts#L26 :return: Game version (ex 1.0.0) """ if isinstance(globalgamemanagers, Path): with globalgamemanagers.open("rb") as f: data = f.read().decode("ascii", errors="ignore") else: data = globalgamemanagers.decode("ascii", errors="ignore") result = re.search(r"([1-9]+\.[0-9]+\.[0-9]+)_[\d]+_[\d]+", data) if not result: raise ValueError("Could not find version in game file") return result.group(1) def get_game_data_name(self): if self._overseas: return "GenshinImpact_Data/" else: return "YuanShen_Data/" def get_game_data_path(self): return self._gamedir.joinpath(self.get_game_data_name()) def get_game_version(self): globalgamemanagers = self.get_game_data_path().joinpath("./globalgamemanagers") if not globalgamemanagers.exists(): return return self.read_version_from_game_file(globalgamemanagers) def get_installed_voiceovers(self): """ Returns a list of installed voiceovers. :return: List of installed voiceovers """ voiceovers = [] for file in self.get_game_data_path().joinpath("StreamingAssets/Audio/GeneratedSoundBanks/Windows").iterdir(): if file.is_dir(): voiceovers.append(file.name) return voiceovers def __init__(self, gamedir: str | Path = Path.cwd(), overseas: bool = True, data_dir: str | Path = None): if isinstance(gamedir, str): gamedir = Path(gamedir) self._gamedir = gamedir if not data_dir: self._appdirs = appdirs.AppDirs(constants.APP_NAME, constants.APP_AUTHOR) self.temp_path = Path(self._appdirs.user_cache_dir).joinpath("Installer") else: if not isinstance(data_dir, Path): data_dir = Path(data_dir) self.temp_path = data_dir.joinpath("Temp/Installer/") self.temp_path.mkdir(parents=True, exist_ok=True) config_file = self._gamedir.joinpath("config.ini") self._config_file = config_file.resolve() self._download_chunk = 8192 self._version = None self._overseas = overseas self._launcher = Launcher(self._gamedir, overseas=self._overseas) self._hdiffpatch = HDiffPatch(data_dir=data_dir) self._version = self.get_game_version() def set_download_chunk(self, chunk: int): self._download_chunk = chunk async def _download_file(self, file_url: str, file_name: str, file_len: int = None, overwrite=False): """ Download file name to temporary directory, :param file_url: :param file_name: :return: """ await _download_file(file_url, file_name, self.temp_path, file_len=file_len, overwrite=overwrite) def get_game_archive_version(self, game_archive: str | Path): if not game_archive.exists(): raise FileNotFoundError(f"Game archive {game_archive} not found") archive = zipfile.ZipFile(game_archive, 'r') return self.read_version_from_game_file(archive.read(self.get_game_data_name() + "globalgamemanagers")) @staticmethod def voiceover_lang_translate(lang: str): """ Translates the voiceover language to the language code used by the game. :param lang: Language to translate :return: Language code """ match lang: case "English(US)": return "en-us" case "Japanese": return "ja-jp" case "Chinese": return "zh-cn" case "Korean": return "ko-kr" return lang @staticmethod def get_voiceover_archive_language(voiceover_archive: str | Path): if isinstance(voiceover_archive, str): voiceover_archive = Path(voiceover_archive).resolve() if not voiceover_archive.exists(): raise FileNotFoundError(f"Voiceover archive {voiceover_archive} not found") archive = zipfile.ZipFile(voiceover_archive, 'r') archive_path = zipfile.Path(archive) for file in archive_path.iterdir(): if file.name.endswith("_pkg_version"): return file.name.split("_")[1] def get_voiceover_archive_type(self, voiceover_archive: str | Path): vo_lang = self.get_voiceover_archive_language(voiceover_archive) archive = zipfile.ZipFile(voiceover_archive, 'r') archive_path = zipfile.Path(archive) files = archive.read("Audio_{}_pkg_version".format(vo_lang)).decode().split("\n") for file in files: if file.strip() and not archive_path.joinpath(json.loads(file)["remoteName"]).exists(): return False return True def apply_voiceover(self, voiceover_archive: str | Path): # Since Voiceover packages are unclear about diff package or full package # we will try to extract the voiceover package and apply it to the game # making this function universal for both cases if not self.get_game_data_path().exists(): raise FileNotFoundError(f"Game not found in {self._gamedir}") if isinstance(voiceover_archive, str): voiceover_archive = Path(voiceover_archive).resolve() if not voiceover_archive.exists(): raise FileNotFoundError(f"Voiceover archive {voiceover_archive} not found") archive = zipfile.ZipFile(voiceover_archive, 'r') archive.extractall(self._gamedir) archive.close() def update_game(self, game_archive: str | Path): if not self.get_game_data_path().exists(): raise FileNotFoundError(f"Game not found in {self._gamedir}") if isinstance(game_archive, str): game_archive = Path(game_archive).resolve() if not game_archive.exists(): raise FileNotFoundError(f"Update archive {game_archive} not found") archive = zipfile.ZipFile(game_archive, 'r') files = archive.namelist() # Don't extract these files (they're useless and if the game isn't patched then it'll # raise 31-4xxx error ingame) for file in ["deletefiles.txt", "hdifffiles.txt"]: try: files.remove(file) except ValueError: pass deletefiles = archive.read("deletefiles.txt").decode().split("\n") for file in deletefiles: current_game_file = self._gamedir.joinpath(file) if not current_game_file.exists(): continue if current_game_file.is_file(): current_game_file.unlink(missing_ok=True) archive.extractall(self._gamedir, members=files) archive.close() # Update game version on local variable. self._version = self.get_game_version() async def download_full_game(self, overwrite: bool = False): if self._version and not overwrite: raise ValueError("Game already exists") archive = await self._launcher.get_resource_info() if archive is None: raise RuntimeError("Failed to fetch game resource info.") if self._version == archive.game.latest.version: raise ValueError("Game is already up to date.") await self._download_file(archive.game.latest.path, archive.game.latest.name, archive.game.latest.size) async def download_full_voiceover(self, language: str): archive = await self._launcher.get_resource_info() if archive is None: raise RuntimeError("Failed to fetch game resource info.") translated_lang = self.voiceover_lang_translate(language) for vo in archive.game.latest.voice_packs: if vo.language == translated_lang: await self._download_file(vo.path, vo.name, vo.size) async def download_game_update(self, from_version: str = None): if not from_version: if self._version: from_version = self._version else: from_version = self._version = self.get_game_version() if not from_version: raise ValueError("No game version found") version_info = await self._launcher.get_resource_info() if version_info is None: raise RuntimeError("Failed to fetch game resource info.") if self._version == version_info.game.latest.version: raise ValueError("Game is already up to date.") diff_archive = await self.get_game_diff_archive(from_version) if diff_archive is None: raise ValueError("Game diff archive is not available for this version, please reinstall.") await self._download_file(diff_archive.path, diff_archive.name, diff_archive.size) async def download_voiceover_update(self, language: str, from_version: str = None): if not from_version: if self._version: from_version = self._version else: from_version = self._version = self.get_game_version() if not from_version: raise ValueError("No game version found") version_info = await self._launcher.get_resource_info() if version_info is None: raise RuntimeError("Failed to fetch game resource info.") diff_archive = await self.get_voiceover_diff_archive(language, from_version) if diff_archive is None: raise ValueError("Voiceover diff archive is not available for this version, please reinstall.") await self._download_file(diff_archive.path, diff_archive.name, diff_archive.size) def uninstall_game(self): shutil.rmtree(self._gamedir) def install_game(self, game_archive: str | Path, force_reinstall: bool = False): """Installs the game to the current directory If `force_reinstall` is True, the game will be uninstalled then reinstalled. """ if self.get_game_data_path().exists(): if not force_reinstall: raise ValueError(f"Game is already installed in {self._gamedir}") self.uninstall_game() self._gamedir.mkdir(parents=True, exist_ok=True) if isinstance(game_archive, str): game_archive = Path(game_archive).resolve() if not game_archive.exists(): raise FileNotFoundError(f"Install archive {game_archive} not found") archive = zipfile.ZipFile(game_archive, 'r') archive.extractall(self._gamedir) archive.close() async def get_voiceover_diff_archive(self, lang: str, from_version: str = None): """Gets a diff archive from `from_version` to the latest one If from_version is not specified, it will be taken from the game version. """ if not from_version: if self._version: from_version = self._version else: from_version = self._version = self.get_game_version() if not from_version: raise ValueError("No game version found") game_resource = await self._launcher.get_resource_info() if not game_resource: raise ValueError("Could not fetch game resource") translated_lang = self.voiceover_lang_translate(lang) for v in game_resource.game.diffs: if v.version != from_version: continue for vo in v.voice_packs: if vo.language != translated_lang: continue return vo async def get_game_diff_archive(self, from_version: str = None): """Gets a diff archive from `from_version` to the latest one If from_version is not specified, it will be taken from the game version. """ if not from_version: if self._version: from_version = self._version else: from_version = self._version = self.get_game_version() if not from_version: raise ValueError("No game version found") game_resource = await self._launcher.get_resource_info() if not game_resource: raise ValueError("Could not fetch game resource") for v in game_resource.game.diffs: if v.version == from_version: return v