import asyncio
import io
import shutil
import tarfile
from pathlib import Path
from urllib.parse import urlsplit

import aiofiles
import aiohttp
import appdirs

import constants


class Patcher:
    """Fetch the game patch repository and (eventually) apply it.

    The repository is cloned/pulled with the ``git`` executable when one is
    found on ``PATH``; otherwise an archive of the repository is downloaded
    over HTTP and unpacked into the patch directory.
    """

    # NOTE(review): ``Path.cwd()`` is evaluated once, when the class body is
    # executed, not on every call. Kept as-is to leave the signature untouched.
    def __init__(self, gamedir=Path.cwd(), data_dir: str | Path | None = None,
                 patch_url: str | None = None):
        """
        Set up the game directory, patch URL and working directories.

        :param gamedir: Directory of the game installation.
        :param data_dir: Base directory for the ``Patch`` and ``Temp`` folders.
            When omitted, the per-user data/cache directories reported by
            ``appdirs`` are used instead.
        :param patch_url: Patch repository url. Falls back to
            ``constants.PATCH_GIT_URL`` when omitted.
        """
        self._gamedir = gamedir
        # Plain-HTTP URLs are upgraded to HTTPS.
        self._patch_url = (patch_url if patch_url else constants.PATCH_GIT_URL).replace('http://', 'https://')
        if not data_dir:
            self._appdirs = appdirs.AppDirs(constants.APP_NAME, constants.APP_AUTHOR)
            self._patch_path = Path(self._appdirs.user_data_dir).joinpath("Patch")
            self._temp_path = Path(self._appdirs.user_cache_dir)
        else:
            if not isinstance(data_dir, Path):
                data_dir = Path(data_dir)
            self._patch_path = data_dir.joinpath("Patch")
            self._temp_path = data_dir.joinpath("Temp")

    @staticmethod
    async def _get(url, **kwargs) -> aiohttp.ClientResponse:
        """
        Perform a GET request and return the response with its body buffered.

        :param url: URL to request.
        :param kwargs: Extra keyword arguments forwarded to ``session.get``.
        :return: The response object; ``await rsp.read()`` yields the body.
        :raises aiohttp.ClientResponseError: If the status code is >= 400.
        """
        async with aiohttp.ClientSession() as session:
            rsp = await session.get(url, **kwargs)
            rsp.raise_for_status()
            # The session, and with it the connection, is closed as soon as
            # this block exits. Read the body now so that callers invoking
            # ``read()`` on the returned response get the cached bytes instead
            # of hitting a closed connection.
            await rsp.read()
            return rsp

    async def _get_git_archive(self, archive_format="tar.gz", branch="master"):
        """
        Get the git archive of the patch repository.

        Two URL layouts are known to this method: the plain
        ``<repo url>/archive/<branch>.<format>`` download and the Gitea API
        (``/api/v1/repos/<owner>/<repo>/archive/<branch>.<format>``).

        :param archive_format: Archive extension to request.
        :param branch: Branch to download.
        :return: Archive file in bytes, or None if the Gitea API request
            failed or the url is not of the form ``<server>/<owner>/<repo>``.
        """
        # NOTE(review): the prefix tested here is an empty string, so this
        # branch is always taken and the Gitea API path below is unreachable.
        # The host this workaround was written for appears to be missing from
        # the file; restore it to re-enable the API path.
        if self._patch_url.startswith(''):
            archive_url = self._patch_url + '/archive/{}.{}'.format(branch, archive_format)
            return await (await self._get(archive_url)).read()

        # Split "<scheme>://<server>/<owner>/<repo>" into its components.
        url_parts = urlsplit(self._patch_url)
        owner_and_repo = url_parts.path.strip('/').split('/')
        if len(owner_and_repo) != 2:
            return None
        git_owner, git_repo = owner_and_repo
        archive_url = '{}://{}/api/v1/repos/{}/{}/archive/{}.{}'.format(
            url_parts.scheme, url_parts.netloc, git_owner, git_repo, branch, archive_format
        )
        try:
            archive = await self._get(archive_url)
        except aiohttp.ClientResponseError:
            # The caller treats a missing archive as "cannot download".
            return None
        return await archive.read()

    async def _download_repo(self):
        """
        Fetch the patch repository into the patch directory.

        Uses ``git clone`` / ``git pull`` when ``git`` is available, else
        downloads and unpacks an archive of the repository.

        :return: None
        :raises RuntimeError: If git exits with a non-zero status or no
            archive could be downloaded.
        """
        if shutil.which("git"):
            has_checkout = self._patch_path.is_dir() and self._patch_path.joinpath(".git").exists()
            if has_checkout:
                process = await asyncio.create_subprocess_exec("git", "pull", cwd=str(self._patch_path))
            else:
                process = await asyncio.create_subprocess_exec(
                    "git", "clone", self._patch_url, str(self._patch_path)
                )
            # create_subprocess_exec only starts the process; wait for it so
            # the repository is actually in place when this method returns.
            if await process.wait() != 0:
                raise RuntimeError("Cannot download patch repository")
            return

        archive = await self._get_git_archive()
        if not archive:
            raise RuntimeError("Cannot download patch repository")
        # The archive is held in memory as bytes, so hand it to tarfile as a
        # file object; "r:*" lets tarfile detect the compression.
        with tarfile.open(fileobj=io.BytesIO(archive), mode="r:*") as tar:
            if hasattr(tarfile, "data_filter"):
                # The archive comes from the network: refuse members that
                # would be written outside the destination directory.
                tar.extractall(self._patch_path, filter="data")
            else:
                tar.extractall(self._patch_path)

    def override_patch_url(self, url) -> None:
        """
        Override the patch url.

        :param url: Patch repository url, the url must be a valid git repository.
        :return: None
        """
        self._patch_url = url

    async def download_patch(self) -> None:
        """
        Download the patch repository into the patch directory.

        If `git` exists, this will clone (or pull) the patch git url.
        Else, this will download an archive of the repository from the patch
        url and extract it. (Not reliable)

        :return: None
        """
        await self._download_repo()

    def apply_patch(self, crash_fix=False) -> None:
        """
        Patch the game (and optionally patch the login door crash fix if specified).

        Not implemented yet: the method currently does nothing.

        :param crash_fix: Whether to patch the login door crash fix or not
        :return: None
        """
        pass

    def revert_patch(self):
        """
        Revert the patch (and revert the login door crash fix if patched).

        Not implemented yet: the method currently does nothing.

        :return: None
        """
        pass