Compare commits

..

No commits in common. "7ed2b2e643c5b64c63fd8e838a5c40925fe7f8c1" and "fe7b1945efcfe0506822584f8c4cbd28bdc50ae0" have entirely different histories.

11 changed files with 46 additions and 393 deletions

View File

@ -3,7 +3,7 @@ from vollerei.cli import hsr
application = Application() application = Application()
for command in hsr.commands: for command in hsr.commands:
application.add(command()) application.add(command)
def run(): def run():

View File

@ -1,8 +1,6 @@
from cleo.commands.command import Command from cleo.commands.command import Command
from cleo.helpers import option, argument from cleo.helpers import option
from copy import deepcopy
from platform import system from platform import system
from vollerei.hsr.launcher.enums import GameChannel
from vollerei.cli import utils from vollerei.cli import utils
from vollerei.exceptions.game import GameError from vollerei.exceptions.game import GameError
from vollerei.hsr import Game, Patcher from vollerei.hsr import Game, Patcher
@ -13,8 +11,6 @@ patcher = Patcher()
default_options = [ default_options = [
option("channel", "c", description="Game channel", flag=False, default="overseas"),
option("force", "f", description="Force the command to run"),
option( option(
"game-path", "game-path",
"g", "g",
@ -23,7 +19,6 @@ default_options = [
default=".", default=".",
), ),
option("patch-type", "p", description="Patch type", flag=False), option("patch-type", "p", description="Patch type", flag=False),
option("temporary-path", "t", description="Temporary path", flag=False),
option("silent", "s", description="Silent mode"), option("silent", "s", description="Silent mode"),
option("noconfirm", "y", description="Do not ask for confirmation"), option("noconfirm", "y", description="Do not ask for confirmation"),
] ]
@ -41,17 +36,9 @@ def callback(
""" """
game_path = command.option("game-path") game_path = command.option("game-path")
patch_type = command.option("patch-type") patch_type = command.option("patch-type")
channel = command.option("channel")
silent = command.option("silent") silent = command.option("silent")
noconfirm = command.option("noconfirm") noconfirm = command.option("noconfirm")
temporary_path = command.option("temporary-path") State.game: Game = Game(game_path)
if isinstance(channel, str):
channel = GameChannel[channel.capitalize()]
elif isinstance(channel, int):
channel = GameChannel(channel)
State.game: Game = Game(game_path, temporary_path)
if channel:
State.game.channel_override = channel
if patch_type is None: if patch_type is None:
patch_type = PatchType.Jadeite patch_type = PatchType.Jadeite
elif isinstance(patch_type, str): elif isinstance(patch_type, str):
@ -210,53 +197,6 @@ class PatchInstallCommand(Command):
self.astra() self.astra()
PatchCommand = deepcopy(PatchInstallCommand)
PatchCommand.name = "hsr patch"
class PatchTelemetryCommand(Command):
name = "hsr patch telemetry"
description = "Checks for telemetry hosts and block them."
options = default_options
def handle(self):
progress = utils.ProgressIndicator(self)
progress.start("Checking telemetry hosts... ")
telemetry_list = patcher.check_telemetry()
if telemetry_list:
progress.finish("<warn>Telemetry hosts were found.</warn>")
self.line("Below is the list of telemetry hosts that need to be blocked:")
print()
for host in telemetry_list:
self.line(f"{host}")
print()
self.line(
"To prevent the game from sending data about the patch, "
+ "we need to <comment>block these hosts.</comment>"
)
if not self.confirm("Do you want to block them?"):
self.line("<error>Patching aborted.</error>")
self.line(
"<error>Please block these hosts manually then try again.</error>"
)
return
try:
patcher.block_telemetry(telemetry_list=telemetry_list)
except Exception as e:
self.line_error(
f"<error>Couldn't block telemetry hosts: {e.__context__}</error>"
)
# There's a good reason for this.
if system() != "Windows":
self.line(
"<error>Cannot continue, please block them manually then try again.</error>"
)
return
self.line("<warn>Continuing anyway...</warn>")
else:
progress.finish("<comment>No telemetry hosts found.</comment>")
class GetVersionCommand(Command): class GetVersionCommand(Command):
name = "hsr version" name = "hsr version"
description = "Gets the local game version" description = "Gets the local game version"
@ -272,125 +212,9 @@ class GetVersionCommand(Command):
self.line_error(f"<error>Couldn't get game version: {e}</error>") self.line_error(f"<error>Couldn't get game version: {e}</error>")
class UpdateCommand(Command):
name = "hsr update"
description = "Updates the local game if available"
options = default_options + [
option(
"auto-repair", "R", description="Automatically repair the game if needed"
),
option("pre-download", description="Pre-download the game if available"),
]
def handle(self):
callback(command=self)
auto_repair = self.option("auto-repair")
pre_download = self.option("pre-download")
if auto_repair:
self.line("<comment>Auto-repair is enabled.</comment>")
progress = utils.ProgressIndicator(self)
progress.start("Checking for updates... ")
try:
update_diff = State.game.get_update(pre_download=pre_download)
game_info = State.game.get_remote_game(pre_download=pre_download)
except Exception as e:
progress.finish(
f"<error>Update checking failed with following error: {e} ({e.__context__})</error>"
)
return
if update_diff is None:
progress.finish("<comment>Game is already updated.</comment>")
return
progress.finish("<comment>Update available.</comment>")
self.line(
f"The current version is: <comment>{State.game.get_version_str()}</comment>"
)
self.line(
f"The latest version is: <comment>{game_info.latest.version}</comment>"
)
if not self.confirm("Do you want to update the game?"):
self.line("<error>Update aborted.</error>")
return
self.line("Downloading update package...")
out_path = State.game._cache.joinpath(update_diff.name)
try:
download_result = utils.download(
update_diff.path, out_path, file_len=update_diff.size
)
except Exception as e:
self.line_error(f"<error>Couldn't download update: {e}</error>")
return
if not download_result:
self.line_error("<error>Download failed.</error>")
return
self.line("Download completed.")
progress = utils.ProgressIndicator(self)
progress.start("Applying update package...")
try:
State.game.apply_update_archive(out_path, auto_repair=auto_repair)
except Exception as e:
progress.finish(
f"<error>Couldn't apply update: {e} ({e.__context__})</error>"
)
return
progress.finish("<comment>Update applied.</comment>")
self.line("Setting version config... ")
self.set_version_config()
self.line(
f"The game has been updated to version: <comment>{State.game.get_version_str()}</comment>"
)
class ApplyUpdateArchive(Command):
name = "hsr update apply-archive"
description = "Applies the update archive to the local game"
arguments = [argument("path", description="Path to the update archive")]
options = default_options + [
option(
"auto-repair", "R", description="Automatically repair the game if needed"
),
]
def handle(self):
callback(command=self)
auto_repair = self.option("auto-repair")
update_archive = self.argument("path")
if auto_repair:
self.line("<comment>Auto-repair is enabled.</comment>")
progress = utils.ProgressIndicator(self)
progress.start("Applying update package...")
try:
State.game.apply_update_archive(update_archive, auto_repair=auto_repair)
except Exception as e:
progress.finish(
f"<error>Couldn't apply update: {e} ({e.__context__})</error>"
)
return
progress.finish("<comment>Update applied.</comment>")
self.line("Setting version config... ")
try:
State.game.set_version_config()
except Exception as e:
self.line_error(f"<warn>Couldn't set version config: {e}</warn>")
self.line_error(
"This won't affect the overall experience, but if you're using the official launcher"
)
self.line_error(
"you may have to edit the file 'config.ini' manually to reflect the latest version."
)
self.line(
f"The game has been updated to version: <comment>{State.game.get_version_str()}</comment>"
)
commands = [ commands = [
ApplyUpdateArchive, PatchTypeCommand(),
GetVersionCommand, UpdatePatchCommand(),
PatchCommand, PatchInstallCommand(),
PatchInstallCommand, GetVersionCommand(),
PatchTelemetryCommand,
PatchTypeCommand,
UpdatePatchCommand,
UpdateCommand,
] ]

View File

@ -1,9 +1,6 @@
import requests
from cleo.commands.command import Command from cleo.commands.command import Command
from pathlib import Path
from threading import Thread from threading import Thread
from time import sleep from time import sleep
from tqdm import tqdm
no_confirm = False no_confirm = False
@ -62,35 +59,6 @@ class ProgressIndicator:
self.progress.finish(message=message, reset_indicator=reset_indicator) self.progress.finish(message=message, reset_indicator=reset_indicator)
def download(url, out: Path, file_len: int = None, overwrite: bool = False) -> bool:
if overwrite:
out.unlink(missing_ok=True)
headers = {}
if out.exists():
cur_len = (out.stat()).st_size
headers |= {"Range": f"bytes={cur_len}-{file_len if file_len else ''}"}
else:
out.touch()
# Streaming, so we can iterate over the response.
response = requests.get(url=url, headers=headers, stream=True)
response.raise_for_status()
if response.status_code == 416:
return
# Sizes in bytes.
total_size = int(response.headers.get("content-length", 0))
block_size = 32768
with tqdm(total=total_size, unit="KB", unit_scale=True) as progress_bar:
with out.open("ab") as file:
for data in response.iter_content(block_size):
progress_bar.update(len(data))
file.write(data)
if total_size != 0 and progress_bar.n != total_size:
return False
return True
def msg(*args, **kwargs): def msg(*args, **kwargs):
""" """
Print but silentable Print but silentable

View File

@ -1,4 +1,4 @@
import concurrent.futures import concurrent
from io import IOBase from io import IOBase
import json import json
from pathlib import Path from pathlib import Path
@ -65,19 +65,13 @@ def apply_update_archive(
patchpath.unlink(missing_ok=True) patchpath.unlink(missing_ok=True)
# Extract patch file # Extract patch file
archive.extract(patch_file, game.temppath) archive.extract(patch_file, game.temppath)
file = file.rename(file.with_suffix(file.suffix + ".bak")) old_suffix = file.suffix
file = file.rename(file.with_suffix(".bak"))
try: try:
_hdiff.patch_file(file, file.with_suffix(""), patchpath) _hdiff.patch_file(file, file.with_suffix(old_suffix), patchpath)
except HPatchZPatchError: except HPatchZPatchError:
if auto_repair:
try:
game.repair_file(game.path.joinpath(file.with_suffix("")))
except Exception:
# Let the game download the file. # Let the game download the file.
file.rename(file.with_suffix("")) file.rename(file.with_suffix(old_suffix))
else:
# Let the game download the file.
file.rename(file.with_suffix(""))
return return
finally: finally:
patchpath.unlink() patchpath.unlink()
@ -92,7 +86,7 @@ def apply_update_archive(
# Repair file # Repair file
if not auto_repair: if not auto_repair:
raise e raise e
game.repair_file(game.path.joinpath(file)) game._repair_file(game.path.joinpath(file))
# Multi-threaded patching # Multi-threaded patching
patch_jobs = [] patch_jobs = []

View File

@ -13,30 +13,6 @@ class GameNotInstalledError(GameError):
pass pass
class GameAlreadyUpdatedError(GameError):
"""Game is already updated."""
pass
class GameAlreadyInstalledError(GameError):
"""Game is already installed."""
pass
class ScatteredFilesNotAvailableError(GameError):
"""Scattered files are not available."""
pass
class GameNotUpdatedError(GameError):
"""Game is not updated."""
pass
class PreDownloadNotAvailable(GameError): class PreDownloadNotAvailable(GameError):
"""Pre-download version is not available.""" """Pre-download version is not available."""

View File

@ -20,7 +20,7 @@ class LAUNCHER_API:
} }
LATEST_VERSION = (1, 6, 0) LATEST_VERSION = (1, 1, 0)
MD5SUMS = { MD5SUMS = {
"1.0.5": { "1.0.5": {
"cn": { "cn": {

View File

@ -17,7 +17,7 @@ def get_resource(channel: GameChannel = GameChannel.Overseas) -> Resource:
Returns: Returns:
Resource: Game resource information. Resource: Game resource information.
""" """
resource_path: dict = None resource_path: dict
match channel: match channel:
case GameChannel.Overseas: case GameChannel.Overseas:
resource_path = LAUNCHER_API.OS resource_path = LAUNCHER_API.OS

View File

@ -5,17 +5,11 @@ from pathlib import Path
from vollerei.abc.launcher.game import GameABC from vollerei.abc.launcher.game import GameABC
from vollerei.common import ConfigFile, functions from vollerei.common import ConfigFile, functions
from vollerei.common.api import resource from vollerei.common.api import resource
from vollerei.exceptions.game import ( from vollerei.exceptions.game import GameNotInstalledError, PreDownloadNotAvailable
GameAlreadyUpdatedError,
GameNotInstalledError,
PreDownloadNotAvailable,
ScatteredFilesNotAvailableError,
)
from vollerei.hsr.constants import MD5SUMS from vollerei.hsr.constants import MD5SUMS
from vollerei.hsr.launcher.enums import GameChannel from vollerei.hsr.launcher.enums import GameChannel
from vollerei.hsr.launcher import api from vollerei.hsr.launcher import api
from vollerei import paths from vollerei.paths import cache_path
from vollerei.utils import download
class Game(GameABC): class Game(GameABC):
@ -27,20 +21,16 @@ class Game(GameABC):
the property `channel_override` to the channel you want to use. the property `channel_override` to the channel you want to use.
""" """
def __init__(self, path: PathLike = None, cache_path: PathLike = None): def __init__(self, path: PathLike = None):
self._path: Path | None = Path(path) if path else None self._path: Path | None = Path(path) if path else None
if not cache_path: self._cache: Path = cache_path.joinpath("game")
cache_path = paths.cache_path
cache_path = Path(cache_path)
self._cache: Path = cache_path.joinpath("game/hsr/")
self._cache.mkdir(parents=True, exist_ok=True)
self._version_override: tuple[int, int, int] | None = None self._version_override: tuple[int, int, int] | None = None
self._channel_override: GameChannel | None = None self._channel_override: GameChannel | None = None
@property @property
def version_override(self) -> tuple[int, int, int] | None: def version_override(self) -> tuple[int, int, int] | None:
""" """
Overrides the game version. Override the game version.
This can be useful if you want to override the version of the game This can be useful if you want to override the version of the game
and additionally working around bugs. and additionally working around bugs.
@ -56,7 +46,7 @@ class Game(GameABC):
@property @property
def channel_override(self) -> GameChannel | None: def channel_override(self) -> GameChannel | None:
""" """
Overrides the game channel. Override the game channel.
Because game channel detection isn't implemented yet, you may need Because game channel detection isn't implemented yet, you may need
to use this for some functions to work. to use this for some functions to work.
@ -75,7 +65,7 @@ class Game(GameABC):
@property @property
def path(self) -> Path | None: def path(self) -> Path | None:
""" """
Paths to the game folder. Path to the game folder.
""" """
return self._path return self._path
@ -85,7 +75,7 @@ class Game(GameABC):
def data_folder(self) -> Path: def data_folder(self) -> Path:
""" """
Paths to the game data folder. Path to the game data folder.
""" """
try: try:
return self._path.joinpath("StarRail_Data") return self._path.joinpath("StarRail_Data")
@ -94,7 +84,7 @@ class Game(GameABC):
def is_installed(self) -> bool: def is_installed(self) -> bool:
""" """
Checks if the game is installed. Check if the game is installed.
Returns: Returns:
bool: True if the game is installed, False otherwise. bool: True if the game is installed, False otherwise.
@ -113,7 +103,7 @@ class Game(GameABC):
def get_version_config(self) -> tuple[int, int, int]: def get_version_config(self) -> tuple[int, int, int]:
""" """
Gets the current installed game version from config.ini. Get the current installed game version from config.ini.
Using this is not recommended, as only official launcher creates Using this is not recommended, as only official launcher creates
and uses this file, instead you should use `get_version()`. and uses this file, instead you should use `get_version()`.
@ -140,22 +130,9 @@ class Game(GameABC):
return (0, 0, 0) return (0, 0, 0)
return version return version
def set_version_config(self):
"""
Sets the current installed game version to config.ini.
This method is meant to keep compatibility with the official launcher only.
"""
cfg_file = self._path.joinpath("config.ini")
if not cfg_file.exists():
raise FileNotFoundError("config.ini not found.")
cfg = ConfigFile(cfg_file)
cfg.set("General", "game_version", self.get_version_str())
cfg.save()
def get_version(self) -> tuple[int, int, int]: def get_version(self) -> tuple[int, int, int]:
""" """
Gets the current installed game version. Get the current installed game version.
Credits to An Anime Team for the code that does the magic: Credits to An Anime Team for the code that does the magic:
https://github.com/an-anime-team/anime-game-core/blob/main/src/games/star_rail/game.rs#L49 https://github.com/an-anime-team/anime-game-core/blob/main/src/games/star_rail/game.rs#L49
@ -220,18 +197,18 @@ class Game(GameABC):
# Fallback to config.ini # Fallback to config.ini
return self.get_version_config() return self.get_version_config()
def get_version_str(self) -> str: def version_as_str(self, version: tuple[int, int, int]) -> str:
""" """
Gets the current installed game version as a string. Convert a version tuple to a string.
Returns: Returns:
str: The version as a string. str: The version as a string.
""" """
return ".".join(str(i) for i in self.get_version()) return ".".join(str(i) for i in version)
def get_channel(self) -> GameChannel: def get_channel(self) -> GameChannel:
""" """
Gets the current game channel. Get the current game channel.
Only works for Star Rail version 1.0.5, other versions will return the Only works for Star Rail version 1.0.5, other versions will return the
overridden channel or None if no channel is overridden. overridden channel or None if no channel is overridden.
@ -257,15 +234,9 @@ class Game(GameABC):
case "os": case "os":
return GameChannel.Overseas return GameChannel.Overseas
else: else:
# if self._path.joinpath("StarRail_Data").is_dir(): return
# return GameChannel.Overseas
# elif self._path.joinpath("StarRail_Data").exists():
# return GameChannel.China
# No reliable method there, so we'll just return the overridden channel or
# fallback to overseas.
return self._channel_override or GameChannel.Overseas
def get_remote_game(self, pre_download: bool) -> resource.Game: def _get_game(self, pre_download: bool) -> resource.Game:
channel = self._channel_override or self.get_channel() channel = self._channel_override or self.get_channel()
if pre_download: if pre_download:
game = api.get_resource(channel=channel).pre_download_game game = api.get_resource(channel=channel).pre_download_game
@ -276,57 +247,24 @@ class Game(GameABC):
def get_update(self, pre_download: bool = False) -> resource.Diff | None: def get_update(self, pre_download: bool = False) -> resource.Diff | None:
""" """
Gets the current game update. Get the current game update.
Returns a `Diff` object that contains the update information or Returns a `Diff` object that contains the update information or
None if the game is not installed or already up-to-date. None if the game is not installed or already up-to-date.
""" """
if not self.is_installed(): if not self.is_installed():
return None return None
version = ( version = self.version_as_str(self._version_override or self.get_version())
".".join(x for x in self._version_override) for diff in self._get_game(pre_download=pre_download).diffs:
if self._version_override
else self.get_version_str()
)
for diff in self.get_remote_game(pre_download=pre_download).diffs:
if diff.version == version: if diff.version == version:
return diff return diff
return None return None
def repair_file(self, file: PathLike, pre_download: bool = False) -> None:
"""
Repairs a game file.
Args:
file (PathLike): The file to repair.
"""
if not self.is_installed():
raise GameNotInstalledError("Game is not installed.")
file = Path(file)
if not file.is_relative_to(self._path):
raise ValueError("File is not in the game folder.")
game = self.get_remote_game(pre_download=pre_download)
if game.latest.decompressed_path is None:
raise ScatteredFilesNotAvailableError("Scattered files are not available.")
url = game.latest.decompressed_path + "/" + file.relative_to(self._path)
# Backup the file
file.rename(file.with_suffix(file.suffix + ".bak"))
try:
# Download the file
download(url, file.with_suffix(""))
except Exception:
# Restore the backup
file.rename(file.with_suffix(""))
raise
else:
# Delete the backup
file.unlink(missing_ok=True)
def apply_update_archive( def apply_update_archive(
self, archive_file: PathLike | IOBase, auto_repair: bool = True self, archive_file: PathLike | IOBase, auto_repair: bool = True
) -> None: ) -> None:
""" """
Applies an update archive to the game, it can be the game update or a Apply an update archive to the game, it can be the game update or a
voicepack update. voicepack update.
`archive_file` can be a path to the archive file or a file-like object, `archive_file` can be a path to the archive file or a file-like object,
@ -352,7 +290,7 @@ class Game(GameABC):
self, update_info: resource.Diff = None, auto_repair: bool = True self, update_info: resource.Diff = None, auto_repair: bool = True
): ):
""" """
Installs an update from a `Diff` object. Install an update from a `Diff` object.
You may want to download the update manually and pass it to You may want to download the update manually and pass it to
`apply_update_archive()` instead for better control. `apply_update_archive()` instead for better control.
@ -364,9 +302,4 @@ class Game(GameABC):
raise GameNotInstalledError("Game is not installed.") raise GameNotInstalledError("Game is not installed.")
if not update_info: if not update_info:
update_info = self.get_update() update_info = self.get_update()
if not update_info or update_info.version == self.get_version_str(): pass
raise GameAlreadyUpdatedError("Game is already updated.")
archive_file = self._cache.joinpath(update_info.name)
download(update_info.path, archive_file)
self.apply_update_archive(archive_file=archive_file, auto_repair=auto_repair)
self.set_version_config()

View File

@ -19,9 +19,11 @@ class Paths:
@staticmethod @staticmethod
def set_base_path(path: PathLike): def set_base_path(path: PathLike):
path = Path(path) path = Path(path)
Paths.base_paths = path Paths.base_paths = PlatformDirs(
Paths.cache_path = Paths.base_paths.joinpath("Cache") "vollerei", "tretrauit", roaming=True, base_path=path
Paths.data_path = Paths.base_paths )
Paths.cache_path = Paths.base_paths.site_cache_path
Paths.data_path = Paths.base_paths.site_data_path
Paths.tools_data_path = Paths.data_path.joinpath("tools") Paths.tools_data_path = Paths.data_path.joinpath("tools")
Paths.tools_cache_path = Paths.cache_path.joinpath("tools") Paths.tools_cache_path = Paths.cache_path.joinpath("tools")
Paths.launcher_cache_path = Paths.cache_path.joinpath("launcher") Paths.launcher_cache_path = Paths.cache_path.joinpath("launcher")
@ -36,15 +38,3 @@ tools_data_path = Paths.tools_data_path
tools_cache_path = Paths.tools_cache_path tools_cache_path = Paths.tools_cache_path
launcher_cache_path = Paths.launcher_cache_path launcher_cache_path = Paths.launcher_cache_path
utils_cache_path = Paths.utils_cache_path utils_cache_path = Paths.utils_cache_path
def set_base_path(path: PathLike):
Paths.set_base_path(path)
global base_paths, cache_path, data_path, tools_data_path, tools_cache_path, launcher_cache_path, utils_cache_path
base_paths = Paths.base_paths
cache_path = Paths.cache_path
data_path = Paths.data_path
tools_data_path = Paths.tools_data_path
tools_cache_path = Paths.tools_cache_path
launcher_cache_path = Paths.launcher_cache_path
utils_cache_path = Paths.utils_cache_path

View File

@ -52,38 +52,6 @@ __all__ = [
] ]
def download(
url: str, out: Path, file_len: int = None, overwrite: bool = False
) -> None:
"""
Download to a path.
Args:
url (str): URL to download from.
path (Path): Path to download to.
"""
if overwrite:
out.unlink(missing_ok=True)
headers = {}
if out.exists():
cur_len = (out.stat()).st_size
headers |= {"Range": f"bytes={cur_len}-{file_len if file_len else ''}"}
else:
out.touch()
# Streaming, so we can iterate over the response.
response = requests.get(url=url, headers=headers, stream=True)
response.raise_for_status()
if response.status == 416:
return
# Sizes in bytes.
block_size = 32768
with out.open("ab") as file:
for data in response.iter_content(block_size):
file.write(data)
return True
def download_and_extract(url: str, path: Path) -> None: def download_and_extract(url: str, path: Path) -> None:
""" """
Download and extract a zip file to a path. Download and extract a zip file to a path.

View File

@ -29,11 +29,11 @@ def write_text(text, path: str | Path):
"""Write text to a file using pkexec (friendly gui)""" """Write text to a file using pkexec (friendly gui)"""
if isinstance(path, Path): if isinstance(path, Path):
path = str(path) path = str(path)
exec_su(f'tee "{path}"', stdin=text) exec_su(f'pkexec tee "{path}"', stdin=text)
def append_text(text, path: str | Path): def append_text(text, path: str | Path):
"""Append text to a file using pkexec (friendly gui)""" """Append text to a file using pkexec (friendly gui)"""
if isinstance(path, Path): if isinstance(path, Path):
path = str(path) path = str(path)
exec_su(f'tee -a "{path}"', stdin=text) exec_su(f'pkexec tee -a "{path}"', stdin=text)