WIP game patcher

Now worthless can patch the game using xdelta3-python
This commit is contained in:
tretrauit 2022-02-18 20:09:03 +07:00
parent 6286c080f8
commit 5a492c912c
No known key found for this signature in database
GPG Key ID: 862760FF1903319E
4 changed files with 68 additions and 14 deletions

View File

@ -1,4 +1,6 @@
aiohttp==3.8.1 aiohttp==3.8.1
appdirs~=1.4.4 appdirs~=1.4.4
aiofiles~=0.8.0 aiopath~=0.6.10
aiopath~=0.6.10 worthless~=1.1.1
setuptools~=59.3.0
xdelta3~=0.0.5

View File

@ -17,7 +17,7 @@ class UI:
self._gamedir = gamedir self._gamedir = gamedir
self._launcher = Launcher(gamedir) self._launcher = Launcher(gamedir)
self._installer = Installer(gamedir, data_dir=tempdir) self._installer = Installer(gamedir, data_dir=tempdir)
self._patcher = Patcher(gamedir) self._patcher = Patcher(gamedir, data_dir=tempdir)
def _ask(self, question): def _ask(self, question):
if self._noconfirm: if self._noconfirm:
@ -63,6 +63,17 @@ class UI:
self._patcher.revert_patch(True) self._patcher.revert_patch(True)
print("Patches reverted.") print("Patches reverted.")
def patch_game(self, login_fix: bool = False):
print("NOTE: Hereby you are violating the game's Terms of Service!")
print("Do not patch the game if you don't know what you are doing!")
if not self._ask("Do you want to patch the game? (This will overwrite your game files!)"):
print("Aborting patch process.")
return
print("Patching game...")
self._patcher.apply_patch(login_fix)
print("Game patched.")
print("Please refrain from sharing this project to public especially official channels, thank you.")
def install_from_file(self, filepath): def install_from_file(self, filepath):
gamever = self._installer.get_game_version() gamever = self._installer.get_game_version()
print("Archive game version: " + self._installer.get_game_archive_version(filepath)) print("Archive game version: " + self._installer.get_game_archive_version(filepath))
@ -88,7 +99,7 @@ class UI:
print("Aborting game installation process.") print("Aborting game installation process.")
return return
print("Downloading full game (This will take a long time)...") print("Downloading full game (This will take a long time)...")
asyncio.run(self._installer.download_full_game()) asyncio.run(self._installer.download_full_game(forced))
print("Installing game...") print("Installing game...")
self._install_from_archive(self._installer.temp_path.joinpath(res_info.game.latest.name)) self._install_from_archive(self._installer.temp_path.joinpath(res_info.game.latest.name))
@ -174,6 +185,8 @@ def main():
else update from archive)") else update from archive)")
parser.add_argument("-Sp", "--patch", action="store_true", parser.add_argument("-Sp", "--patch", action="store_true",
help="Patch the game (if not already patched, else do nothing)") help="Patch the game (if not already patched, else do nothing)")
parser.add_argument("--login-fix", action="store_true",
help="Patch the game to fix login issues (if not already patched, else do nothing)")
parser.add_argument("-Sy", "--update", action="store", type=str, parser.add_argument("-Sy", "--update", action="store", type=str,
help="Update the game and specified voiceover pack only (or install if not found)") help="Update the game and specified voiceover pack only (or install if not found)")
parser.add_argument("-Sv", "--update-voiceover", action="store", type=str, parser.add_argument("-Sv", "--update-voiceover", action="store", type=str,
@ -226,6 +239,9 @@ def main():
if args.install_voiceover_from_file: if args.install_voiceover_from_file:
ui.install_voiceover_from_file(args.install_voiceover_from_file) ui.install_voiceover_from_file(args.install_voiceover_from_file)
if args.patch:
ui.patch_game(args.login_fix)
if args.remove_patch: if args.remove_patch:
ui.revert_patch() ui.revert_patch()

View File

@ -84,12 +84,16 @@ class Installer:
self.temp_path.mkdir(parents=True, exist_ok=True) self.temp_path.mkdir(parents=True, exist_ok=True)
config_file = self._gamedir.joinpath("config.ini") config_file = self._gamedir.joinpath("config.ini")
self._config_file = config_file.resolve() self._config_file = config_file.resolve()
self._download_chunk = 8192
self._version = None self._version = None
self._overseas = overseas self._overseas = overseas
self._launcher = Launcher(self._gamedir, overseas=self._overseas) self._launcher = Launcher(self._gamedir, overseas=self._overseas)
self._version = self.get_game_version() self._version = self.get_game_version()
async def _download_file(self, file_url: str, file_name: str, file_len: int = None): def set_download_chunk(self, chunk: int):
self._download_chunk = chunk
async def _download_file(self, file_url: str, file_name: str, file_len: int = None, overwrite=False):
""" """
Download file name to temporary directory, Download file name to temporary directory,
:param file_url: :param file_url:
@ -98,9 +102,10 @@ class Installer:
""" """
params = {} params = {}
file_path = AsyncPath(self.temp_path).joinpath(file_name) file_path = AsyncPath(self.temp_path).joinpath(file_name)
if overwrite:
await file_path.unlink(missing_ok=True)
if file_path.exists(): if file_path.exists():
async with file_path.open("rb") as f: cur_len = len(await file_path.read_bytes())
cur_len = len(await f.read())
params |= { params |= {
"Range": f"bytes={cur_len}-{file_len if file_len else ''}" "Range": f"bytes={cur_len}-{file_len if file_len else ''}"
} }
@ -110,7 +115,7 @@ class Installer:
rsp = await session.get(file_url, params=params, timeout=None) rsp = await session.get(file_url, params=params, timeout=None)
rsp.raise_for_status() rsp.raise_for_status()
while True: while True:
chunk = await rsp.content.read(8192) chunk = await rsp.content.read(self._download_chunk)
if not chunk: if not chunk:
break break
async with file_path.open("ab") as f: async with file_path.open("ab") as f:
@ -207,7 +212,9 @@ class Installer:
# Update game version on local variable. # Update game version on local variable.
self._version = self.get_game_version() self._version = self.get_game_version()
async def download_full_game(self): async def download_full_game(self, overwrite: bool = False):
if self._version and not overwrite:
raise ValueError("Game already exists")
archive = await self._launcher.get_resource_info() archive = await self._launcher.get_resource_info()
if archive is None: if archive is None:
raise RuntimeError("Failed to fetch game resource info.") raise RuntimeError("Failed to fetch game resource info.")

View File

@ -1,4 +1,4 @@
import asyncio import xdelta3
import tarfile import tarfile
import appdirs import appdirs
from pathlib import Path from pathlib import Path
@ -23,6 +23,7 @@ class Patcher:
data_dir = Path(data_dir) data_dir = Path(data_dir)
self._patch_path = data_dir.joinpath("Patch") self._patch_path = data_dir.joinpath("Patch")
self._temp_path = data_dir.joinpath("Temp/Patcher") self._temp_path = data_dir.joinpath("Temp/Patcher")
self._overseas = overseas
self._installer = Installer(self._gamedir, overseas=overseas, data_dir=self._temp_path) self._installer = Installer(self._gamedir, overseas=overseas, data_dir=self._temp_path)
self._launcher = Launcher(self._gamedir, overseas=overseas) self._launcher = Launcher(self._gamedir, overseas=overseas)
@ -56,7 +57,6 @@ class Patcher:
pass pass
else: else:
return await archive.read() return await archive.read()
return
async def _download_repo(self): async def _download_repo(self):
if shutil.which("git"): if shutil.which("git"):
@ -91,14 +91,43 @@ class Patcher:
""" """
await self._download_repo() await self._download_repo()
def _patch_unityplayer(self):
if self._overseas:
patch = "unityplayer_patch_os.vcdiff"
else:
patch = "unityplayer_patch_cn.vcdiff"
gamever = "".join(self._installer.get_game_version().split("."))
unity_path = self._gamedir.joinpath("UnityPlayer.dll")
patch_bytes = self._patch_path.joinpath("{}/patch_files/{}".format(gamever, patch)).read_bytes()
patched_unity_bytes = xdelta3.decode(unity_path.read_bytes(), patch_bytes)
unity_path.rename(self._gamedir.joinpath("UnityPlayer.dll.bak"))
with Path(self._gamedir.joinpath("UnityPlayer.dll")).open("wb") as f:
f.write(patched_unity_bytes)
def _patch_xlua(self):
patch = "xlua_patch.vcdiff"
gamever = "".join(self._installer.get_game_version().split("."))
data_name = self._installer.get_game_data_name()
xlua_path = self._gamedir.joinpath("{}/Plugins/xlua.dll".format(data_name))
patch_bytes = self._patch_path.joinpath("{}/patch_files/{}".format(gamever, patch)).read_bytes()
patched_xlua_bytes = xdelta3.decode(xlua_path.read_bytes(), patch_bytes)
xlua_path.rename(self._gamedir.joinpath("xlua.dll.bak"))
with Path(self._gamedir.joinpath("{}/Plugins/xlua.dll".format(data_name)).open("wb") as f:
f.write(patched_xlua_bytes)
def apply_xlua_patch(self):
self._patch_xlua()
def apply_patch(self, crash_fix=False) -> None: def apply_patch(self, crash_fix=False) -> None:
""" """
Patch the game (and optionally patch the login door crash fix if specified) Patch the game (and optionally patch xLua if specified)
:param crash_fix: Whether to patch the login door crash fix or not :param crash_fix: Whether to patch xLua or not
:return: None :return: None
""" """
pass self._patch_unityplayer()
if crash_fix:
self._patch_xlua()
def _revert_file(self, original_file: str, base_file: Path, ignore_error=False): def _revert_file(self, original_file: str, base_file: Path, ignore_error=False):
original_path = self._gamedir.joinpath(original_file + ".bak").resolve() original_path = self._gamedir.joinpath(original_file + ".bak").resolve()