feat(cli): use cleo

It is much smarter than typer in terms of options parsing, and also once you know how to use it (with the holy outdated documentation) then it suits my purpose pretty well.
This commit is contained in:
tretrauit 2024-01-02 02:31:15 +07:00
parent 4db6e92b54
commit fe7b1945ef
17 changed files with 1202 additions and 758 deletions

1122
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -10,7 +10,7 @@ readme = "README.md"
python = "^3.11"
platformdirs = "^3.5.1"
requests = "^2.31.0"
typer = {extras = ["all"], version = "^0.9.0"}
cleo = "^2.1.0"
[tool.poetry.group.dev.dependencies]
pytest = "^7.3.1"

View File

@ -1,4 +1,4 @@
from vollerei.cli import app
from vollerei.cli import run
app()
run()

View File

@ -1,6 +1,7 @@
from abc import ABC, abstractmethod
from os import PathLike
from pathlib import Path
from typing import Any
class GameABC(ABC):
@ -8,6 +9,10 @@ class GameABC(ABC):
Manages the game installation
"""
path: Path
version_override: tuple[int, int, int] | None
channel_override: Any
def __init__(self, path: PathLike = None):
pass

View File

@ -1,18 +1,10 @@
from cleo.application import Application
from vollerei.cli import hsr
from vollerei.cli import utils
import typer
application = Application()
for command in hsr.commands:
application.add(command)
app = typer.Typer()
app.add_typer(hsr.app, name="hsr")
app.callback()
def callback(noconfirm: bool = False, silent: bool = False):
"""
An open-source launcher for anime games.
"""
utils.silent_message = silent
if noconfirm:
utils.no_confirm = noconfirm
def run():
application.run()

View File

@ -1,28 +1,43 @@
from traceback import print_exc
from cleo.commands.command import Command
from cleo.helpers import option
from platform import system
from vollerei.cli.utils import ask, msg
from vollerei.cli import utils
from vollerei.exceptions.game import GameError
from vollerei.hsr import Game, Patcher
from vollerei.exceptions.patcher import PatcherError, PatchUpdateError
from vollerei.hsr.patcher import PatchType
import typer
app = typer.Typer()
patcher = Patcher()
default_options = [
option(
"game-path",
"g",
description="Path to the game installation",
flag=False,
default=".",
),
option("patch-type", "p", description="Patch type", flag=False),
option("silent", "s", description="Silent mode"),
option("noconfirm", "y", description="Do not ask for confirmation"),
]
class State:
game: Game = None
@app.callback()
def callback(game_path: str = None, patch_type: str = None):
def callback(
command: Command,
):
"""
Manages the Honkai: Star Rail installation
This manages the game installation and handle the patching process automatically.
Base callback for all commands
"""
game_path = command.option("game-path")
patch_type = command.option("patch-type")
silent = command.option("silent")
noconfirm = command.option("noconfirm")
State.game: Game = Game(game_path)
if patch_type is None:
patch_type = PatchType.Jadeite
@ -31,112 +46,175 @@ def callback(game_path: str = None, patch_type: str = None):
elif isinstance(patch_type, int):
patch_type = PatchType(patch_type)
patcher.patch_type = patch_type
utils.silent_message = silent
if noconfirm:
utils.no_confirm = noconfirm
command.add_style("warn", fg="yellow")
@app.command()
def patch_type():
print("Patch type:", patcher.patch_type.name)
class PatchTypeCommand(Command):
name = "hsr patch type"
description = "Get the patch type of the game"
options = default_options
def handle(self):
callback(command=self)
self.line(f"<comment>Patch type:</comment> {patcher.patch_type.name}")
@app.command()
def update_patch():
patch_type()
msg("Updating patch...", end=" ")
try:
patcher.update_patch()
except PatchUpdateError as e:
print("FAILED")
print(f"Patch update failed with following error: {e} ({e.__context__})")
return False
msg("OK")
return True
class UpdatePatchCommand(Command):
name = "hsr patch update"
description = "Updates the patch"
options = default_options
def _patch_jadeite():
try:
msg("Installing patch...", end=" ")
jadeite_dir = patcher.patch_game(game=State.game)
except PatcherError as e:
print("FAILED")
print("Patching failed with following error:", e)
print_exc()
return
print("OK")
exe_path = jadeite_dir.joinpath("jadeite.exe")
msg("Jadeite executable is located at: ", end="")
print(exe_path)
msg()
msg("=" * 15)
msg(
"Installation succeeded, but note that you need to run the game using "
+ "Jadeite to use the patch."
)
msg()
msg(f'E.g: I_WANT_A_BAN=1 {exe_path} "{State.game.path}"')
msg()
msg("Please don't spread this project to public, we just want to play the game.")
msg(
"And for your own sake, please only use testing accounts, as there is an "
+ "extremely high risk of getting banned."
)
msg("=" * 15)
def _patch_astra(self):
try:
msg("Patching game...", end=" ")
patcher.patch_game(game=State.game)
except PatcherError as e:
print("FAILED")
print(f"Patching failed with following error: {e}")
return
print("OK")
def patch(self):
if system() == "Windows":
msg("Windows is supported officialy by the game, so no patching is needed.")
msg("By patching you are breaking the ToS, use at your own risk.")
if not ask("Do you want to patch the game?"):
print("Patching aborted.")
return
msg("Checking telemetry hosts...", end=" ")
telemetry_list = patcher.check_telemetry()
if telemetry_list:
msg("FOUND")
print("Telemetry hosts found:")
for host in telemetry_list:
print(f"{host}")
msg(
"To prevent the game from sending data about the patch, "
+ "we need to block these hosts."
)
if not ask("Do you want to block these hosts?"):
print("Patching aborted.")
print("Please block these hosts manually then try again.")
return
def handle(self):
callback(command=self)
progress = utils.ProgressIndicator(self)
progress.start("Updating patch... ")
try:
patcher.block_telemetry(telemetry_list=telemetry_list)
except Exception as e:
print("Couldn't block telemetry hosts:", e)
if system() != "Windows":
print("Cannot continue, please block them manually then try again.")
patcher.update_patch()
except PatchUpdateError as e:
progress.finish(
f"<error>Patch update failed with following error: {e} ({e.__context__})</error>"
)
else:
progress.finish("<comment>Patch updated!</comment>")
class PatchInstallCommand(Command):
name = "hsr patch install"
description = "Installs the patch"
options = default_options
def jadeite(self):
progress = utils.ProgressIndicator(self)
progress.start("Installing patch... ")
try:
jadeite_dir = patcher.patch_game(game=State.game)
except PatcherError as e:
progress.finish(
f"<error>Patch installation failed with following error: {e} ({e.__context__})</error>"
)
return
progress.finish("<comment>Patch installed!</comment>")
print()
exe_path = jadeite_dir.joinpath("jadeite.exe")
self.line(f"Jadeite executable is located at: <question>{exe_path}</question>")
self.line(
"You need to <warn>run the game using Jadeite</warn> to use the patch."
)
self.line(
f'E.g: <question>I_WANT_A_BAN=1 {exe_path} "{State.game.path}"</question>'
)
print()
self.line(
"Please don't spread this project to public, we just want to play the game."
)
self.line(
"And for your own sake, please only <warn>use test accounts</warn>, as there is an <warn>extremely high risk of getting banned.</warn>"
)
def astra(self):
progress = utils.ProgressIndicator(self)
progress.start("Installing patch... ")
try:
patcher.patch_game(game=State.game)
except PatcherError as e:
progress.finish(
f"<error>Patch installation failed with following error: {e} ({e.__context__})</error>"
)
return
progress.finish("<comment>Patch installed!</comment>")
self.line()
self.line(
"Please don't spread this project to public, we just want to play the game."
)
self.line(
"And for your own sake, please only use testing accounts, as there is an extremely high risk of getting banned."
)
def handle(self):
callback(command=self)
if system() == "Windows":
self.line(
"Windows is <comment>officialy supported</comment> by the game, so no patching is needed."
)
self.line(
"By patching the game, <warn>you are violating the ToS of the game.</warn>"
)
if not self.confirm("Do you want to patch the game?"):
self.line("<error>Patching aborted.</error>")
return
progress = utils.ProgressIndicator(self)
progress.start("Checking telemetry hosts... ")
telemetry_list = patcher.check_telemetry()
if telemetry_list:
progress.finish("<warn>Telemetry hosts were found.</warn>")
self.line("Below is the list of telemetry hosts that need to be blocked:")
print()
for host in telemetry_list:
self.line(f"{host}")
print()
self.line(
"To prevent the game from sending data about the patch, "
+ "we need to <comment>block these hosts.</comment>"
)
if not self.confirm("Do you want to block them?"):
self.line("<error>Patching aborted.</error>")
self.line(
"<error>Please block these hosts manually then try again.</error>"
)
return
print("Continuing anyway...")
else:
msg("OK")
if not update_patch():
return
match patcher.patch_type:
case PatchType.Jadeite:
_patch_jadeite()
case PatchType.Astra:
_patch_astra()
try:
patcher.block_telemetry(telemetry_list=telemetry_list)
except Exception as e:
self.line_error(
f"<error>Couldn't block telemetry hosts: {e.__context__}</error>"
)
# There's a good reason for this.
if system() != "Windows":
self.line(
"<error>Cannot continue, please block them manually then try again.</error>"
)
return
self.line("<warn>Continuing anyway...</warn>")
else:
progress.finish("<comment>No telemetry hosts found.</comment>")
progress = utils.ProgressIndicator(self)
progress.start("Updating patch... ")
try:
patcher.update_patch()
except PatchUpdateError as e:
progress.finish(
f"<error>Patch update failed with following error: {e} ({e.__context__})</error>"
)
else:
progress.finish("<comment>Patch updated.</comment>")
match patcher.patch_type:
case PatchType.Jadeite:
self.jadeite()
case PatchType.Astra:
self.astra()
@app.command()
def get_version():
try:
print(State.game.get_version_str())
except GameError as e:
print("Couldn't get game version:", e)
class GetVersionCommand(Command):
name = "hsr version"
description = "Gets the local game version"
options = default_options
def handle(self):
callback(command=self)
try:
self.line(
f"<comment>Version:</comment> {'.'.join(str(x) for x in State.game.get_version())}"
)
except GameError as e:
self.line_error(f"<error>Couldn't get game version: {e}</error>")
commands = [
PatchTypeCommand(),
UpdatePatchCommand(),
PatchInstallCommand(),
GetVersionCommand(),
]

View File

@ -1,18 +1,62 @@
from cleo.commands.command import Command
from threading import Thread
from time import sleep
no_confirm = False
silent_message = False
def ask(question: str):
if no_confirm or silent_message:
msg(question + " [Y/n]: Y")
return True
while True:
answer = input(question + " [Y/n]: ")
if answer.lower().strip() in ["y", "yes", ""]:
return True
# Pacman way, treat all other answers as no
else:
return False
def args_to_kwargs(args: list):
"""
Convert a list of arguments to a dict of keyword arguments.
"""
kwargs = {}
cur_key = None
for arg in args:
if "--" == arg[:2]:
arg_key = arg[2:].replace("-", "_")
kwargs[arg_key] = True
cur_key = arg_key
elif cur_key:
kwargs[cur_key] = arg
return kwargs
class ProgressIndicator:
def auto_advance(self):
"""
Automatically advance the progress indicator.
"""
while self.progress._started:
self.progress.advance()
sleep(self.progress._interval / 1000)
def __init__(
self, command: Command, interval: int = None, values: list[str] = None
):
self.command = command
if not interval:
interval = 100
if not values:
values = ["", "", "", "", "", "", "", "", "", ""]
self.progress = self.command.progress_indicator(
interval=interval, values=values
)
self.thread = Thread(target=self.auto_advance)
def start(self, message: str):
"""
Start the progress indicator.
"""
self.progress.start(message)
self.thread.start()
def finish(self, message: str, reset_indicator=False):
"""
Finish the progress indicator.
"""
self.progress.finish(message=message, reset_indicator=reset_indicator)
def msg(*args, **kwargs):

View File

@ -0,0 +1,119 @@
import concurrent
from io import IOBase
import json
from pathlib import Path
import zipfile
from vollerei.abc.launcher.game import GameABC
from vollerei.utils import HDiffPatch, HPatchZPatchError
_hdiff = HDiffPatch()
def apply_update_archive(
game: GameABC, archive_file: Path | IOBase, auto_repair: bool = True
) -> None:
# Most code here are copied from worthless-launcher.
# worthless-launcher uses asyncio for multithreading while this one uses
# ThreadPoolExecutor, probably better for this use case.
# We need `game` for the path and `auto_repair` for the auto repair option.
# Install HDiffPatch
_hdiff.hpatchz()
# Open archive
archive = zipfile.ZipFile(archive_file, "r")
# Get files list (we don't want to extract all of them)
files = archive.namelist()
# Don't extract these files (they're useless and if the game isn't patched then
# it'll raise 31-4xxx error in Genshin)
for file in ["deletefiles.txt", "hdifffiles.txt"]:
try:
files.remove(file)
except ValueError:
pass
try:
# miHoYo loves CRLF
deletefiles = archive.read("deletefiles.txt").decode().split("\r\n")
except IOError:
pass
else:
for file_str in deletefiles:
file = game.path.joinpath(file)
if file == game.path:
# Don't delete the game folder
continue
if not file.relative_to(game.path):
# File is not in the game folder
continue
# Delete the file
file.unlink(missing_ok=True)
# hdiffpatch implementation
# Read hdifffiles.txt to get the files to patch
hdifffiles = []
for x in archive.read("hdifffiles.txt").decode().split("\r\n"):
try:
hdifffiles.append(json.loads(x.strip())["remoteName"])
except json.JSONDecodeError:
pass
# Patch function
def extract_and_patch(file, patch_file):
patchpath = game._cache.joinpath(patch_file)
# Delete old patch file if exists
patchpath.unlink(missing_ok=True)
# Extract patch file
archive.extract(patch_file, game.temppath)
old_suffix = file.suffix
file = file.rename(file.with_suffix(".bak"))
try:
_hdiff.patch_file(file, file.with_suffix(old_suffix), patchpath)
except HPatchZPatchError:
# Let the game download the file.
file.rename(file.with_suffix(old_suffix))
return
finally:
patchpath.unlink()
# Remove old file, since we don't need it anymore.
file.unlink()
def extract_or_repair(file):
# Extract file
try:
archive.extract(file, game.path)
except Exception as e:
# Repair file
if not auto_repair:
raise e
game._repair_file(game.path.joinpath(file))
# Multi-threaded patching
patch_jobs = []
for file_str in hdifffiles:
file = game.path.joinpath(file_str)
if not file.exists():
# Not patching since we don't have the file
continue
patch_file: str = file_str + ".hdiff"
# Remove hdiff files from files list to extract
files.remove(patch_file)
patch_jobs.append([extract_and_patch, [file, patch_file]])
# Create new ThreadPoolExecutor for patching
patch_executor = concurrent.futures.ThreadPoolExecutor()
for job in patch_jobs:
patch_executor.submit(job[0], *job[1])
patch_executor.shutdown(wait=True)
# Extract files from archive after we have filtered out the patch files
# Using ThreadPoolExecutor instead of archive.extractall() because
# archive.extractall() can crash with large archives, and it doesn't
# handle broken files.
extract_executor = concurrent.futures.ThreadPoolExecutor()
for file in files:
extract_executor.submit(extract_or_repair, file)
extract_executor.shutdown(wait=True)
# Close the archive
archive.close()

View File

@ -1,5 +1,5 @@
import requests
import concurrent
import concurrent.futures
from vollerei.utils import write_hosts
from vollerei.constants import TELEMETRY_HOSTS

View File

@ -8,6 +8,12 @@ class GameError(VollereiError):
class GameNotInstalledError(GameError):
"""Exception raised when the game is not installed."""
"""Game is not installed."""
pass
class PreDownloadNotAvailable(GameError):
"""Pre-download version is not available."""
pass

View File

@ -9,6 +9,8 @@ def get_resource(channel: GameChannel = GameChannel.Overseas) -> Resource:
"""
Get game resource information from the launcher API.
Default channel is overseas.
Args:
channel: Game channel to get the resource information from.

