import re import shutil import appdirs import zipfile import warnings import json from pathlib import Path from configparser import ConfigParser from worthless import constants from worthless.launcher import Launcher class Installer: def _read_version_from_config(self): warnings.warn("This function is not reliable as upgrading game version from worthless\ doesn't write the config.", DeprecationWarning) if not self._config_file.exists(): raise FileNotFoundError(f"Config file {self._config_file} not found") cfg = ConfigParser() cfg.read(str(self._config_file)) return cfg.get("General", "game_version") @staticmethod def read_version_from_game_file(globalgamemanagers: Path | bytes): """ Reads the version from the globalgamemanagers file. (Data/globalgamemanagers) Uses `An Anime Game Launcher` method to read the version: https://gitlab.com/KRypt0n_/an-anime-game-launcher/-/blob/main/src/ts/Game.ts#L26 :return: Game version (ex 1.0.0) """ if isinstance(globalgamemanagers, Path): with globalgamemanagers.open("rb") as f: data = f.read().decode("ascii", errors="ignore") else: data = globalgamemanagers.decode("ascii", errors="ignore") result = re.search(r"([1-9]+\.[0-9]+\.[0-9]+)_[\d]+_[\d]+", data) if not result: raise ValueError("Could not find version in game file") return result.group(1) def get_game_data_name(self): if self._overseas: return "GenshinImpact_Data/" else: return "YuanShen_Data/" def get_game_data_path(self): return self._gamedir.joinpath(self.get_game_data_name()) def get_game_version(self): globalgamemanagers = self.get_game_data_path().joinpath("./globalgamemanagers") if not globalgamemanagers.exists(): return return self.read_version_from_game_file(globalgamemanagers) def get_installed_voiceovers(self): """ Returns a list of installed voiceovers. :return: List of installed voiceovers """ voiceovers = [] for file in self.get_game_data_path().joinpath("StreamingAssets/Audio/GeneratedSoundBanks/Windows").iterdir(): if file.is_dir(): voiceovers.append(file.name) return voiceovers def __init__(self, gamedir: str | Path = Path.cwd(), overseas: bool = True, data_dir: str | Path = None): if isinstance(gamedir, str): gamedir = Path(gamedir) self._gamedir = gamedir if not data_dir: self._appdirs = appdirs.AppDirs(constants.APP_NAME, constants.APP_AUTHOR) self._temp_path = Path(self._appdirs.user_cache_dir).joinpath("Installer") else: if not isinstance(data_dir, Path): data_dir = Path(data_dir) self._temp_path = data_dir.joinpath("Temp/Installer/") config_file = self._gamedir.joinpath("config.ini") self._config_file = config_file.resolve() self._version = None self._overseas = overseas self._launcher = Launcher(self._gamedir, overseas=self._overseas) self._version = self.get_game_version() def get_game_archive_version(self, game_archive: str | Path): if not game_archive.exists(): raise FileNotFoundError(f"Game archive {game_archive} not found") archive = zipfile.ZipFile(game_archive, 'r') return self.read_version_from_game_file(archive.read(self.get_game_data_name() + "globalgamemanagers")) @staticmethod def voiceover_lang_translate(lang: str): """ Translates the voiceover language to the language code used by the game. :param lang: Language to translate :return: Language code """ match lang: case "English(US)": return "en-us" case "Japanese": return "ja-jp" case "Chinese": return "zh-cn" case "Korean": return "ko-kr" @staticmethod def get_voiceover_archive_language(voiceover_archive: str | Path): if isinstance(voiceover_archive, str): voiceover_archive = Path(voiceover_archive).resolve() if not voiceover_archive.exists(): raise FileNotFoundError(f"Voiceover archive {voiceover_archive} not found") archive = zipfile.ZipFile(voiceover_archive, 'r') archive_path = zipfile.Path(archive) for file in archive_path.iterdir(): if file.name.endswith("_pkg_version"): return file.name.split("_")[1] def get_voiceover_archive_type(self, voiceover_archive: str | Path): vo_lang = self.get_voiceover_archive_language(voiceover_archive) archive = zipfile.ZipFile(voiceover_archive, 'r') archive_path = zipfile.Path(archive) files = archive.read("Audio_{}_pkg_version".format(vo_lang)).decode().split("\n") for file in files: if file.strip() and not archive_path.joinpath(json.loads(file)["remoteName"]).exists(): return False return True def apply_voiceover(self, voiceover_archive: str | Path): # Since Voiceover packages are unclear about diff package or full package # we will try to extract the voiceover package and apply it to the game # making this function universal for both cases if not self.get_game_data_path().exists(): raise FileNotFoundError(f"Game not found in {self._gamedir}") if isinstance(voiceover_archive, str): voiceover_archive = Path(voiceover_archive).resolve() if not voiceover_archive.exists(): raise FileNotFoundError(f"Voiceover archive {voiceover_archive} not found") archive = zipfile.ZipFile(voiceover_archive, 'r') archive.extractall(self._gamedir) archive.close() def update_game(self, game_archive: str | Path): if not self.get_game_data_path().exists(): raise FileNotFoundError(f"Game not found in {self._gamedir}") if isinstance(game_archive, str): game_archive = Path(game_archive).resolve() if not game_archive.exists(): raise FileNotFoundError(f"Update archive {game_archive} not found") archive = zipfile.ZipFile(game_archive, 'r') files = archive.namelist() # Don't extract these files (they're useless and if the game isn't patched then it'll # raise 31-4xxx error ingame) for file in ["deletefiles.txt", "hdifffiles.txt"]: try: files.remove(file) except ValueError: pass deletefiles = archive.read("deletefiles.txt").decode().split("\n") for file in deletefiles: current_game_file = self._gamedir.joinpath(file) if not current_game_file.exists(): continue if current_game_file.is_file(): current_game_file.unlink(missing_ok=True) archive.extractall(self._gamedir, members=files) archive.close() async def download_game_update(self): if self._version is None: raise ValueError("Game version not found, use install_game to install the game.") version_info = await self._launcher.get_resource_info() if version_info is None: raise RuntimeError("Failed to fetch game resource info.") if self._version == version_info.game.latest.version: raise ValueError("Game is already up to date.") diff_archive = self.get_game_diff_archive() if diff_archive is None: raise ValueError("Game diff archive is not available for this version, please reinstall.") # TODO: Download the diff archive raise NotImplementedError("Downloading game diff archive is not implemented yet.") def uninstall_game(self): shutil.rmtree(self._gamedir) def install_game(self, game_archive: str | Path, force_reinstall: bool = False): """Installs the game to the current directory If `force_reinstall` is True, the game will be uninstalled then reinstalled. """ if self.get_game_data_path().exists(): if not force_reinstall: raise ValueError(f"Game is already installed in {self._gamedir}") self.uninstall_game() self._gamedir.mkdir(parents=True, exist_ok=True) if isinstance(game_archive, str): game_archive = Path(game_archive).resolve() if not game_archive.exists(): raise FileNotFoundError(f"Install archive {game_archive} not found") archive = zipfile.ZipFile(game_archive, 'r') archive.extractall(self._gamedir) archive.close() async def get_game_diff_archive(self, from_version: str = None): """Gets a diff archive from `from_version` to the latest one If from_version is not specified, it will be taken from the game version. """ if not from_version: if self._version: from_version = self._version else: from_version = self._version = self.get_game_version() if not from_version: raise ValueError("No game version found") game_resource = await self._launcher.get_resource_info() if not game_resource: raise ValueError("Could not fetch game resource") for v in game_resource.game.diffs: if v.version == from_version: return v