Added support for updating game & applying voiceover packs

Implemented & Improved some CLI commands.
Currently working in 2.5.0 version.

A few more work and this should be fully usable,,,
This commit is contained in:
tretrauit 2022-02-17 02:43:21 +07:00
parent da3ee30ab1
commit b1a9223c19
No known key found for this signature in database
GPG Key ID: 862760FF1903319E
4 changed files with 197 additions and 23 deletions

View File

@ -5,6 +5,7 @@ LAUNCHER_API_URL_CN = "https://sdk-static.mihoyo.com/hk4e_cn/mdk/launcher/api"
PATCH_GIT_URL = "https://notabug.org/Krock/dawn"
TELEMETRY_URL_LIST = [
"log-upload-os.mihoyo.com",
"log-upload-os.hoyoverse.com",
"log-upload.mihoyo.com",
"overseauspider.yuanshen.com"
"uspider.yuanshen.com"

View File

@ -5,22 +5,70 @@ import appdirs
from pathlib import Path
from worthless.launcher import Launcher
from worthless.installer import Installer
from worthless.patcher import Patcher
import worthless.constants as constants
class UI:
def __init__(self, gamedir: str, noconfirm: bool) -> None:
def __init__(self, gamedir: str, noconfirm: bool, tempdir: str | Path = None) -> None:
self._noconfirm = noconfirm
self._gamedir = gamedir
self._launcher = Launcher(gamedir)
self._installer = Installer(gamedir)
self._installer = Installer(gamedir, data_dir=tempdir)
self._patcher = Patcher(gamedir)
def _ask(self, title, description):
raise NotImplementedError()
@staticmethod
def _ask(question):
answer = ""
while answer.lower() not in ['y', 'n']:
if answer != "":
print("Invalid choice, please try again.")
answer = input(question + " (y/n): ")
return answer.lower() == 'y'
def get_game_version(self):
print(self._installer.get_game_version())
def _update_from_archive(self, filepath):
print("Reverting patches if patched...")
self._patcher.revert_patch(True)
print("Updating game from archive...")
self._installer.update_game(filepath)
def _apply_voiceover_from_archive(self, filepath):
print("Applying voiceover from archive...")
self._installer.apply_voiceover(filepath)
def install_voiceover_from_file(self, filepath):
if not self._ask("Do you want to apply this voiceover pack? ({})".format(filepath)):
print("Aborting apply process.")
return
self._apply_voiceover_from_archive(filepath)
print("Voiceover applied successfully.")
def revert_patch(self):
print("Reverting patches...")
self._patcher.revert_patch(True)
print("Patches reverted.")
def install_from_file(self, filepath):
gamever = self._installer.get_game_version()
if gamever:
print("Current game installation detected. ({})".format(self._installer.get_game_version()))
print("Archive game version: " + self._installer.get_archive_version(filepath))
if not self._ask("Do you want to update the game? (from {})".format(filepath)):
print("Aborting update process.")
return
self._update_from_archive(filepath)
print("Game updated successfully.")
else:
print("No game installation detected.")
if not self._ask("Do you want to install the game? ({})".format(filepath)):
print("Aborting installation process.")
return
raise NotImplementedError("Install from file not implemented yet.")
# print("Game installed successfully.")
def install_game(self):
# TODO
raise NotImplementedError("Install game is not implemented.")
@ -46,8 +94,11 @@ def main():
default_dirs.user_cache_dir))
parser.add_argument("-S", "--install", action="store_true",
help="Install/update the game (if not already installed, else do nothing)")
parser.add_argument("-U", "--install-from-file", action="store_true",
help="Install the game from the game archive (if not already installed, \
parser.add_argument("-U", "--install-from-file", action="store", type=Path, default=None,
help="Install the game from an archive (if not already installed, \
else update from archive)")
parser.add_argument("-Uv", "--install-voiceover-from-file", action="store", type=Path, default=None,
help="Install the voiceover from an archive (if not already installed, \
else update from archive)")
parser.add_argument("-Sp", "--patch", action="store_true",
help="Patch the game (if not already patched, else do nothing)")
@ -66,12 +117,25 @@ def main():
help="Do not ask any for confirmation. (Ignored in interactive mode)")
args = parser.parse_args()
interactive_mode = not args.install and not args.install_from_file and not args.patch and not args.update and not \
args.remove and not args.remove_patch and not args.remove_voiceover and not args.get_game_version
ui = UI(args.dir, args.noconfirm)
args.remove and not args.remove_patch and not args.remove_voiceover and not args.get_game_version and not \
args.install_voiceover_from_file
if args.temporary_dir:
args.temporary_dir.mkdir(parents=True, exist_ok=True)
ui = UI(args.dir, args.noconfirm, args.temporary_dir)
if args.install and args.update:
raise ValueError("Cannot specify both --install and --update arguments.")
if args.install_from_file and args.update:
raise ValueError("Cannot specify both --install-from-file and --update arguments.")
if args.install_voiceover_from_file and args.update:
raise ValueError("Cannot specify both --install-voiceover-from-file and --update arguments.")
if args.install_from_file and args.install:
raise ValueError("Cannot specify both --install-from-file and --install arguments.")
if args.get_game_version:
ui.get_game_version()
@ -80,7 +144,15 @@ def main():
if args.update:
ui.update_game()
return
if args.install_from_file:
ui.install_from_file(args.install_from_file)
if args.install_voiceover_from_file:
ui.install_voiceover_from_file(args.install_voiceover_from_file)
if args.remove_patch:
ui.revert_patch()
if interactive_mode:
ui.interactive_ui()

