Compare commits

..

3 Commits

Author SHA1 Message Date
7ed2b2e643 feat(hsr): add auto_repair when hpatchz error 2024-01-03 02:46:07 +07:00
6db85bc439 feat(hsr): support continue download & apply-archive 2024-01-02 23:16:58 +07:00
6f030e79ce feat(hsr): download & install updates
WIP WIP WIP, probably will crash at some point.
Also the code in paths isn't working, so i'm merging them to constants in the next commit
2024-01-02 15:27:40 +07:00
11 changed files with 393 additions and 46 deletions

View File

@ -3,7 +3,7 @@ from vollerei.cli import hsr
application = Application()
for command in hsr.commands:
application.add(command)
application.add(command())
def run():

View File

@ -1,6 +1,8 @@
from cleo.commands.command import Command
from cleo.helpers import option
from cleo.helpers import option, argument
from copy import deepcopy
from platform import system
from vollerei.hsr.launcher.enums import GameChannel
from vollerei.cli import utils
from vollerei.exceptions.game import GameError
from vollerei.hsr import Game, Patcher
@ -11,6 +13,8 @@ patcher = Patcher()
default_options = [
option("channel", "c", description="Game channel", flag=False, default="overseas"),
option("force", "f", description="Force the command to run"),
option(
"game-path",
"g",
@ -19,6 +23,7 @@ default_options = [
default=".",
),
option("patch-type", "p", description="Patch type", flag=False),
option("temporary-path", "t", description="Temporary path", flag=False),
option("silent", "s", description="Silent mode"),
option("noconfirm", "y", description="Do not ask for confirmation"),
]
@ -36,9 +41,17 @@ def callback(
"""
game_path = command.option("game-path")
patch_type = command.option("patch-type")
channel = command.option("channel")
silent = command.option("silent")
noconfirm = command.option("noconfirm")
State.game: Game = Game(game_path)
temporary_path = command.option("temporary-path")
if isinstance(channel, str):
channel = GameChannel[channel.capitalize()]
elif isinstance(channel, int):
channel = GameChannel(channel)
State.game: Game = Game(game_path, temporary_path)
if channel:
State.game.channel_override = channel
if patch_type is None:
patch_type = PatchType.Jadeite
elif isinstance(patch_type, str):
@ -197,6 +210,53 @@ class PatchInstallCommand(Command):
self.astra()
PatchCommand = deepcopy(PatchInstallCommand)
PatchCommand.name = "hsr patch"
class PatchTelemetryCommand(Command):
name = "hsr patch telemetry"
description = "Checks for telemetry hosts and block them."
options = default_options
def handle(self):
progress = utils.ProgressIndicator(self)
progress.start("Checking telemetry hosts... ")
telemetry_list = patcher.check_telemetry()
if telemetry_list:
progress.finish("<warn>Telemetry hosts were found.</warn>")
self.line("Below is the list of telemetry hosts that need to be blocked:")
print()
for host in telemetry_list:
self.line(f"{host}")
print()
self.line(
"To prevent the game from sending data about the patch, "
+ "we need to <comment>block these hosts.</comment>"
)
if not self.confirm("Do you want to block them?"):
self.line("<error>Patching aborted.</error>")
self.line(
"<error>Please block these hosts manually then try again.</error>"
)
return
try:
patcher.block_telemetry(telemetry_list=telemetry_list)
except Exception as e:
self.line_error(
f"<error>Couldn't block telemetry hosts: {e.__context__}</error>"
)
# There's a good reason for this.
if system() != "Windows":
self.line(
"<error>Cannot continue, please block them manually then try again.</error>"
)
return
self.line("<warn>Continuing anyway...</warn>")
else:
progress.finish("<comment>No telemetry hosts found.</comment>")
class GetVersionCommand(Command):
name = "hsr version"
description = "Gets the local game version"
@ -212,9 +272,125 @@ class GetVersionCommand(Command):
self.line_error(f"<error>Couldn't get game version: {e}</error>")
commands = [
PatchTypeCommand(),
UpdatePatchCommand(),
PatchInstallCommand(),
GetVersionCommand(),
class UpdateCommand(Command):
name = "hsr update"
description = "Updates the local game if available"
options = default_options + [
option(
"auto-repair", "R", description="Automatically repair the game if needed"
),
option("pre-download", description="Pre-download the game if available"),
]
def handle(self):
callback(command=self)
auto_repair = self.option("auto-repair")
pre_download = self.option("pre-download")
if auto_repair:
self.line("<comment>Auto-repair is enabled.</comment>")
progress = utils.ProgressIndicator(self)
progress.start("Checking for updates... ")
try:
update_diff = State.game.get_update(pre_download=pre_download)
game_info = State.game.get_remote_game(pre_download=pre_download)
except Exception as e:
progress.finish(
f"<error>Update checking failed with following error: {e} ({e.__context__})</error>"
)
return
if update_diff is None:
progress.finish("<comment>Game is already updated.</comment>")
return
progress.finish("<comment>Update available.</comment>")
self.line(
f"The current version is: <comment>{State.game.get_version_str()}</comment>"
)
self.line(
f"The latest version is: <comment>{game_info.latest.version}</comment>"
)
if not self.confirm("Do you want to update the game?"):
self.line("<error>Update aborted.</error>")
return
self.line("Downloading update package...")
out_path = State.game._cache.joinpath(update_diff.name)
try:
download_result = utils.download(
update_diff.path, out_path, file_len=update_diff.size
)
except Exception as e:
self.line_error(f"<error>Couldn't download update: {e}</error>")
return
if not download_result:
self.line_error("<error>Download failed.</error>")
return
self.line("Download completed.")
progress = utils.ProgressIndicator(self)
progress.start("Applying update package...")
try:
State.game.apply_update_archive(out_path, auto_repair=auto_repair)
except Exception as e:
progress.finish(
f"<error>Couldn't apply update: {e} ({e.__context__})</error>"
)
return
progress.finish("<comment>Update applied.</comment>")
self.line("Setting version config... ")
self.set_version_config()
self.line(
f"The game has been updated to version: <comment>{State.game.get_version_str()}</comment>"
)
class ApplyUpdateArchive(Command):
name = "hsr update apply-archive"
description = "Applies the update archive to the local game"
arguments = [argument("path", description="Path to the update archive")]
options = default_options + [
option(
"auto-repair", "R", description="Automatically repair the game if needed"
),
]
def handle(self):
callback(command=self)
auto_repair = self.option("auto-repair")
update_archive = self.argument("path")
if auto_repair:
self.line("<comment>Auto-repair is enabled.</comment>")
progress = utils.ProgressIndicator(self)
progress.start("Applying update package...")
try:
State.game.apply_update_archive(update_archive, auto_repair=auto_repair)
except Exception as e:
progress.finish(
f"<error>Couldn't apply update: {e} ({e.__context__})</error>"
)
return
progress.finish("<comment>Update applied.</comment>")
self.line("Setting version config... ")
try:
State.game.set_version_config()
except Exception as e:
self.line_error(f"<warn>Couldn't set version config: {e}</warn>")
self.line_error(
"This won't affect the overall experience, but if you're using the official launcher"
)
self.line_error(
"you may have to edit the file 'config.ini' manually to reflect the latest version."
)
self.line(
f"The game has been updated to version: <comment>{State.game.get_version_str()}</comment>"
)
commands = [
ApplyUpdateArchive,
GetVersionCommand,
PatchCommand,
PatchInstallCommand,
PatchTelemetryCommand,
PatchTypeCommand,
UpdatePatchCommand,
UpdateCommand,
]

View File

@ -1,6 +1,9 @@
import requests
from cleo.commands.command import Command
from pathlib import Path
from threading import Thread
from time import sleep
from tqdm import tqdm
no_confirm = False
@ -59,6 +62,35 @@ class ProgressIndicator:
self.progress.finish(message=message, reset_indicator=reset_indicator)
def download(url, out: Path, file_len: int = None, overwrite: bool = False) -> bool:
if overwrite:
out.unlink(missing_ok=True)
headers = {}
if out.exists():
cur_len = (out.stat()).st_size
headers |= {"Range": f"bytes={cur_len}-{file_len if file_len else ''}"}
else:
out.touch()
# Streaming, so we can iterate over the response.
response = requests.get(url=url, headers=headers, stream=True)
response.raise_for_status()
if response.status_code == 416:
return
# Sizes in bytes.
total_size = int(response.headers.get("content-length", 0))
block_size = 32768
with tqdm(total=total_size, unit="KB", unit_scale=True) as progress_bar:
with out.open("ab") as file:
for data in response.iter_content(block_size):
progress_bar.update(len(data))
file.write(data)
if total_size != 0 and progress_bar.n != total_size:
return False
return True
def msg(*args, **kwargs):
"""
Print but silentable

View File

@ -1,4 +1,4 @@
import concurrent
import concurrent.futures
from io import IOBase
import json
from pathlib import Path
@ -65,13 +65,19 @@ def apply_update_archive(
patchpath.unlink(missing_ok=True)
# Extract patch file
archive.extract(patch_file, game.temppath)
old_suffix = file.suffix
file = file.rename(file.with_suffix(".bak"))
file = file.rename(file.with_suffix(file.suffix + ".bak"))
try:
_hdiff.patch_file(file, file.with_suffix(old_suffix), patchpath)
_hdiff.patch_file(file, file.with_suffix(""), patchpath)
except HPatchZPatchError:
if auto_repair:
try:
game.repair_file(game.path.joinpath(file.with_suffix("")))
except Exception:
# Let the game download the file.
file.rename(file.with_suffix(old_suffix))
file.rename(file.with_suffix(""))
else:
# Let the game download the file.
file.rename(file.with_suffix(""))
return
finally:
patchpath.unlink()
@ -86,7 +92,7 @@ def apply_update_archive(
# Repair file
if not auto_repair:
raise e
game._repair_file(game.path.joinpath(file))
game.repair_file(game.path.joinpath(file))
# Multi-threaded patching
patch_jobs = []

View File

@ -13,6 +13,30 @@ class GameNotInstalledError(GameError):
pass
class GameAlreadyUpdatedError(GameError):
"""Game is already updated."""
pass
class GameAlreadyInstalledError(GameError):
"""Game is already installed."""
pass
class ScatteredFilesNotAvailableError(GameError):
"""Scattered files are not available."""
pass
class GameNotUpdatedError(GameError):
"""Game is not updated."""
pass
class PreDownloadNotAvailable(GameError):
"""Pre-download version is not available."""

View File

@ -20,7 +20,7 @@ class LAUNCHER_API:
}
LATEST_VERSION = (1, 1, 0)
LATEST_VERSION = (1, 6, 0)
MD5SUMS = {
"1.0.5": {
"cn": {

View File

@ -17,7 +17,7 @@ def get_resource(channel: GameChannel = GameChannel.Overseas) -> Resource:
Returns:
Resource: Game resource information.
"""
resource_path: dict
resource_path: dict = None
match channel:
case GameChannel.Overseas:
resource_path = LAUNCHER_API.OS

View File

@ -5,11 +5,17 @@ from pathlib import Path
from vollerei.abc.launcher.game import GameABC
from vollerei.common import ConfigFile, functions
from vollerei.common.api import resource
from vollerei.exceptions.game import GameNotInstalledError, PreDownloadNotAvailable
from vollerei.exceptions.game import (
GameAlreadyUpdatedError,
GameNotInstalledError,
PreDownloadNotAvailable,
ScatteredFilesNotAvailableError,
)
from vollerei.hsr.constants import MD5SUMS
from vollerei.hsr.launcher.enums import GameChannel
from vollerei.hsr.launcher import api
from vollerei.paths import cache_path
from vollerei import paths
from vollerei.utils import download
class Game(GameABC):
@ -21,16 +27,20 @@ class Game(GameABC):
the property `channel_override` to the channel you want to use.
"""
def __init__(self, path: PathLike = None):
def __init__(self, path: PathLike = None, cache_path: PathLike = None):
self._path: Path | None = Path(path) if path else None
self._cache: Path = cache_path.joinpath("game")
if not cache_path:
cache_path = paths.cache_path
cache_path = Path(cache_path)
self._cache: Path = cache_path.joinpath("game/hsr/")
self._cache.mkdir(parents=True, exist_ok=True)
self._version_override: tuple[int, int, int] | None = None
self._channel_override: GameChannel | None = None
@property
def version_override(self) -> tuple[int, int, int] | None:
"""
Override the game version.
Overrides the game version.
This can be useful if you want to override the version of the game
and additionally working around bugs.
@ -46,7 +56,7 @@ class Game(GameABC):
@property
def channel_override(self) -> GameChannel | None:
"""
Override the game channel.
Overrides the game channel.
Because game channel detection isn't implemented yet, you may need
to use this for some functions to work.
@ -65,7 +75,7 @@ class Game(GameABC):
@property
def path(self) -> Path | None:
"""
Path to the game folder.
Paths to the game folder.
"""
return self._path
@ -75,7 +85,7 @@ class Game(GameABC):
def data_folder(self) -> Path:
"""
Path to the game data folder.
Paths to the game data folder.
"""
try:
return self._path.joinpath("StarRail_Data")
@ -84,7 +94,7 @@ class Game(GameABC):
def is_installed(self) -> bool:
"""
Check if the game is installed.
Checks if the game is installed.
Returns:
bool: True if the game is installed, False otherwise.
@ -103,7 +113,7 @@ class Game(GameABC):
def get_version_config(self) -> tuple[int, int, int]:
"""
Get the current installed game version from config.ini.
Gets the current installed game version from config.ini.
Using this is not recommended, as only official launcher creates
and uses this file, instead you should use `get_version()`.
@ -130,9 +140,22 @@ class Game(GameABC):
return (0, 0, 0)
return version
def set_version_config(self):
"""
Sets the current installed game version to config.ini.
This method is meant to keep compatibility with the official launcher only.
"""
cfg_file = self._path.joinpath("config.ini")
if not cfg_file.exists():
raise FileNotFoundError("config.ini not found.")
cfg = ConfigFile(cfg_file)
cfg.set("General", "game_version", self.get_version_str())
cfg.save()
def get_version(self) -> tuple[int, int, int]:
"""
Get the current installed game version.
Gets the current installed game version.
Credits to An Anime Team for the code that does the magic:
https://github.com/an-anime-team/anime-game-core/blob/main/src/games/star_rail/game.rs#L49
@ -197,18 +220,18 @@ class Game(GameABC):
# Fallback to config.ini
return self.get_version_config()
def version_as_str(self, version: tuple[int, int, int]) -> str:
def get_version_str(self) -> str:
"""
Convert a version tuple to a string.
Gets the current installed game version as a string.
Returns:
str: The version as a string.
"""
return ".".join(str(i) for i in version)
return ".".join(str(i) for i in self.get_version())
def get_channel(self) -> GameChannel:
"""
Get the current game channel.
Gets the current game channel.
Only works for Star Rail version 1.0.5, other versions will return the
overridden channel or None if no channel is overridden.
@ -234,9 +257,15 @@ class Game(GameABC):
case "os":
return GameChannel.Overseas
else:
return
# if self._path.joinpath("StarRail_Data").is_dir():
# return GameChannel.Overseas
# elif self._path.joinpath("StarRail_Data").exists():
# return GameChannel.China
# No reliable method there, so we'll just return the overridden channel or
# fallback to overseas.
return self._channel_override or GameChannel.Overseas
def _get_game(self, pre_download: bool) -> resource.Game:
def get_remote_game(self, pre_download: bool) -> resource.Game:
channel = self._channel_override or self.get_channel()
if pre_download:
game = api.get_resource(channel=channel).pre_download_game
@ -247,24 +276,57 @@ class Game(GameABC):
def get_update(self, pre_download: bool = False) -> resource.Diff | None:
"""
Get the current game update.
Gets the current game update.
Returns a `Diff` object that contains the update information or
None if the game is not installed or already up-to-date.
"""
if not self.is_installed():
return None
version = self.version_as_str(self._version_override or self.get_version())
for diff in self._get_game(pre_download=pre_download).diffs:
version = (
".".join(x for x in self._version_override)
if self._version_override
else self.get_version_str()
)
for diff in self.get_remote_game(pre_download=pre_download).diffs:
if diff.version == version:
return diff
return None
def repair_file(self, file: PathLike, pre_download: bool = False) -> None:
"""
Repairs a game file.
Args:
file (PathLike): The file to repair.
"""
if not self.is_installed():
raise GameNotInstalledError("Game is not installed.")
file = Path(file)
if not file.is_relative_to(self._path):
raise ValueError("File is not in the game folder.")
game = self.get_remote_game(pre_download=pre_download)
if game.latest.decompressed_path is None:
raise ScatteredFilesNotAvailableError("Scattered files are not available.")
url = game.latest.decompressed_path + "/" + file.relative_to(self._path)
# Backup the file
file.rename(file.with_suffix(file.suffix + ".bak"))
try:
# Download the file
download(url, file.with_suffix(""))
except Exception:
# Restore the backup
file.rename(file.with_suffix(""))
raise
else:
# Delete the backup
file.unlink(missing_ok=True)
def apply_update_archive(
self, archive_file: PathLike | IOBase, auto_repair: bool = True
) -> None:
"""
Apply an update archive to the game, it can be the game update or a
Applies an update archive to the game, it can be the game update or a
voicepack update.
`archive_file` can be a path to the archive file or a file-like object,
@ -290,7 +352,7 @@ class Game(GameABC):
self, update_info: resource.Diff = None, auto_repair: bool = True
):
"""
Install an update from a `Diff` object.
Installs an update from a `Diff` object.
You may want to download the update manually and pass it to
`apply_update_archive()` instead for better control.
@ -302,4 +364,9 @@ class Game(GameABC):
raise GameNotInstalledError("Game is not installed.")
if not update_info:
update_info = self.get_update()
pass
if not update_info or update_info.version == self.get_version_str():
raise GameAlreadyUpdatedError("Game is already updated.")
archive_file = self._cache.joinpath(update_info.name)
download(update_info.path, archive_file)
self.apply_update_archive(archive_file=archive_file, auto_repair=auto_repair)
self.set_version_config()

View File

@ -19,11 +19,9 @@ class Paths:
@staticmethod
def set_base_path(path: PathLike):
path = Path(path)
Paths.base_paths = PlatformDirs(
"vollerei", "tretrauit", roaming=True, base_path=path
)
Paths.cache_path = Paths.base_paths.site_cache_path
Paths.data_path = Paths.base_paths.site_data_path
Paths.base_paths = path
Paths.cache_path = Paths.base_paths.joinpath("Cache")
Paths.data_path = Paths.base_paths
Paths.tools_data_path = Paths.data_path.joinpath("tools")
Paths.tools_cache_path = Paths.cache_path.joinpath("tools")
Paths.launcher_cache_path = Paths.cache_path.joinpath("launcher")
@ -38,3 +36,15 @@ tools_data_path = Paths.tools_data_path
tools_cache_path = Paths.tools_cache_path
launcher_cache_path = Paths.launcher_cache_path
utils_cache_path = Paths.utils_cache_path
def set_base_path(path: PathLike):
Paths.set_base_path(path)
global base_paths, cache_path, data_path, tools_data_path, tools_cache_path, launcher_cache_path, utils_cache_path
base_paths = Paths.base_paths
cache_path = Paths.cache_path
data_path = Paths.data_path
tools_data_path = Paths.tools_data_path
tools_cache_path = Paths.tools_cache_path
launcher_cache_path = Paths.launcher_cache_path
utils_cache_path = Paths.utils_cache_path

View File

@ -52,6 +52,38 @@ __all__ = [
]
def download(
url: str, out: Path, file_len: int = None, overwrite: bool = False
) -> None:
"""
Download to a path.
Args:
url (str): URL to download from.
path (Path): Path to download to.
"""
if overwrite:
out.unlink(missing_ok=True)
headers = {}
if out.exists():
cur_len = (out.stat()).st_size
headers |= {"Range": f"bytes={cur_len}-{file_len if file_len else ''}"}
else:
out.touch()
# Streaming, so we can iterate over the response.
response = requests.get(url=url, headers=headers, stream=True)
response.raise_for_status()
if response.status == 416:
return
# Sizes in bytes.
block_size = 32768
with out.open("ab") as file:
for data in response.iter_content(block_size):
file.write(data)
return True
def download_and_extract(url: str, path: Path) -> None:
"""
Download and extract a zip file to a path.

View File

@ -29,11 +29,11 @@ def write_text(text, path: str | Path):
"""Write text to a file using pkexec (friendly gui)"""
if isinstance(path, Path):
path = str(path)
exec_su(f'pkexec tee "{path}"', stdin=text)
exec_su(f'tee "{path}"', stdin=text)
def append_text(text, path: str | Path):
"""Append text to a file using pkexec (friendly gui)"""
if isinstance(path, Path):
path = str(path)
exec_su(f'pkexec tee -a "{path}"', stdin=text)
exec_su(f'tee -a "{path}"', stdin=text)