View File

@ -1,20 +1,29 @@
from hashlib import md5
from io import IOBase
from os import PathLike
from pathlib import Path
from vollerei.exceptions.game import GameNotInstalledError
from vollerei.hsr.launcher.enums import GameChannel
from vollerei.common import ConfigFile
from vollerei.abc.launcher.game import GameABC
from vollerei.common import ConfigFile, functions
from vollerei.common.api import resource
from vollerei.exceptions.game import GameNotInstalledError, PreDownloadNotAvailable
from vollerei.hsr.constants import MD5SUMS
from vollerei.hsr.launcher.enums import GameChannel
from vollerei.hsr.launcher import api
from vollerei.paths import cache_path
class Game(GameABC):
"""
Manages the game installation
Since channel detection isn't implemented yet, most functions assume you're
using the overseas version of the game. You can override channel by setting
the property `channel_override` to the channel you want to use.
"""
def __init__(self, path: PathLike = None):
self._path: Path | None = Path(path) if path else None
self._cache: Path = cache_path.joinpath("game")
self._version_override: tuple[int, int, int] | None = None
self._channel_override: GameChannel | None = None
@ -76,6 +85,9 @@ class Game(GameABC):
def is_installed(self) -> bool:
"""
Check if the game is installed.
Returns:
bool: True if the game is installed, False otherwise.
"""
if self._path is None:
return False
@ -89,7 +101,18 @@ class Game(GameABC):
return False
return True
def _get_version_config(self) -> tuple[int, int, int]:
def get_version_config(self) -> tuple[int, int, int]:
"""
Get the current installed game version from config.ini.
Using this is not recommended, as only official launcher creates
and uses this file, instead you should use `get_version()`.
This returns (0, 0, 0) if the version could not be found.
Returns:
tuple[int, int, int]: Game version.
"""
cfg_file = self._path.joinpath("config.ini")
if not cfg_file.exists():
return (0, 0, 0)
@ -115,10 +138,12 @@ class Game(GameABC):
https://github.com/an-anime-team/anime-game-core/blob/main/src/games/star_rail/game.rs#L49
If the above method fails, it'll fallback to read the config.ini file
for the version. (Doesn't work with AAGL-based launchers)
for the version, which is not recommended (as described in
`get_version_config()` docs)
This returns (0, 0, 0) if the version could not be found
(usually indicates the game is not installed)
(usually indicates the game is not installed), and in fact `is_installed()` uses
this method to check if the game is installed too.
Returns:
tuple[int, int, int]: The version as a tuple of integers.
@ -170,22 +195,23 @@ class Game(GameABC):
except Exception:
pass
# Fallback to config.ini
return self._get_version_config()
return self.get_version_config()
def get_version_str(self) -> str:
def version_as_str(self, version: tuple[int, int, int]) -> str:
"""
Same as get_version, but returns a string instead.
Convert a version tuple to a string.
Returns:
str: The version as a string.
"""
return ".".join(str(i) for i in self.get_version())
return ".".join(str(i) for i in version)
def get_channel(self) -> GameChannel:
"""
Get the current game channel.
Only works for Star Rail version 1.0.5, other versions will return None
Only works for Star Rail version 1.0.5, other versions will return the
overridden channel or None if no channel is overridden.
This is not needed for game patching, since the patcher will automatically
detect the channel.
@ -209,3 +235,71 @@ class Game(GameABC):
return GameChannel.Overseas
else:
return
def _get_game(self, pre_download: bool) -> resource.Game:
channel = self._channel_override or self.get_channel()
if pre_download:
game = api.get_resource(channel=channel).pre_download_game
if not game:
raise PreDownloadNotAvailable("Pre-download version is not available.")
return game
return api.get_resource(channel=channel).game
def get_update(self, pre_download: bool = False) -> resource.Diff | None:
"""
Get the current game update.
Returns a `Diff` object that contains the update information or
None if the game is not installed or already up-to-date.
"""
if not self.is_installed():
return None
version = self.version_as_str(self._version_override or self.get_version())
for diff in self._get_game(pre_download=pre_download).diffs:
if diff.version == version:
return diff
return None
def apply_update_archive(
self, archive_file: PathLike | IOBase, auto_repair: bool = True
) -> None:
"""
Apply an update archive to the game, it can be the game update or a
voicepack update.
`archive_file` can be a path to the archive file or a file-like object,
like if you have very high amount of RAM and want to download the update
to memory instead of disk, this can be useful for you.
`auto_repair` is used to determine whether to repair the file if it's
broken. If it's set to False, then it'll raise an exception if the file
is broken.
Args:
archive_file (PathLike | IOBase): The archive file.
auto_repair (bool, optional): Whether to repair the file if it's broken.
"""
if not self.is_installed():
raise GameNotInstalledError("Game is not installed.")
if not isinstance(archive_file, IOBase):
archive_file = Path(archive_file)
# Hello hell again, dealing with HDiffPatch and all the things again.
functions.apply_update_archive(self, archive_file, auto_repair=auto_repair)
def install_update(
self, update_info: resource.Diff = None, auto_repair: bool = True
):
"""
Install an update from a `Diff` object.
You may want to download the update manually and pass it to
`apply_update_archive()` instead for better control.
Args:
update_info (Diff, optional): The update information. Defaults to None.
"""
if not self.is_installed():
raise GameNotInstalledError("Game is not installed.")
if not update_info:
update_info = self.get_update()
pass

View File

@ -3,21 +3,38 @@ from os import PathLike
from platformdirs import PlatformDirs
base_paths = PlatformDirs("vollerei", "tretrauit", roaming=True)
cache_path = base_paths.site_cache_path
data_path = base_paths.site_data_path
tools_data_path = data_path.joinpath("tools")
tools_cache_path = cache_path.joinpath("tools")
launcher_cache_path = cache_path.joinpath("launcher")
utils_cache_path = cache_path.joinpath("utils")
class Paths:
"""
Manages the paths
"""
def change_base_path(path: PathLike):
path = Path(path)
global base_paths, tools_data_path, tools_cache_path, launcher_cache_path, utils_cache_path, cache_path, data_path
cache_path = path.joinpath("cache")
data_path = path.joinpath("data")
base_paths = PlatformDirs("vollerei", "tretrauit", roaming=True)
cache_path = base_paths.site_cache_path
data_path = base_paths.site_data_path
tools_data_path = data_path.joinpath("tools")
tools_cache_path = cache_path.joinpath("tools")
launcher_cache_path = cache_path.joinpath("launcher")
utils_cache_path = cache_path.joinpath("utils")
@staticmethod
def set_base_path(path: PathLike):
path = Path(path)
Paths.base_paths = PlatformDirs(
"vollerei", "tretrauit", roaming=True, base_path=path
)
Paths.cache_path = Paths.base_paths.site_cache_path
Paths.data_path = Paths.base_paths.site_data_path
Paths.tools_data_path = Paths.data_path.joinpath("tools")
Paths.tools_cache_path = Paths.cache_path.joinpath("tools")
Paths.launcher_cache_path = Paths.cache_path.joinpath("launcher")
Paths.utils_cache_path = Paths.cache_path.joinpath("utils")
# Aliases
base_paths = Paths.base_paths
cache_path = Paths.cache_path
data_path = Paths.data_path
tools_data_path = Paths.tools_data_path
tools_cache_path = Paths.tools_cache_path
launcher_cache_path = Paths.launcher_cache_path
utils_cache_path = Paths.utils_cache_path

View File

@ -10,20 +10,56 @@ match platform.system():
case _:
def append_text(text: str, path: Path) -> None:
raise NotImplementedError(
"append_text is not implemented for this platform"
)
# Fallback to our own implementation
# Will NOT work if we don't have permission to write to the file
try:
with path.open("a") as f:
f.write(text)
except FileNotFoundError:
with path.open(path, "w") as f:
f.write(text)
except (PermissionError, OSError) as e:
raise PermissionError(
"You don't have permission to write to the file."
) from e
# Re-exports
from vollerei.utils.git import Git
from vollerei.utils.xdelta3 import Xdelta3
from vollerei.utils.xdelta3 import Xdelta3, Xdelta3NotInstalledError, Xdelta3PatchError
from vollerei.utils.xdelta3.exceptions import Xdelta3Error
from vollerei.utils.hdiffpatch import (
HDiffPatch,
HPatchZPatchError,
NotInstalledError,
PlatformNotSupportedError as HPatchZPlatformNotSupportedError,
)
__all__ = ["Git", "Xdelta3", "download_and_extract"]
__all__ = [
"Git",
"Xdelta3",
"download_and_extract",
"HDiffPatch",
"write_hosts",
"append_text_to_file",
"Xdelta3Error",
"Xdelta3NotInstalledError",
"Xdelta3PatchError",
"HPatchZPatchError",
"NotInstalledError",
"HPatchZPlatformNotSupportedError",
]
def download_and_extract(url: str, path: Path) -> None:
"""
Download and extract a zip file to a path.
Args:
url (str): URL to download from.
path (Path): Path to extract to.
"""
rsp = requests.get(url, stream=True)
rsp.raise_for_status()
with BytesIO() as f:

View File

@ -6,12 +6,23 @@ from io import BytesIO
from shutil import which
from vollerei.constants import HDIFFPATCH_GIT_URL
from vollerei.paths import tools_data_path
from vollerei.utils.hdiffpatch.exceptions import (
HPatchZPatchError,
NotInstalledError,
PlatformNotSupportedError,
)
class HDiffPatch:
"""
Quick wrapper around HDiffPatch binaries
Mostly copied from worthless-launcher
"""
def __init__(self):
self._data = tools_data_path.joinpath("hdiffpatch")
self._data.mkdir(parents=True, exist_ok=True)
self._hdiff = tools_data_path.joinpath("hdiffpatch")
self._hdiff.mkdir(parents=True, exist_ok=True)
@staticmethod
def _get_platform_arch():
@ -33,32 +44,42 @@ class HDiffPatch:
# Rip BSD they need to use Linux compatibility layer to run this
# (or use Wine if they prefer that)
raise RuntimeError("Only Windows, Linux and macOS are supported by HDiffPatch")
raise PlatformNotSupportedError(
"Only Windows, Linux and macOS are supported by HDiffPatch"
)
def _get_exec(self, exec_name) -> str | None:
def _get_binary(self, exec_name: str, recurse=None) -> str:
if which(exec_name):
return exec_name
if not self.data_path.exists():
return None
if not any(self.data_path.iterdir()):
return None
platform_arch_path = self.data_path.joinpath(self._get_platform_arch())
file = platform_arch_path.joinpath(exec_name)
if file.exists():
file.chmod(0o755)
return str(file)
if platform.system() == "Windows" and not exec_name.endswith(".exe"):
exec_name += ".exe"
if self._hdiff.exists() and any(self._hdiff.iterdir()):
file = self._hdiff.joinpath(self._get_platform_arch(), exec_name)
if file.exists():
if platform.system() != "Windows":
file.chmod(0o755)
return str(file)
if recurse is None:
recurse = 3
elif recurse == 0:
raise NotInstalledError(
"HDiffPatch is not installed and can't be automatically installed"
)
else:
recurse -= 1
self.download()
return self._get_binary(exec_name=exec_name, recurse=recurse)
def hpatchz(self) -> str | None:
hpatchz_name = "hpatchz" + (".exe" if platform.system() == "Windows" else "")
return self._get_exec(hpatchz_name)
return self._get_binary("hpatchz")
def patch_file(self, in_file, out_file, patch_file):
hpatchz = self.hpatchz()
if not hpatchz:
raise RuntimeError("hpatchz executable not found")
subprocess.check_call([hpatchz, "-f", in_file, patch_file, out_file])
try:
subprocess.check_call([self.hpatchz(), "-f", in_file, patch_file, out_file])
except subprocess.CalledProcessError as e:
raise HPatchZPatchError("Patch error") from e
async def _get_latest_release_info(self) -> dict:
def _get_latest_release_info(self) -> dict:
split = HDIFFPATCH_GIT_URL.split("/")
repo = split[-1]
owner = split[-2]
@ -67,7 +88,7 @@ class HDiffPatch:
params={"Headers": "Accept: application/vnd.github.v3+json"},
)
rsp.raise_for_status()
for asset in (await rsp.json())["assets"]:
for asset in rsp.json()["assets"]:
if not asset["name"].endswith(".zip"):
continue
if "linux" in asset["name"]:
@ -80,25 +101,24 @@ class HDiffPatch:
continue
return asset
async def get_latest_release_url(self):
asset = await self._get_latest_release_info()
def get_latest_release_url(self):
asset = self._get_latest_release_info()
return asset["browser_download_url"]
async def get_latest_release_name(self):
asset = await self._get_latest_release_info()
def get_latest_release_name(self):
asset = self._get_latest_release_info()
return asset["name"]
async def download(self):
def download(self):
"""
Download the latest release of HDiffPatch.
"""
url = await self.get_latest_release_url()
url = self.get_latest_release_url()
if not url:
raise RuntimeError("Unable to find latest release")
file = BytesIO()
with requests.get(url, stream=True) as r:
with open(file, "wb") as f:
for chunk in r.iter_content(chunk_size=32768):
f.write(chunk)
for chunk in r.iter_content(chunk_size=32768):
file.write(chunk)
with ZipFile(file) as z:
z.extractall(self._data)
z.extractall(self._hdiff)

View File

@ -10,13 +10,19 @@ class HPatchZError(HDiffPatchError):
pass
class HPatchZPatchError(HPatchZError):
"""Raised when hpatchz patch fails"""
pass
class NotInstalledError(HPatchZError):
"""Raised when HDiffPatch is not installed"""
pass
class HPatchZPatchError(HPatchZError):
"""Raised when hpatchz patch fails"""
class PlatformNotSupportedError(HPatchZError):
"""Raised when HDiffPatch is not available for your platform"""
pass

View File

@ -5,7 +5,7 @@ from os import PathLike
from io import BytesIO
from zipfile import ZipFile
from shutil import which
from vollerei.paths import tools_cache_path
from vollerei.paths import tools_data_path
from vollerei.utils.xdelta3.exceptions import (
Xdelta3NotInstalledError,
Xdelta3PatchError,
@ -18,7 +18,7 @@ class Xdelta3:
"""
def __init__(self) -> None:
self._xdelta3_path = tools_cache_path.joinpath("xdelta3")
self._xdelta3_path = tools_data_path.joinpath("xdelta3")
self._xdelta3_path.mkdir(parents=True, exist_ok=True)
def _get_binary(self, recurse=None) -> str:
@ -26,7 +26,7 @@ class Xdelta3:
return "xdelta3"
if platform.system() == "Windows":
for path in self._xdelta3_path.glob("*.exe"):
return path
return str(path)
if recurse is None:
recurse = 3
elif recurse == 0:
@ -36,7 +36,7 @@ class Xdelta3:
else:
recurse -= 1
self.download()
return self.get_binary(recurse=recurse)
return self._get_binary(recurse=recurse)
raise Xdelta3NotInstalledError("xdelta3 is not installed")
def get_binary(self) -> str:
@ -63,9 +63,8 @@ class Xdelta3:
url = "https://github.com/jmacd/xdelta-gpl/releases/download/v3.1.0/xdelta3-3.1.0-i686.exe.zip"
file = BytesIO()
with requests.get(url, stream=True) as r:
with open(file, "wb") as f:
for chunk in r.iter_content(chunk_size=32768):
f.write(chunk)
for chunk in r.iter_content(chunk_size=32768):
file.write(chunk)
with ZipFile(file) as z:
z.extractall(self._xdelta3_path)