feat(hsr): download & install updates

WIP WIP WIP, probably will crash at some point.
Also the code in paths isn't working, so i'm merging them to constants in the next commit
This commit is contained in:
tretrauit 2024-01-02 15:27:40 +07:00
parent fe7b1945ef
commit 6f030e79ce
10 changed files with 283 additions and 39 deletions

View File

@ -1,16 +1,21 @@
from cleo.commands.command import Command
from cleo.helpers import option
from platform import system
from vollerei.hsr.launcher.enums import GameChannel
from vollerei.paths import set_base_path, base_paths
from vollerei.cli import utils
from vollerei.exceptions.game import GameError
from vollerei.hsr import Game, Patcher
from vollerei.exceptions.patcher import PatcherError, PatchUpdateError
from vollerei.hsr.patcher import PatchType
from tqdm import tqdm
import requests
patcher = Patcher()
default_options = [
option("channel", "c", description="Game channel", flag=False, default="overseas"),
option(
"game-path",
"g",
@ -19,6 +24,7 @@ default_options = [
default=".",
),
option("patch-type", "p", description="Patch type", flag=False),
option("temporary-path", "t", description="Temporary path", flag=False),
option("silent", "s", description="Silent mode"),
option("noconfirm", "y", description="Do not ask for confirmation"),
]
@ -36,9 +42,17 @@ def callback(
"""
game_path = command.option("game-path")
patch_type = command.option("patch-type")
channel = command.option("channel")
silent = command.option("silent")
noconfirm = command.option("noconfirm")
State.game: Game = Game(game_path)
temporary_path = command.option("temporary-path")
if isinstance(channel, str):
channel = GameChannel[channel.capitalize()]
elif isinstance(channel, int):
channel = GameChannel(channel)
State.game: Game = Game(game_path, temporary_path)
if channel:
State.game.channel_override = channel
if patch_type is None:
patch_type = PatchType.Jadeite
elif isinstance(patch_type, str):
@ -212,7 +226,73 @@ class GetVersionCommand(Command):
self.line_error(f"<error>Couldn't get game version: {e}</error>")
class UpdateCommand(Command):
name = "hsr update"
description = "Updates the local game if available"
options = default_options + [
option(
"auto-repair", "R", description="Automatically repair the game if needed"
),
option("pre-download", description="Pre-download the game if available"),
]
def handle(self):
callback(command=self)
auto_repair = self.option("auto-repair")
pre_download = self.option("pre-download")
if auto_repair:
self.line("<comment>Auto-repair is enabled.</comment>")
progress = utils.ProgressIndicator(self)
progress.start("Checking for updates... ")
try:
update_diff = State.game.get_update(pre_download=pre_download)
game_info = State.game.get_remote_game(pre_download=pre_download)
except Exception as e:
progress.finish(
f"<error>Update checking failed with following error: {e} ({e.__context__})</error>"
)
return
if update_diff is None:
progress.finish("<comment>Game is already updated.</comment>")
return
progress.finish("<comment>Update available.</comment>")
self.line(
f"The current version is: <comment>{State.game.get_version_str()}</comment>"
)
self.line(
f"The latest version is: <comment>{game_info.latest.version}</comment>"
)
if not self.confirm("Do you want to update the game?"):
self.line("<error>Update aborted.</error>")
return
self.line("Downloading update package...")
out_path = State.game._cache.joinpath(update_diff.name)
try:
download_result = utils.download(
update_diff.path, out_path, file_len=update_diff.size
)
except Exception as e:
self.line_error(f"<error>Couldn't download update: {e}</error>")
return
if not download_result:
self.line_error("<error>Download failed.</error>")
return
self.line("Download completed.")
progress = utils.ProgressIndicator(self)
progress.start("Applying update package...")
try:
State.game.apply_update_archive(out_path, auto_repair=auto_repair)
except Exception as e:
progress.finish(
f"<error>Couldn't apply update: {e} ({e.__context__})</error>"
)
return
progress.finish("<comment>Update applied.</comment>")
commands = [
UpdateCommand(),
PatchTypeCommand(),
UpdatePatchCommand(),
PatchInstallCommand(),

View File

@ -1,6 +1,9 @@
import requests
from cleo.commands.command import Command
from pathlib import Path
from threading import Thread
from time import sleep
from tqdm import tqdm
no_confirm = False
@ -59,6 +62,35 @@ class ProgressIndicator:
self.progress.finish(message=message, reset_indicator=reset_indicator)
def download(url, out: Path, file_len: int = None, overwrite: bool = False) -> bool:
if overwrite:
out.unlink(missing_ok=True)
headers = {}
if out.exists():
cur_len = (out.stat()).st_size
headers |= {"Range": f"bytes={cur_len}-{file_len if file_len else ''}"}
else:
out.touch()
# Streaming, so we can iterate over the response.
response = requests.get(url=url, headers=headers, stream=True)
response.raise_for_status()
if response.status_code == 416:
return
# Sizes in bytes.
total_size = int(response.headers.get("content-length", 0))
block_size = 32768
with tqdm(total=total_size, unit="KB", unit_scale=True) as progress_bar:
with out.open("ab") as file:
for data in response.iter_content(block_size):
progress_bar.update(len(data))
file.write(data)
if total_size != 0 and progress_bar.n != total_size:
return False
return True
def msg(*args, **kwargs):
"""
Print but silentable

View File

@ -1,4 +1,4 @@
import concurrent
import concurrent.futures
from io import IOBase
import json
from pathlib import Path
@ -65,13 +65,12 @@ def apply_update_archive(
patchpath.unlink(missing_ok=True)
# Extract patch file
archive.extract(patch_file, game.temppath)
old_suffix = file.suffix
file = file.rename(file.with_suffix(".bak"))
file = file.rename(file.with_suffix(file.suffix + ".bak"))
try:
_hdiff.patch_file(file, file.with_suffix(old_suffix), patchpath)
_hdiff.patch_file(file, file.with_suffix(""), patchpath)
except HPatchZPatchError:
# Let the game download the file.
file.rename(file.with_suffix(old_suffix))
file.rename(file.with_suffix(""))
return
finally:
patchpath.unlink()
@ -86,7 +85,7 @@ def apply_update_archive(
# Repair file
if not auto_repair:
raise e
game._repair_file(game.path.joinpath(file))
game.repair_file(game.path.joinpath(file))
# Multi-threaded patching
patch_jobs = []

View File

@ -13,6 +13,30 @@ class GameNotInstalledError(GameError):
pass
class GameAlreadyUpdatedError(GameError):
"""Game is already updated."""
pass
class GameAlreadyInstalledError(GameError):
"""Game is already installed."""
pass
class ScatteredFilesNotAvailableError(GameError):
"""Scattered files are not available."""
pass
class GameNotUpdatedError(GameError):
"""Game is not updated."""
pass
class PreDownloadNotAvailable(GameError):
"""Pre-download version is not available."""

View File

@ -20,7 +20,7 @@ class LAUNCHER_API:
}
LATEST_VERSION = (1, 1, 0)
LATEST_VERSION = (1, 6, 0)
MD5SUMS = {
"1.0.5": {
"cn": {

View File

@ -17,7 +17,7 @@ def get_resource(channel: GameChannel = GameChannel.Overseas) -> Resource:
Returns:
Resource: Game resource information.
"""
resource_path: dict
resource_path: dict = None
match channel:
case GameChannel.Overseas:
resource_path = LAUNCHER_API.OS

View File

@ -5,11 +5,17 @@ from pathlib import Path
from vollerei.abc.launcher.game import GameABC
from vollerei.common import ConfigFile, functions
from vollerei.common.api import resource
from vollerei.exceptions.game import GameNotInstalledError, PreDownloadNotAvailable
from vollerei.exceptions.game import (
GameAlreadyUpdatedError,
GameNotInstalledError,
PreDownloadNotAvailable,
ScatteredFilesNotAvailableError,
)
from vollerei.hsr.constants import MD5SUMS
from vollerei.hsr.launcher.enums import GameChannel
from vollerei.hsr.launcher import api
from vollerei.paths import cache_path
from vollerei import paths
from vollerei.utils import download
class Game(GameABC):
@ -21,16 +27,20 @@ class Game(GameABC):
the property `channel_override` to the channel you want to use.
"""
def __init__(self, path: PathLike = None):
def __init__(self, path: PathLike = None, cache_path: PathLike = None):
self._path: Path | None = Path(path) if path else None
self._cache: Path = cache_path.joinpath("game")
if not cache_path:
cache_path = paths.cache_path
cache_path = Path(cache_path)
self._cache: Path = cache_path.joinpath("game/hsr/")
self._cache.mkdir(parents=True, exist_ok=True)
self._version_override: tuple[int, int, int] | None = None
self._channel_override: GameChannel | None = None
@property
def version_override(self) -> tuple[int, int, int] | None:
"""
Override the game version.
Overrides the game version.
This can be useful if you want to override the version of the game
and additionally working around bugs.
@ -46,7 +56,7 @@ class Game(GameABC):
@property
def channel_override(self) -> GameChannel | None:
"""
Override the game channel.
Overrides the game channel.
Because game channel detection isn't implemented yet, you may need
to use this for some functions to work.
@ -65,7 +75,7 @@ class Game(GameABC):
@property
def path(self) -> Path | None:
"""
Path to the game folder.
Paths to the game folder.
"""
return self._path
@ -75,7 +85,7 @@ class Game(GameABC):
def data_folder(self) -> Path:
"""
Path to the game data folder.
Paths to the game data folder.
"""
try:
return self._path.joinpath("StarRail_Data")
@ -84,7 +94,7 @@ class Game(GameABC):
def is_installed(self) -> bool:
"""
Check if the game is installed.
Checks if the game is installed.
Returns:
bool: True if the game is installed, False otherwise.
@ -103,7 +113,7 @@ class Game(GameABC):
def get_version_config(self) -> tuple[int, int, int]:
"""
Get the current installed game version from config.ini.
Gets the current installed game version from config.ini.
Using this is not recommended, as only official launcher creates
and uses this file, instead you should use `get_version()`.
@ -130,9 +140,22 @@ class Game(GameABC):
return (0, 0, 0)
return version
def set_version_config(self):
"""
Sets the current installed game version to config.ini.
This method is meant to keep compatibility with the official launcher only.
"""
cfg_file = self._path.joinpath("config.ini")
if not cfg_file.exists():
raise FileNotFoundError("config.ini not found.")
cfg = ConfigFile(cfg_file)
cfg.set("General", "game_version", self.get_version_str())
cfg.save()
def get_version(self) -> tuple[int, int, int]:
"""
Get the current installed game version.
Gets the current installed game version.
Credits to An Anime Team for the code that does the magic:
https://github.com/an-anime-team/anime-game-core/blob/main/src/games/star_rail/game.rs#L49
@ -197,18 +220,18 @@ class Game(GameABC):
# Fallback to config.ini
return self.get_version_config()
def version_as_str(self, version: tuple[int, int, int]) -> str:
def get_version_str(self) -> str:
"""
Convert a version tuple to a string.
Gets the current installed game version as a string.
Returns:
str: The version as a string.
"""
return ".".join(str(i) for i in version)
return ".".join(str(i) for i in self.get_version())
def get_channel(self) -> GameChannel:
"""
Get the current game channel.
Gets the current game channel.
Only works for Star Rail version 1.0.5, other versions will return the
overridden channel or None if no channel is overridden.
@ -234,9 +257,15 @@ class Game(GameABC):
case "os":
return GameChannel.Overseas
else:
return
# if self._path.joinpath("StarRail_Data").is_dir():
# return GameChannel.Overseas
# elif self._path.joinpath("StarRail_Data").exists():
# return GameChannel.China
# No reliable method there, so we'll just return the overridden channel or
# fallback to overseas.
return self._channel_override or GameChannel.Overseas
def _get_game(self, pre_download: bool) -> resource.Game:
def get_remote_game(self, pre_download: bool) -> resource.Game:
channel = self._channel_override or self.get_channel()
if pre_download:
game = api.get_resource(channel=channel).pre_download_game
@ -247,24 +276,57 @@ class Game(GameABC):
def get_update(self, pre_download: bool = False) -> resource.Diff | None:
"""
Get the current game update.
Gets the current game update.
Returns a `Diff` object that contains the update information or
None if the game is not installed or already up-to-date.
"""
if not self.is_installed():
return None
version = self.version_as_str(self._version_override or self.get_version())
for diff in self._get_game(pre_download=pre_download).diffs:
version = (
".".join(x for x in self._version_override)
if self._version_override
else self.get_version_str()
)
for diff in self.get_remote_game(pre_download=pre_download).diffs:
if diff.version == version:
return diff
return None
def repair_file(self, file: PathLike, pre_download: bool = False) -> None:
"""
Repairs a game file.
Args:
file (PathLike): The file to repair.
"""
if not self.is_installed():
raise GameNotInstalledError("Game is not installed.")
file = Path(file)
if not file.is_relative_to(self._path):
raise ValueError("File is not in the game folder.")
game = self.get_remote_game(pre_download=pre_download)
if game.latest.decompressed_path is None:
raise ScatteredFilesNotAvailableError("Scattered files are not available.")
url = game.latest.decompressed_path + "/" + file.relative_to(self._path)
# Backup the file
file.rename(file.with_suffix(file.suffix + ".bak"))
try:
# Download the file
download(url, file.with_suffix(""))
except Exception:
# Restore the backup
file.rename(file.with_suffix(""))
raise
else:
# Delete the backup
file.unlink(missing_ok=True)
def apply_update_archive(
self, archive_file: PathLike | IOBase, auto_repair: bool = True
) -> None:
"""
Apply an update archive to the game, it can be the game update or a
Applies an update archive to the game, it can be the game update or a
voicepack update.
`archive_file` can be a path to the archive file or a file-like object,
@ -285,12 +347,13 @@ class Game(GameABC):
archive_file = Path(archive_file)
# Hello hell again, dealing with HDiffPatch and all the things again.
functions.apply_update_archive(self, archive_file, auto_repair=auto_repair)
self.set_version_config()
def install_update(
self, update_info: resource.Diff = None, auto_repair: bool = True
):
"""
Install an update from a `Diff` object.
Installs an update from a `Diff` object.
You may want to download the update manually and pass it to
`apply_update_archive()` instead for better control.
@ -302,4 +365,8 @@ class Game(GameABC):
raise GameNotInstalledError("Game is not installed.")
if not update_info:
update_info = self.get_update()
pass
if not update_info or update_info.version == self.get_version_str():
raise GameAlreadyUpdatedError("Game is already updated.")
archive_file = self._cache.joinpath(update_info.name)
download(update_info.path, archive_file)
self.apply_update_archive(archive_file=archive_file, auto_repair=auto_repair)

View File

@ -19,11 +19,9 @@ class Paths:
@staticmethod
def set_base_path(path: PathLike):
path = Path(path)
Paths.base_paths = PlatformDirs(
"vollerei", "tretrauit", roaming=True, base_path=path
)
Paths.cache_path = Paths.base_paths.site_cache_path
Paths.data_path = Paths.base_paths.site_data_path
Paths.base_paths = path
Paths.cache_path = Paths.base_paths.joinpath("Cache")
Paths.data_path = Paths.base_paths
Paths.tools_data_path = Paths.data_path.joinpath("tools")
Paths.tools_cache_path = Paths.cache_path.joinpath("tools")
Paths.launcher_cache_path = Paths.cache_path.joinpath("launcher")
@ -38,3 +36,15 @@ tools_data_path = Paths.tools_data_path
tools_cache_path = Paths.tools_cache_path
launcher_cache_path = Paths.launcher_cache_path
utils_cache_path = Paths.utils_cache_path
def set_base_path(path: PathLike):
Paths.set_base_path(path)
global base_paths, cache_path, data_path, tools_data_path, tools_cache_path, launcher_cache_path, utils_cache_path
base_paths = Paths.base_paths
cache_path = Paths.cache_path
data_path = Paths.data_path
tools_data_path = Paths.tools_data_path
tools_cache_path = Paths.tools_cache_path
launcher_cache_path = Paths.launcher_cache_path
utils_cache_path = Paths.utils_cache_path

View File

@ -52,6 +52,38 @@ __all__ = [
]
def download(
url: str, out: Path, file_len: int = None, overwrite: bool = False
) -> None:
"""
Download to a path.
Args:
url (str): URL to download from.
path (Path): Path to download to.
"""
if overwrite:
out.unlink(missing_ok=True)
headers = {}
if out.exists():
cur_len = (out.stat()).st_size
headers |= {"Range": f"bytes={cur_len}-{file_len if file_len else ''}"}
else:
out.touch()
# Streaming, so we can iterate over the response.
response = requests.get(url=url, headers=headers, stream=True)
response.raise_for_status()
if response.status == 416:
return
# Sizes in bytes.
block_size = 32768
with out.open("ab") as file:
for data in response.iter_content(block_size):
file.write(data)
return True
def download_and_extract(url: str, path: Path) -> None:
"""
Download and extract a zip file to a path.

View File

@ -29,11 +29,11 @@ def write_text(text, path: str | Path):
"""Write text to a file using pkexec (friendly gui)"""
if isinstance(path, Path):
path = str(path)
exec_su(f'pkexec tee "{path}"', stdin=text)
exec_su(f'tee "{path}"', stdin=text)
def append_text(text, path: str | Path):
"""Append text to a file using pkexec (friendly gui)"""
if isinstance(path, Path):
path = str(path)
exec_su(f'pkexec tee -a "{path}"', stdin=text)
exec_su(f'tee -a "{path}"', stdin=text)