worthless-launcher/worthless/installer.py

622 lines
26 KiB
Python

import asyncio
import hashlib
import json
import platform
import re
import shutil
import zipfile
from configparser import ConfigParser
from pathlib import Path
import aiohttp
from worthless import constants
from worthless.launcher import Launcher
from worthless.launcherconfig import LauncherConfig
async def _download_file(file_url: str, file_name: str, file_path: Path | str, file_len: int = None, overwrite=False,
chunks=8192):
"""
Download file name to temporary directory,
:param file_url:
:param file_name:
:return:
"""
headers = {}
file_path = Path(file_path).joinpath(file_name)
if overwrite:
await file_path.unlink(missing_ok=True)
if file_path.exists():
cur_len = (file_path.stat()).st_size
headers |= {
"Range": f"bytes={cur_len}-{file_len if file_len else ''}"
}
else:
file_path.touch()
print(f"Downloading {file_url} to {file_path}...")
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=60*60, sock_read=240)) as session:
rsp = await session.get(file_url, headers=headers, timeout=None)
if rsp.status == 416:
return
rsp.raise_for_status()
while True:
chunk = await rsp.content.read(chunks)
await asyncio.sleep(0)
if not chunk:
break
with file_path.open("ab") as f:
f.write(chunk)
def calculate_md5(file_to_calculate):
file_to_calculate = Path(file_to_calculate)
if not file_to_calculate.exists():
return ""
with file_to_calculate.open("rb") as f:
file_hash = hashlib.md5()
while chunk := f.read(1024 * 1024):
file_hash.update(chunk)
return file_hash.hexdigest()
class HDiffPatch:
def __init__(self, git_url=None, data_dir=None):
if not git_url:
git_url = constants.HDIFFPATCH_GIT_URL
self._git_url = git_url
if not data_dir:
self._appdirs = constants.APPDIRS
self.temp_path = Path(self._appdirs.user_cache_dir).joinpath("HDiffPatch")
self.data_path = Path(self._appdirs.user_data_dir).joinpath("Tools/HDiffPatch")
else:
if not isinstance(data_dir, Path):
data_dir = Path(data_dir)
self.data_path = Path(data_dir).joinpath("Tools/HDiffPatch")
self.temp_path = data_dir.joinpath("Temp/HDiffPatch")
Path(self.temp_path).mkdir(parents=True, exist_ok=True)
@staticmethod
def _get_platform_arch():
match platform.system():
case "Windows":
match platform.architecture()[0]:
case "32bit":
return "windows32"
case "64bit":
return "windows64"
case "Linux":
match platform.architecture()[0]:
case "32bit":
return "linux32"
case "64bit":
return "linux64"
case "Darwin":
return "macos"
# Rip BSD they need to use Linux compatibility layer to run this (or use Wine if they prefer that)
raise RuntimeError("Unsupported platform")
def _get_hdiffpatch_exec(self, exec_name):
if shutil.which(exec_name):
return exec_name
if not self.data_path.exists():
return None
if not any(self.data_path.iterdir()):
return None
platform_arch_path = self.data_path.joinpath(self._get_platform_arch())
file = platform_arch_path.joinpath(exec_name)
if file.exists():
file.chmod(0o755)
return str(file)
return None
def get_hpatchz_executable(self):
hpatchz_name = "hpatchz" + (".exe" if platform.system() == "Windows" else "")
return self._get_hdiffpatch_exec(hpatchz_name)
async def patch_file(self, in_file, out_file, patch_file, error=False, wait=False):
print("executing hpatchz")
hpatchz = self.get_hpatchz_executable()
if not hpatchz:
raise RuntimeError("hpatchz executable not found")
proc = await asyncio.create_subprocess_exec(hpatchz, "-f", in_file, patch_file, out_file)
if not wait:
return proc
await proc.wait()
if error and proc.returncode != 0:
raise RuntimeError(f"Patching failed, return code is {proc.returncode}")
return proc
def get_hdiffz_executable(self):
hdiffz_name = "hdiffz" + (".exe" if platform.system() == "Windows" else "")
return self._get_hdiffpatch_exec(hdiffz_name)
async def _get_latest_release_info(self):
async with aiohttp.ClientSession() as session:
split = self._git_url.split("/")
repo = split[-1]
owner = split[-2]
rsp = await session.get("https://api.github.com/repos/{}/{}/releases/latest".format(owner, repo),
params={"Headers": "Accept: application/vnd.github.v3+json"})
rsp.raise_for_status()
for asset in (await rsp.json())["assets"]:
if asset["name"].endswith(".zip") and "linux" not in asset["name"] and "windows" not in asset["name"] \
and "macos" not in asset["name"] and "android" not in asset["name"]:
return asset
async def get_latest_release_url(self):
asset = await self._get_latest_release_info()
return asset["browser_download_url"]
async def get_latest_release_name(self):
asset = await self._get_latest_release_info()
return asset["name"]
async def download_latest_release(self, extract=True):
url = await self.get_latest_release_url()
name = await self.get_latest_release_name()
if not url:
raise RuntimeError("Unable to find latest release")
await _download_file(url, name, self.temp_path, overwrite=True)
if not extract:
return
with zipfile.ZipFile(self.temp_path.joinpath(name), 'r') as f:
await asyncio.to_thread(f.extractall, path=self.data_path)
class Installer:
def __init__(self, gamedir: str | Path = Path.cwd(),
overseas: bool = True, data_dir: str | Path = None):
if isinstance(gamedir, str | Path):
gamedir = Path(gamedir)
self._gamedir = gamedir
if not data_dir:
self._appdirs = constants.APPDIRS
self.temp_path = Path(self._appdirs.user_cache_dir).joinpath("Installer")
else:
if isinstance(data_dir, str | Path):
data_dir = Path(data_dir)
self.temp_path = data_dir.joinpath("Temp/Installer/")
Path(self.temp_path).mkdir(parents=True, exist_ok=True)
config_file = self._gamedir.joinpath("config.ini")
self._config_file = config_file
self._download_chunk = 1024 * 1024
self._overseas = overseas
self._version = None
self._launcher = Launcher(self._gamedir, overseas=self._overseas)
self._hdiffpatch = HDiffPatch(data_dir=data_dir)
self._config = LauncherConfig(self._config_file, self._version)
self._game_version_re = re.compile(r"([1-9]+\.[0-9]+\.[0-9]+)_\d+_\d+")
async def _download_file(self, file_url: str, file_name: str, file_len: int = None, overwrite=False):
"""
Download file name to temporary directory,
:param file_url:
:param file_name:
:return:
"""
await _download_file(file_url, file_name, self.temp_path, file_len=file_len, overwrite=overwrite,
chunks=self._download_chunk)
async def read_version_from_config(self):
if not self._config_file.exists():
raise FileNotFoundError(f"Config file {self._config_file} not found")
cfg = ConfigParser()
await asyncio.to_thread(cfg.read, str(self._config_file))
return cfg.get("General", "game_version")
async def read_version_from_game_file(self, globalgamemanagers: Path | Path | bytes) -> str:
"""
Reads the version from the globalgamemanagers file. (Data/globalgamemanagers)
Uses `An Anime Game Launcher` method to read the version:
https://gitlab.com/KRypt0n_/an-anime-game-launcher/-/blob/main/src/ts/Game.ts#L26
:return: Game version (ex 1.0.0)
"""
if isinstance(globalgamemanagers, Path | Path):
globalgamemanagers = Path(globalgamemanagers)
data = globalgamemanagers.read_text("ascii", errors="ignore")
else:
data = globalgamemanagers.decode("ascii", errors="ignore")
result = self._game_version_re.search(data)
if not result:
raise ValueError("Could not find version in game file")
return result.group(1)
@staticmethod
def voiceover_lang_translate(lang: str, base_language="game") -> str:
"""
Translates the voiceover language to the language code used by the game.
:param lang: Language to translate
:param base_language: Base language type (game/locale/both)
:return: Language code
"""
if base_language == "game" or base_language == "both":
match lang.lower():
case "english(us)":
return "en-us"
case "japanese":
return "ja-jp"
case "chinese":
return "zh-cn"
case "korean":
return "ko-kr"
if base_language == "locale" or base_language == "both":
match lang.lower().replace("_", "-"):
case "en-us":
return "English(US)"
case "ja-jp":
return "Japanese"
case "zh-cn":
return "Chinese"
case "ko-kr":
return "Korean"
# If nothing else matches
return lang
@staticmethod
async def get_voiceover_archive_language(voiceover_archive: str | Path | Path) -> str:
if isinstance(voiceover_archive, str | Path):
voiceover_archive = Path(voiceover_archive).resolve()
if not voiceover_archive.exists():
raise FileNotFoundError(f"Voiceover archive {voiceover_archive} not found")
with zipfile.ZipFile(voiceover_archive, 'r') as f:
for file in zipfile.Path(f).iterdir():
if file.name.endswith("_pkg_version"):
return file.name.split("_")[1]
@staticmethod
async def get_voiceover_archive_type(voiceover_archive: str | Path) -> bool:
"""
Gets voiceover archive type.
:param voiceover_archive:
:return: True if this is a full archive, else False.
"""
vo_lang = await Installer.get_voiceover_archive_language(voiceover_archive)
with zipfile.ZipFile(voiceover_archive, 'r') as f:
archive_path = zipfile.Path(f)
files = (await asyncio.to_thread(f.read, "Audio_{}_pkg_version".format(vo_lang))).decode().split("\n")
for file in files:
if file.strip() and not archive_path.joinpath(json.loads(file)["remoteName"]).exists():
return False
return True
def set_download_chunk(self, chunk: int):
self._download_chunk = chunk
def get_game_data_name(self):
if self._overseas:
return "GenshinImpact_Data/"
else:
return "YuanShen_Data/"
def get_game_data_path(self) -> Path:
return self._gamedir.joinpath(self.get_game_data_name())
async def get_game_archive_version(self, game_archive: str | Path):
game_archive = Path(game_archive)
if not game_archive.is_file():
raise FileNotFoundError(f"Game archive {game_archive} not found")
with zipfile.ZipFile(game_archive, 'r') as f:
return await self.read_version_from_game_file(
await asyncio.to_thread(f.read, self.get_game_data_name() + "globalgamemanagers")
)
async def get_game_version(self) -> str | None:
globalgamemanagers = self.get_game_data_path().joinpath("./globalgamemanagers")
if not globalgamemanagers.exists():
try:
return await self.read_version_from_config()
except FileNotFoundError:
return
return await self.read_version_from_game_file(globalgamemanagers)
async def get_installed_voiceovers(self) -> list[str]:
"""
Returns a list of installed voiceovers.
:return: List of installed voiceovers
"""
voiceovers = []
async for file in self.get_game_data_path()\
.joinpath("StreamingAssets/AudioAssets/").iterdir():
if file.is_dir():
voiceovers.append(file.name)
return voiceovers
async def _update(self, game_archive: str | Path | Path):
archive = zipfile.ZipFile(game_archive, 'r')
if not self._hdiffpatch.get_hpatchz_executable():
await self._hdiffpatch.download_latest_release()
files = archive.namelist()
# Don't extract these files (they're useless and if the game isn't patched then it'll
# raise 31-4xxx error ingame)
for file in ["deletefiles.txt", "hdifffiles.txt"]:
try:
files.remove(file)
except ValueError:
pass
try:
deletefiles = archive.read("deletefiles.txt").decode().split("\r\n")
for file in deletefiles:
current_game_file = Path(self._gamedir).joinpath(file)
if current_game_file == Path(self._gamedir):
# Don't delete the game folder
print("Game folder detected, not deleting:", current_game_file)
continue
if not current_game_file.relative_to(Path(self._gamedir)):
print("Not deleting (not relative to game):", current_game_file)
continue
print("Deleting", file)
current_game_file.unlink(missing_ok=True)
except Exception as e:
print(f"Error while reading deletefiles.txt: {e}")
# hdiffpatch implementation
try:
hdifffiles = []
for x in (await asyncio.to_thread(archive.read, "hdifffiles.txt")).decode().split("\n"):
if x:
hdifffiles.append(json.loads(x.strip())["remoteName"])
patch_jobs = []
cur_jobs = []
count = 0
for file in hdifffiles:
current_game_file = self._gamedir.joinpath(file)
if not current_game_file.exists():
print("File", file, "not found")
# Not patching since we don't have the file
continue
patch_file = str(file) + ".hdiff"
async def extract_and_patch(old_file, diff_file):
patch_path = self.temp_path.joinpath(diff_file)
patch_path.unlink(missing_ok=True)
try:
print(diff_file)
await asyncio.to_thread(archive.extract, diff_file, self.temp_path)
except FileExistsError:
print("Failed to extract diff file", diff_file)
return
old_suffix = old_file.suffix
old_file = old_file.rename(old_file.with_suffix(".bak"))
proc = await self._hdiffpatch.patch_file(old_file, old_file.with_suffix(old_suffix),
patch_path, wait=True)
patch_path.unlink()
if proc.returncode == 0:
old_file.unlink()
return
# Let the game download the file.
print("Failed to patch {}, reverting and let the in-game updater do the job...".format(
old_file.with_suffix(old_suffix))
)
old_file.rename(old_file.with_suffix(old_suffix))
files.remove(patch_file)
# Limit to 8 process running so it doesn't hang the PC.
if count == 7:
print("add job")
patch_jobs.append(cur_jobs)
cur_jobs = []
count = 0
cur_jobs.append(extract_and_patch(current_game_file, patch_file))
count += 1
# The last list may have count < 7 and the above code will not add them
patch_jobs.append(cur_jobs)
for jobs in patch_jobs:
print("exec jobs", jobs)
await asyncio.gather(*jobs)
except Exception as e:
print(f"Error while reading hdifffiles.txt: {e}")
await asyncio.to_thread(archive.extractall, self._gamedir, members=files)
archive.close()
async def update_game(self, game_archive: str | Path | Path):
if not self.get_game_data_path().exists():
raise FileNotFoundError(f"Game not found in {self._gamedir}")
if isinstance(game_archive, str | Path):
game_archive = Path(game_archive).resolve()
if not game_archive.exists():
raise FileNotFoundError(f"Update archive {game_archive} not found")
await self._update(game_archive=game_archive)
# Update game version on local variable.
self._version = await self.get_game_version()
self.set_version_config()
def set_version_config(self, version: str = None):
if not version:
version = self._version
self._config.set_game_version(version)
self._config.save()
async def download_full_game(self, pre_download=False):
game = await self._get_game(pre_download)
if not game.latest.path == "":
archive_name = game.latest.path.split("/")[-1]
if calculate_md5(self.temp_path.joinpath(archive_name)) != game.latest.md5:
raise RuntimeError("mismatch md5 for downloaded game archive")
return
# Segment download
base_archive = None
for i, segment in enumerate(game.latest.segments):
archive_name = segment["path"].split("/")[-1]
if i == 0:
base_archive = archive_name = Path(archive_name).stem # Remove .001
if self.temp_path.joinpath(archive_name + ".downloaded").exists():
continue
await self._download_file(segment["path"], archive_name)
if i != 0:
with open(self.temp_path.joinpath(base_archive), 'ab') as f:
with open(self.temp_path.joinpath(archive_name), 'rb') as f2:
f.write(f2.read())
self.temp_path.joinpath(archive_name).unlink()
self.temp_path.joinpath(archive_name + ".downloaded").touch()
async def download_full_voiceover(self, language: str, pre_download=False):
game = await self._get_game(pre_download)
translated_lang = self.voiceover_lang_translate(language)
for vo in game.latest.voice_packs:
if vo.language == translated_lang:
await self._download_file(vo.path, vo.get_name(), vo.size)
async def uninstall_game(self):
await asyncio.to_thread(shutil.rmtree, self._gamedir, ignore_errors=True)
async def _extract_game_file(self, archive: str | Path | Path):
if isinstance(archive, str | Path):
archive = Path(archive).resolve()
if not archive.exists():
raise FileNotFoundError(f"'{archive}' not found")
with zipfile.ZipFile(archive, 'r') as f:
await asyncio.to_thread(f.extractall, path=self._gamedir)
async def apply_voiceover(self, voiceover_archive: str | Path):
# Since Voiceover packages are unclear about diff package or full package
# we will try to extract the voiceover package and apply it to the game
# making this function universal for both cases
if not self.get_game_data_path().exists():
raise FileNotFoundError(f"Game not found in {self._gamedir}")
if isinstance(voiceover_archive, str | Path):
voiceover_archive = Path(voiceover_archive).resolve()
await self._update(voiceover_archive)
# await self._extract_game_file(voiceover_archive)
async def install_game(self, game_archive: str | Path | Path, force_reinstall: bool = False):
"""Installs the game to the current directory
If `force_reinstall` is True, the game will be uninstalled then reinstalled.
"""
if self.get_game_data_path().exists():
if not force_reinstall:
raise ValueError(f"Game is already installed in {self._gamedir}")
await self.uninstall_game()
self._gamedir.mkdir(parents=True, exist_ok=True)
await self._extract_game_file(game_archive)
self._version = await self.get_game_version()
self.set_version_config()
async def _get_game_version(self):
if self._version:
from_version = self._version
else:
from_version = self._version = await self.get_game_version()
return from_version
async def _get_game_resource(self):
game_resource = await self._launcher.get_resource_info()
if not game_resource:
raise ValueError("Could not fetch game resource")
return game_resource
async def _get_game(self, pre_download=False):
game_resource = await self._get_game_resource()
game = game_resource.game
if pre_download:
game = game_resource.pre_download_game
return game
async def download_game_update(self, from_version: str = None, pre_download=False):
if not from_version:
from_version = await self._get_game_version()
game = await self._get_game(pre_download=pre_download)
if self._version == game.latest.version:
raise ValueError("Game is already up to date.")
diff_archive = await self.get_game_diff_archive(from_version, pre_download)
if diff_archive is None:
raise ValueError("Game diff archive is not available for this version, please reinstall.")
await self._download_file(diff_archive.path, diff_archive.name, diff_archive.size)
async def download_voiceover_update(self, language: str, from_version: str = None, pre_download=False):
if not from_version:
from_version = await self._get_game_version()
diff_archive = await self.get_voiceover_diff_archive(language, from_version, pre_download)
if diff_archive is None:
raise ValueError("Voiceover diff archive is not available for this version, please reinstall.")
await self._download_file(diff_archive.path, diff_archive.name, diff_archive.size)
async def get_voiceover_diff_archive(self, lang: str, from_version: str = None, pre_download=False):
"""Gets a diff archive from `from_version` to the latest one
If from_version is not specified, it will be taken from the game version.
"""
if not from_version:
from_version = await self._get_game_version()
game = await self._get_game(pre_download=pre_download)
translated_lang = self.voiceover_lang_translate(lang)
for v in game.diffs:
if v.version.strip() != from_version.strip():
continue
for vo in v.voice_packs:
if vo.language == translated_lang:
return vo
async def get_game_diff_archive(self, from_version: str = None, pre_download=False):
"""Gets a diff archive from `from_version` to the latest one
If from_version is not specified, it will be taken from the game version.
"""
if not from_version:
from_version = await self._get_game_version()
game = await self._get_game(pre_download=pre_download)
for v in game.diffs:
if v.version == from_version:
return v
async def verify_from_pkg_version(self, pkg_version: Path, ignore_mismatch=False):
contents = pkg_version.read_text()
async def verify_file(file_to_verify, md5):
print("Verifying file:", file_to_verify)
file_md5 = await asyncio.to_thread(calculate_md5, file_to_verify)
if file_md5 == md5:
return None
if ignore_mismatch:
return file_to_verify, md5, file_md5
raise ValueError(f"MD5 does not match for {file_to_verify}, expected md5: {md5}, actual md5: {file_md5}")
verify_jobs = []
cur_jobs = []
count = 0
for content in contents.split("\r\n"):
if not content.strip():
continue
if count >= 7:
verify_jobs.append(cur_jobs)
cur_jobs = []
count = 0
info = json.loads(content)
cur_jobs.append(verify_file(self._gamedir.joinpath(info["remoteName"]), info["md5"]))
count += 1
verify_jobs.append(cur_jobs)
verify_result = []
for jobs in verify_jobs:
verify_result.extend(await asyncio.gather(*jobs))
failed_files = []
for file in verify_result:
if file is not None:
failed_files.append(file)
return None if not failed_files else failed_files
async def verify_game(self, pkg_version: str | Path | Path = None, ignore_mismatch=False):
if pkg_version is None:
pkg_version = self._gamedir.joinpath("pkg_version")
return await self.verify_from_pkg_version(pkg_version, ignore_mismatch)
async def clear_cache(self):
await asyncio.to_thread(shutil.rmtree, self.temp_path, ignore_errors=True)