View File

@ -1,5 +1,6 @@
import re
import appdirs
import zipfile
from pathlib import Path
from configparser import ConfigParser
@ -7,9 +8,12 @@ from worthless import constants
from worthless.launcher import Launcher
def _read_version_from_game_file(globalgamemanagers: Path):
def read_version_from_game_file(globalgamemanagers: Path | bytes):
if isinstance(globalgamemanagers, Path):
with globalgamemanagers.open("rb") as f:
data = f.read().decode("ascii", errors="ignore")
else:
data = globalgamemanagers.decode("ascii", errors="ignore")
result = re.search(r"([1-9]+\.[0-9]+\.[0-9]+)_[\d]+_[\d]+", data)
if not result:
raise ValueError("Could not find version in game file")
@ -24,18 +28,24 @@ class Installer:
cfg.read(str(self._config_file))
return cfg.get("General", "game_version")
def get_game_data_name(self):
if self._overseas:
return "GenshinImpact_Data/"
else:
return "YuanShen_Data/"
def get_game_data_path(self):
return self._gamedir.joinpath(self.get_game_data_name())
# https://gitlab.com/KRypt0n_/an-anime-game-launcher/-/blob/main/src/ts/Game.ts#L26
def get_game_version(self):
if self._config_file.exists():
return self._read_version_from_config()
else:
if self._overseas:
globalgamemanagers = self._gamedir.joinpath("./GenshinImpact_Data/globalgamemanagers")
else:
globalgamemanagers = self._gamedir.joinpath("./YuanShen_Data/globalgamemanagers")
globalgamemanagers = self.get_game_data_path().joinpath("./globalgamemanagers")
if not globalgamemanagers.exists():
return
return _read_version_from_game_file(globalgamemanagers)
return read_version_from_game_file(globalgamemanagers)
def __init__(self, gamedir: str | Path = Path.cwd(), overseas: bool = True, data_dir: str | Path = None):
if isinstance(gamedir, str):
@ -55,6 +65,61 @@ class Installer:
self._launcher = Launcher(self._gamedir, overseas=self._overseas)
self._version = self.get_game_version()
def get_archive_version(self, game_archive: str | Path):
if not game_archive.exists():
raise FileNotFoundError(f"Game archive {game_archive} not found")
archive = zipfile.ZipFile(game_archive, 'r')
return read_version_from_game_file(archive.read(self.get_game_data_name() + "globalgamemanagers"))
def apply_voiceover(self, voiceover_archive: str | Path):
if not self.get_game_data_path().exists():
raise FileNotFoundError(f"Game not found in {self._gamedir}")
if isinstance(voiceover_archive, str):
game_archive = Path(voiceover_archive).resolve()
if not voiceover_archive.exists():
raise FileNotFoundError(f"Voiceover archive {voiceover_archive} not found")
archive = zipfile.ZipFile(voiceover_archive, 'r')
archive.extractall(self._gamedir)
archive.close()
def update_game(self, game_archive: str | Path):
if not self.get_game_data_path().exists():
raise FileNotFoundError(f"Game not found in {self._gamedir}")
if isinstance(game_archive, str):
game_archive = Path(game_archive).resolve()
if not game_archive.exists():
raise FileNotFoundError(f"Update archive {game_archive} not found")
archive = zipfile.ZipFile(game_archive, 'r')
deletefiles = archive.read("deletefiles.txt").decode().split("\n")
for file in deletefiles:
current_game_file = self._gamedir.joinpath(file)
if not current_game_file.exists():
continue
if current_game_file.is_file():
current_game_file.unlink(missing_ok=True)
archive.extractall(self._gamedir)
archive.close()
self._gamedir.joinpath("deletefiles.txt").unlink(missing_ok=True)
self._gamedir.joinpath("hdifffiles.txt").unlink(missing_ok=True)
async def install_game(self, force_reinstall: bool = False):
"""Installs the game to the current directory
If `from_version` is not specified, it will be taken from the game version.
If `to_version` is not specified, it will be taken from the game version.
"""
raise NotImplementedError("Not implemented yet")
# if not force:
# if self._temp_path.exists():
# raise FileExistsError(f"Directory {self._temp_path} already exists")
# self._temp_path.mkdir(parents=True, exist_ok=True)
# self._launcher.set_temp_path(self._temp_path)
# await self._launcher.download_game_diff_archive(from_version, to_version)
# await self._launcher.extract_game_diff_archive()
# await self._launcher.install_game_diff_archive()
# self._launcher.set_temp_path(None)
# self._temp_path.rmdir()
async def get_game_diff_archive(self, from_version: str = None):
"""Gets a diff archive from `from_version` to the latest one

View File

@ -1,15 +1,17 @@
import asyncio
import tarfile
import constants
import appdirs
import aiofiles
from pathlib import Path
import shutil
import aiohttp
import asyncio
from worthless import constants
from worthless.launcher import Launcher
from worthless.installer import Installer
class Patcher:
def __init__(self, gamedir=Path.cwd(), data_dir: str | Path = None, patch_url: str = None):
def __init__(self, gamedir=Path.cwd(), data_dir: str | Path = None, patch_url: str = None, overseas=True):
self._gamedir = gamedir
self._patch_url = (patch_url if patch_url else constants.PATCH_GIT_URL).replace('http://', 'https://')
if not data_dir:
@ -21,6 +23,8 @@ class Patcher:
data_dir = Path(data_dir)
self._patch_path = data_dir.joinpath("Patch")
self._temp_path = data_dir.joinpath("Temp/Patcher")
self._installer = Installer(self._gamedir, overseas=overseas, data_dir=self._temp_path)
self._launcher = Launcher(self._gamedir, overseas=overseas)
@staticmethod
async def _get(url, **kwargs) -> aiohttp.ClientResponse:
@ -96,10 +100,42 @@ class Patcher:
"""
pass
def revert_patch(self):
def _revert_file(self, original_file: str, base_file: Path, ignore_error=False):
original_path = self._gamedir.joinpath(original_file + ".bak").resolve()
target_file = self._gamedir.joinpath(original_file).resolve()
if original_path.exists():
if abs(base_file.stat().st_mtime_ns - original_path.stat().st_mtime_ns) > 3600:
if not ignore_error:
raise RuntimeError("{} is not for this game version.".format(original_path.name))
original_path.unlink(missing_ok=True)
else:
target_file.unlink(missing_ok=True)
original_path.rename(target_file)
def revert_patch(self, ignore_errors=True) -> None:
"""
Revert the patch (and revert the login door crash fix if patched)
:return: None
"""
pass
game_exec = self._gamedir.joinpath(asyncio.run(self._launcher.get_resource_info()).game.latest.entry)
revert_files = [
"UnityPlayer.dll",
self._installer.get_game_data_name() + "upload_crash.exe",
self._installer.get_game_data_name() + "Plugins/crashreport.exe",
self._installer.get_game_data_name() + "Plugins/xlua.dll",
]
for file in revert_files:
self._revert_file(file, game_exec, ignore_errors)
self._gamedir.joinpath("launcher.bat").unlink(missing_ok=True)
self._gamedir.joinpath("mhyprot2_running.reg").unlink(missing_ok=True)
def get_files(extensions):
all_files = []
for ext in extensions:
all_files.extend(self._gamedir.glob(ext))
return all_files
files = get_files(('*.dxvk-cache', '*_d3d9.log', '*_d3d11.log', '*_dxgi.log'))
for file in files:
file.unlink(missing_ok=True)