diff --git a/worthless/constants.py b/worthless/constants.py index db55dc4..f684960 100644 --- a/worthless/constants.py +++ b/worthless/constants.py @@ -5,6 +5,7 @@ LAUNCHER_API_URL_CN = "https://sdk-static.mihoyo.com/hk4e_cn/mdk/launcher/api" PATCH_GIT_URL = "https://notabug.org/Krock/dawn" TELEMETRY_URL_LIST = [ "log-upload-os.mihoyo.com", + "log-upload-os.hoyoverse.com", "log-upload.mihoyo.com", "overseauspider.yuanshen.com" "uspider.yuanshen.com" diff --git a/worthless/gui.py b/worthless/gui.py index 9f9ee65..97107b9 100755 --- a/worthless/gui.py +++ b/worthless/gui.py @@ -5,22 +5,70 @@ import appdirs from pathlib import Path from worthless.launcher import Launcher from worthless.installer import Installer +from worthless.patcher import Patcher import worthless.constants as constants class UI: - def __init__(self, gamedir: str, noconfirm: bool) -> None: + def __init__(self, gamedir: str, noconfirm: bool, tempdir: str | Path = None) -> None: self._noconfirm = noconfirm self._gamedir = gamedir self._launcher = Launcher(gamedir) - self._installer = Installer(gamedir) + self._installer = Installer(gamedir, data_dir=tempdir) + self._patcher = Patcher(gamedir) - def _ask(self, title, description): - raise NotImplementedError() + @staticmethod + def _ask(question): + answer = "" + while answer.lower() not in ['y', 'n']: + if answer != "": + print("Invalid choice, please try again.") + answer = input(question + " (y/n): ") + return answer.lower() == 'y' def get_game_version(self): print(self._installer.get_game_version()) + def _update_from_archive(self, filepath): + print("Reverting patches if patched...") + self._patcher.revert_patch(True) + print("Updating game from archive...") + self._installer.update_game(filepath) + + def _apply_voiceover_from_archive(self, filepath): + print("Applying voiceover from archive...") + self._installer.apply_voiceover(filepath) + + def install_voiceover_from_file(self, filepath): + if not self._ask("Do you want to apply this voiceover pack? ({})".format(filepath)): + print("Aborting apply process.") + return + self._apply_voiceover_from_archive(filepath) + print("Voiceover applied successfully.") + + def revert_patch(self): + print("Reverting patches...") + self._patcher.revert_patch(True) + print("Patches reverted.") + + def install_from_file(self, filepath): + gamever = self._installer.get_game_version() + if gamever: + print("Current game installation detected. ({})".format(self._installer.get_game_version())) + print("Archive game version: " + self._installer.get_archive_version(filepath)) + if not self._ask("Do you want to update the game? (from {})".format(filepath)): + print("Aborting update process.") + return + self._update_from_archive(filepath) + print("Game updated successfully.") + else: + print("No game installation detected.") + if not self._ask("Do you want to install the game? ({})".format(filepath)): + print("Aborting installation process.") + return + raise NotImplementedError("Install from file not implemented yet.") + # print("Game installed successfully.") + def install_game(self): # TODO raise NotImplementedError("Install game is not implemented.") @@ -43,11 +91,14 @@ def main(): help="Specify the game directory (default current working directory)") parser.add_argument("-W", "--temporary-dir", action="store", type=Path, default=None, help="Specify the temporary directory (default {} and {})".format(default_dirs.user_data_dir, - default_dirs.user_cache_dir)) + default_dirs.user_cache_dir)) parser.add_argument("-S", "--install", action="store_true", help="Install/update the game (if not already installed, else do nothing)") - parser.add_argument("-U", "--install-from-file", action="store_true", - help="Install the game from the game archive (if not already installed, \ + parser.add_argument("-U", "--install-from-file", action="store", type=Path, default=None, + help="Install the game from an archive (if not already installed, \ + else update from archive)") + parser.add_argument("-Uv", "--install-voiceover-from-file", action="store", type=Path, default=None, + help="Install the voiceover from an archive (if not already installed, \ else update from archive)") parser.add_argument("-Sp", "--patch", action="store_true", help="Patch the game (if not already patched, else do nothing)") @@ -66,12 +117,25 @@ def main(): help="Do not ask any for confirmation. (Ignored in interactive mode)") args = parser.parse_args() interactive_mode = not args.install and not args.install_from_file and not args.patch and not args.update and not \ - args.remove and not args.remove_patch and not args.remove_voiceover and not args.get_game_version - ui = UI(args.dir, args.noconfirm) + args.remove and not args.remove_patch and not args.remove_voiceover and not args.get_game_version and not \ + args.install_voiceover_from_file + if args.temporary_dir: + args.temporary_dir.mkdir(parents=True, exist_ok=True) + + ui = UI(args.dir, args.noconfirm, args.temporary_dir) if args.install and args.update: raise ValueError("Cannot specify both --install and --update arguments.") + if args.install_from_file and args.update: + raise ValueError("Cannot specify both --install-from-file and --update arguments.") + + if args.install_voiceover_from_file and args.update: + raise ValueError("Cannot specify both --install-voiceover-from-file and --update arguments.") + + if args.install_from_file and args.install: + raise ValueError("Cannot specify both --install-from-file and --install arguments.") + if args.get_game_version: ui.get_game_version() @@ -80,7 +144,15 @@ def main(): if args.update: ui.update_game() - return + + if args.install_from_file: + ui.install_from_file(args.install_from_file) + + if args.install_voiceover_from_file: + ui.install_voiceover_from_file(args.install_voiceover_from_file) + + if args.remove_patch: + ui.revert_patch() if interactive_mode: ui.interactive_ui() diff --git a/worthless/installer.py b/worthless/installer.py index 7ad21dd..09dd071 100644 --- a/worthless/installer.py +++ b/worthless/installer.py @@ -1,5 +1,6 @@ import re import appdirs +import zipfile from pathlib import Path from configparser import ConfigParser @@ -7,9 +8,12 @@ from worthless import constants from worthless.launcher import Launcher -def _read_version_from_game_file(globalgamemanagers: Path): - with globalgamemanagers.open("rb") as f: - data = f.read().decode("ascii", errors="ignore") +def read_version_from_game_file(globalgamemanagers: Path | bytes): + if isinstance(globalgamemanagers, Path): + with globalgamemanagers.open("rb") as f: + data = f.read().decode("ascii", errors="ignore") + else: + data = globalgamemanagers.decode("ascii", errors="ignore") result = re.search(r"([1-9]+\.[0-9]+\.[0-9]+)_[\d]+_[\d]+", data) if not result: raise ValueError("Could not find version in game file") @@ -24,18 +28,24 @@ class Installer: cfg.read(str(self._config_file)) return cfg.get("General", "game_version") + def get_game_data_name(self): + if self._overseas: + return "GenshinImpact_Data/" + else: + return "YuanShen_Data/" + + def get_game_data_path(self): + return self._gamedir.joinpath(self.get_game_data_name()) + # https://gitlab.com/KRypt0n_/an-anime-game-launcher/-/blob/main/src/ts/Game.ts#L26 def get_game_version(self): if self._config_file.exists(): return self._read_version_from_config() else: - if self._overseas: - globalgamemanagers = self._gamedir.joinpath("./GenshinImpact_Data/globalgamemanagers") - else: - globalgamemanagers = self._gamedir.joinpath("./YuanShen_Data/globalgamemanagers") + globalgamemanagers = self.get_game_data_path().joinpath("./globalgamemanagers") if not globalgamemanagers.exists(): return - return _read_version_from_game_file(globalgamemanagers) + return read_version_from_game_file(globalgamemanagers) def __init__(self, gamedir: str | Path = Path.cwd(), overseas: bool = True, data_dir: str | Path = None): if isinstance(gamedir, str): @@ -55,6 +65,61 @@ class Installer: self._launcher = Launcher(self._gamedir, overseas=self._overseas) self._version = self.get_game_version() + def get_archive_version(self, game_archive: str | Path): + if not game_archive.exists(): + raise FileNotFoundError(f"Game archive {game_archive} not found") + archive = zipfile.ZipFile(game_archive, 'r') + return read_version_from_game_file(archive.read(self.get_game_data_name() + "globalgamemanagers")) + + def apply_voiceover(self, voiceover_archive: str | Path): + if not self.get_game_data_path().exists(): + raise FileNotFoundError(f"Game not found in {self._gamedir}") + if isinstance(voiceover_archive, str): + game_archive = Path(voiceover_archive).resolve() + if not voiceover_archive.exists(): + raise FileNotFoundError(f"Voiceover archive {voiceover_archive} not found") + archive = zipfile.ZipFile(voiceover_archive, 'r') + archive.extractall(self._gamedir) + archive.close() + + def update_game(self, game_archive: str | Path): + if not self.get_game_data_path().exists(): + raise FileNotFoundError(f"Game not found in {self._gamedir}") + if isinstance(game_archive, str): + game_archive = Path(game_archive).resolve() + if not game_archive.exists(): + raise FileNotFoundError(f"Update archive {game_archive} not found") + archive = zipfile.ZipFile(game_archive, 'r') + deletefiles = archive.read("deletefiles.txt").decode().split("\n") + for file in deletefiles: + current_game_file = self._gamedir.joinpath(file) + if not current_game_file.exists(): + continue + if current_game_file.is_file(): + current_game_file.unlink(missing_ok=True) + archive.extractall(self._gamedir) + archive.close() + self._gamedir.joinpath("deletefiles.txt").unlink(missing_ok=True) + self._gamedir.joinpath("hdifffiles.txt").unlink(missing_ok=True) + + async def install_game(self, force_reinstall: bool = False): + """Installs the game to the current directory + + If `from_version` is not specified, it will be taken from the game version. + If `to_version` is not specified, it will be taken from the game version. + """ + raise NotImplementedError("Not implemented yet") + # if not force: + # if self._temp_path.exists(): + # raise FileExistsError(f"Directory {self._temp_path} already exists") + # self._temp_path.mkdir(parents=True, exist_ok=True) + # self._launcher.set_temp_path(self._temp_path) + # await self._launcher.download_game_diff_archive(from_version, to_version) + # await self._launcher.extract_game_diff_archive() + # await self._launcher.install_game_diff_archive() + # self._launcher.set_temp_path(None) + # self._temp_path.rmdir() + async def get_game_diff_archive(self, from_version: str = None): """Gets a diff archive from `from_version` to the latest one diff --git a/worthless/patcher.py b/worthless/patcher.py index a72e1c0..b6e006e 100644 --- a/worthless/patcher.py +++ b/worthless/patcher.py @@ -1,15 +1,17 @@ import asyncio import tarfile -import constants import appdirs -import aiofiles from pathlib import Path import shutil import aiohttp +import asyncio +from worthless import constants +from worthless.launcher import Launcher +from worthless.installer import Installer class Patcher: - def __init__(self, gamedir=Path.cwd(), data_dir: str | Path = None, patch_url: str = None): + def __init__(self, gamedir=Path.cwd(), data_dir: str | Path = None, patch_url: str = None, overseas=True): self._gamedir = gamedir self._patch_url = (patch_url if patch_url else constants.PATCH_GIT_URL).replace('http://', 'https://') if not data_dir: @@ -21,6 +23,8 @@ class Patcher: data_dir = Path(data_dir) self._patch_path = data_dir.joinpath("Patch") self._temp_path = data_dir.joinpath("Temp/Patcher") + self._installer = Installer(self._gamedir, overseas=overseas, data_dir=self._temp_path) + self._launcher = Launcher(self._gamedir, overseas=overseas) @staticmethod async def _get(url, **kwargs) -> aiohttp.ClientResponse: @@ -96,10 +100,42 @@ class Patcher: """ pass - def revert_patch(self): + def _revert_file(self, original_file: str, base_file: Path, ignore_error=False): + original_path = self._gamedir.joinpath(original_file + ".bak").resolve() + target_file = self._gamedir.joinpath(original_file).resolve() + if original_path.exists(): + if abs(base_file.stat().st_mtime_ns - original_path.stat().st_mtime_ns) > 3600: + if not ignore_error: + raise RuntimeError("{} is not for this game version.".format(original_path.name)) + original_path.unlink(missing_ok=True) + else: + target_file.unlink(missing_ok=True) + original_path.rename(target_file) + + def revert_patch(self, ignore_errors=True) -> None: """ Revert the patch (and revert the login door crash fix if patched) :return: None """ - pass + game_exec = self._gamedir.joinpath(asyncio.run(self._launcher.get_resource_info()).game.latest.entry) + revert_files = [ + "UnityPlayer.dll", + self._installer.get_game_data_name() + "upload_crash.exe", + self._installer.get_game_data_name() + "Plugins/crashreport.exe", + self._installer.get_game_data_name() + "Plugins/xlua.dll", + ] + for file in revert_files: + self._revert_file(file, game_exec, ignore_errors) + self._gamedir.joinpath("launcher.bat").unlink(missing_ok=True) + self._gamedir.joinpath("mhyprot2_running.reg").unlink(missing_ok=True) + + def get_files(extensions): + all_files = [] + for ext in extensions: + all_files.extend(self._gamedir.glob(ext)) + return all_files + + files = get_files(('*.dxvk-cache', '*_d3d9.log', '*_d3d11.log', '*_dxgi.log')) + for file in files: + file.unlink(missing_ok=True)