feat: implement telemetry blocking

Proper Linux support for telemetry blocking now.
This commit is contained in:
tretrauit 2023-06-21 04:33:56 +07:00
parent 197dc767a8
commit 4eba9e595b
8 changed files with 185 additions and 18 deletions

View File

@ -45,7 +45,9 @@ class HSR:
return
print("OK")
exe_path = jadeite_dir.joinpath("jadeite.exe")
print("jadeite executable is located at:", exe_path)
print()
print("Jadeite executable is located at:", exe_path)
print()
print(
"Installation succeeded, but note that you need to run the game using "
+ "Jadeite to use the patch."
@ -74,6 +76,24 @@ class HSR:
if not ask("Do you still want to patch?"):
print("Patching aborted.")
return
telemetry_list = self._patcher.check_telemetry()
if telemetry_list:
print("Telemetry hosts found: ")
for host in telemetry_list:
print(f" - {host}")
if not ask(
"Do you want to block these hosts? (Without blocking you can't use the patch)"
):
print("Patching aborted.")
return
try:
self._patcher.block_telemetry(telemetry_list=telemetry_list)
except Exception as e:
print("Couldn't block telemetry hosts:", e)
if system() != "Windows":
print("Cannot continue, please block them manually then try again.")
return
print("Continuing anyway...")
if not self.__update_patch():
return
match self._patcher.patch_type:

View File

@ -3,10 +3,10 @@ no_confirm = False
def ask(question: str):
if no_confirm:
print(question + " [Y/n] Y")
print(question + " [Y/n]: Y")
return True
while True:
answer = input(question + " [Y/n] ")
answer = input(question + " [Y/n]: ")
if answer.lower().strip() in ["y", "yes", ""]:
return True
# Pacman way, treat all other answers as no

View File

@ -0,0 +1,31 @@
import requests
import concurrent
from vollerei.utils import write_hosts
from vollerei.constants import TELEMETRY_HOSTS
def _check_telemetry(host: str) -> str | None:
try:
requests.get(f"https://{host}/", timeout=15)
except (requests.ConnectionError, requests.Timeout, requests.HTTPError):
return
return host
def check_telemetry() -> list[str]:
futures = []
with concurrent.futures.ThreadPoolExecutor() as executor:
for host in TELEMETRY_HOSTS:
futures.append(executor.submit(_check_telemetry, host))
hosts = []
for future in concurrent.futures.as_completed(futures):
host = future.result()
if host:
hosts.append(host)
return hosts
def block_telemetry(telemetry_list: list[str] = None):
if not telemetry_list:
telemetry_list = check_telemetry()
write_hosts(telemetry_list)

View File

@ -3,7 +3,7 @@ from os import PathLike
from pathlib import Path
from enum import Enum
from vollerei.abc.launcher.game import GameABC
from vollerei.hsr.constants import md5sums
from vollerei.hsr.constants import MD5SUMS
class GameChannel(Enum):
@ -119,7 +119,7 @@ class Game(GameABC):
GameChannel: The current game channel.
"""
if self.get_version() == (1, 0, 5):
for channel, v in md5sums["1.0.5"].values():
for channel, v in MD5SUMS["1.0.5"].values():
for file, md5sum in v.values():
if (
md5(self._path.joinpath(file).read_bytes()).hexdigest()

View File

@ -1,7 +1,8 @@
from enum import Enum
from shutil import copy2
from shutil import copy2, rmtree
from distutils.version import StrictVersion
from vollerei.abc.patcher import PatcherABC
from vollerei.common import telemetry
from vollerei.exceptions.game import GameNotInstalledError
from vollerei.exceptions.patcher import (
VersionNotSupportedError,
@ -88,11 +89,9 @@ class Patcher(PatcherABC):
file_type = "cn"
case GameChannel.Overseas:
file_type = "os"
# Backup
# Backup and patch
for file in ["UnityPlayer.dll", "StarRailBase.dll"]:
game.path.joinpath(file).rename(game.path.joinpath(f"{file}.bak"))
# Patch
for file in ["UnityPlayer.dll", "StarRailBase.dll"]:
self._xdelta3.patch_file(
self._astra.joinpath(f"{file_type}/diffs/{file}.vcdiff"),
game.path.joinpath(f"{file}.bak"),
@ -124,6 +123,40 @@ class Patcher(PatcherABC):
self._update_jadeite()
return self._jadeite
def _unpatch_astra(self, game: Game):
if game.get_version() != (1, 0, 5):
raise VersionNotSupportedError(
"Only version 1.0.5 is supported by Astra patch."
)
self._update_astra()
file_type = None
match game.get_channel():
case GameChannel.China:
file_type = "cn"
case GameChannel.Overseas:
file_type = "os"
# Restore
for file in ["UnityPlayer.dll", "StarRailBase.dll"]:
if game.path.joinpath(f"{file}.bak").exists():
game.path.joinpath(file).unlink()
game.path.joinpath(f"{file}.bak").rename(game.path.joinpath(file))
# Remove files
for file in self._astra.joinpath(f"{file_type}/files/").rglob("*"):
if file.suffix == ".bat":
continue
file_rel = file.relative_to(self._astra.joinpath(f"{file_type}/files/"))
game_path = game.path.joinpath(file_rel)
if game_path.is_file():
game_path.unlink()
elif game_path.is_dir():
try:
game_path.rmdir()
except OSError:
pass
def _unpatch_jadeite(self):
rmtree(self._jadeite, ignore_errors=True)
def patch_game(self, game: Game):
if not game.is_installed():
raise PatcherError(GameNotInstalledError("Game is not installed"))
@ -134,10 +167,18 @@ class Patcher(PatcherABC):
return self._patch_jadeite()
def unpatch_game(self, game: Game):
pass
if not game.is_installed():
raise PatcherError(GameNotInstalledError("Game is not installed"))
match self._patch_type:
case PatchType.Astra:
self._unpatch_astra(game)
case PatchType.Jadeite:
self._unpatch_jadeite()
def check_telemetry(self):
pass
def check_telemetry(self) -> list[str]:
return telemetry.check_telemetry()
def block_telemetry(self):
pass
def block_telemetry(self, telemetry_list: list[str] = None):
if not telemetry_list:
telemetry_list = telemetry.check_telemetry()
telemetry.block_telemetry(telemetry_list)

View File

@ -1,8 +1,20 @@
import requests
import platform
from zipfile import ZipFile
from io import BytesIO
from pathlib import Path
match platform.system():
case "Linux":
from vollerei.utils.linux import append_text
case _:
def append_text(text: str, path: Path) -> None:
raise NotImplementedError(
"append_text is not implemented for this platform"
)
# Re-exports
from vollerei.utils.git import Git
from vollerei.utils.xdelta3 import Xdelta3
@ -20,3 +32,27 @@ def download_and_extract(url: str, path: Path) -> None:
f.seek(0)
with ZipFile(f) as z:
z.extractall(path)
def append_text_to_file(path: Path, text: str) -> None:
try:
with open(path, "a") as f:
f.write(text)
except FileNotFoundError:
with open(path, "w") as f:
f.write(text)
except (PermissionError, OSError):
append_text(text, path)
def write_hosts(hosts: list[str]) -> None:
hosts_str = ""
for line in hosts:
hosts_str += f"0.0.0.0 {line}\n"
match platform.system():
case "Linux":
append_text_to_file(Path("/etc/hosts"), hosts_str)
case "Windows":
append_text_to_file(
Path("C:/Windows/System32/drivers/etc/hosts"), hosts_str
)

View File

@ -35,7 +35,7 @@ class HDiffPatch:
# (or use Wine if they prefer that)
raise RuntimeError("Only Windows, Linux and macOS are supported by HDiffPatch")
def _get_hdiffpatch_exec(self, exec_name) -> str | None:
def _get_exec(self, exec_name) -> str | None:
if which(exec_name):
return exec_name
if not self.data_path.exists():
@ -48,12 +48,12 @@ class HDiffPatch:
file.chmod(0o755)
return str(file)
def _hpatchz(self):
def hpatchz(self) -> str | None:
hpatchz_name = "hpatchz" + (".exe" if platform.system() == "Windows" else "")
return self._get_hdiffpatch_exec(hpatchz_name)
return self._get_exec(hpatchz_name)
def patch_file(self, in_file, out_file, patch_file):
hpatchz = self.get_hpatchz_executable()
hpatchz = self.hpatchz()
if not hpatchz:
raise RuntimeError("hpatchz executable not found")
subprocess.check_call([hpatchz, "-f", in_file, patch_file, out_file])

39
vollerei/utils/linux.py Normal file
View File

@ -0,0 +1,39 @@
import subprocess
from pathlib import Path
__all__ = ["exec_su", "write_text", "append_text"]
def exec_su(args, stdin: str = None):
"""Execute a command using pkexec (friendly gui)"""
if not Path("/usr/bin/pkexec").exists():
raise FileNotFoundError("pkexec not found.")
proc = subprocess.Popen(
args, shell=True, stdin=subprocess.PIPE, stdout=subprocess.DEVNULL
)
if stdin:
proc.stdin.write(stdin.encode())
proc.stdin.close()
proc.wait()
match proc.returncode:
case 127:
raise OSError("Authentication failed.")
case 128:
raise RuntimeError("User cancelled the authentication.")
return proc
def write_text(text, path: str | Path):
"""Write text to a file using pkexec (friendly gui)"""
if isinstance(path, Path):
path = str(path)
exec_su(f'pkexec tee "{path}"', stdin=text)
def append_text(text, path: str | Path):
"""Append text to a file using pkexec (friendly gui)"""
if isinstance(path, Path):
path = str(path)
exec_su(f'pkexec tee -a "{path}"', stdin=text)