worthless-launcher/worthless/linux.py
tretrauit a5659f7ff3
refactor: convert all task-intensive functions to async.
chore: rename gui.py to cli.py
fix: internal downloader can resume download now.
feat: add verify_game, verify_from_pkg_version, clear_cache to installer.py.
feat: add clear_cache to patcher.py.
fix: linux now checks for pkexec before executing it.
fix: add get_name to voicepack.py, latest.py, diff.py to get the name from the path (since the developer didn't set a name for these files in the sdk url)
chore: remove deprecation message in read_version_from_config in installer.py
misc: use the chunk size from self._download_chunk instead of hardcoding 8192.
fix: is_telemetry_blocked will only wait 15s for a connection.
chore: move appdirs to constants.py
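
The 15-second cap mentioned for is_telemetry_blocked can be pictured with asyncio.wait_for. This is only a sketch under assumptions, not the project's code: the helper name `finishes_within` is hypothetical, and the real function may apply its timeout through its HTTP client instead.

```python
import asyncio


async def finishes_within(awaitable, timeout: float = 15.0) -> bool:
    """Return True if `awaitable` completes within `timeout` seconds.

    Hypothetical helper; not part of worthless-launcher.
    """
    try:
        # wait_for cancels the awaitable and raises TimeoutError on expiry.
        await asyncio.wait_for(awaitable, timeout=timeout)
    except asyncio.TimeoutError:
        return False
    return True
```

A caller could pass any connection attempt as `awaitable` and treat a `False` result as "no answer in time".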

This commit refactors almost all functions to be compatible with asyncio, and restructures the CLI to call asyncio.run on the main function instead of invoking it ad hoc.
It also prioritizes the use of asyncio.gather, which sometimes makes tasks faster.
2022-06-25 01:13:47 +07:00
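
As a rough illustration of why asyncio.gather can make tasks faster, the sketch below runs two stand-in coroutines concurrently. The names `fake_task` and `main` and the delays are made up for the example; they do not come from the repository.

```python
import asyncio


async def fake_task(name: str, delay: float) -> str:
    """Stand-in for an I/O-bound job such as a download or a file check."""
    await asyncio.sleep(delay)
    return name


async def main() -> list[str]:
    # gather runs both coroutines concurrently, so the total wait is roughly
    # the longest delay rather than the sum. Results keep argument order,
    # not completion order.
    return await asyncio.gather(
        fake_task("verify", 0.02),
        fake_task("download", 0.01),
    )


if __name__ == "__main__":
    print(asyncio.run(main()))
```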

41 lines
1.4 KiB
Python

import asyncio
from pathlib import Path

from aiopath import AsyncPath


class LinuxUtils:
    """Utilities for Linux-specific tasks."""

    def __init__(self):
        pass

    @staticmethod
    async def _exec_command(args):
        """Execute a shell command that uses pkexec (friendly gui)."""
        if not await AsyncPath("/usr/bin/pkexec").exists():
            raise FileNotFoundError("pkexec not found.")
        rsp = await asyncio.create_subprocess_shell(args)
        await rsp.wait()
        # Exit codes as documented in pkexec(1).
        match rsp.returncode:
            case 127:
                # Not authorized, authentication failed, or another error occurred.
                raise OSError("Authentication failed.")
            case 126:
                # The user dismissed the authentication dialog.
                raise RuntimeError("User cancelled the authentication.")
        return rsp

    async def write_text_to_file(self, text, file_path: str | Path | AsyncPath):
        """Write text to a file using pkexec (friendly gui)."""
        if isinstance(file_path, Path | AsyncPath):
            file_path = str(file_path)
        # NOTE: text and file_path are interpolated into the shell command unquoted.
        await self._exec_command('echo -e "{}" | pkexec tee {}'.format(text, file_path))

    async def append_text_to_file(self, text, file_path: str | Path | AsyncPath):
        """Append text to a file using pkexec (friendly gui)."""
        if isinstance(file_path, Path | AsyncPath):
            file_path = str(file_path)
        # NOTE: text and file_path are interpolated into the shell command unquoted.
        await self._exec_command('echo -e "{}" | pkexec tee -a {}'.format(text, file_path))
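
Because both methods format `text` and `file_path` straight into a shell string, quotes or spaces in either value can break the command. One possible hardening is sketched below, using `shlex.quote` from the standard library. The helper `build_tee_command` is hypothetical and not part of this file. It also swaps `echo -e` for `printf '%s\n'`, so backslash escapes in `text` are no longer expanded, which is a behavior change.

```python
import shlex


def build_tee_command(text: str, file_path: str, append: bool = False) -> str:
    """Build a `printf ... | pkexec tee` pipeline with shell-quoted arguments.

    Hypothetical helper; not part of worthless-launcher.
    """
    tee = "pkexec tee -a" if append else "pkexec tee"
    # printf '%s\n' prints the text literally; shlex.quote keeps each value
    # as a single shell word even if it contains spaces or quotes.
    return "printf '%s\\n' {} | {} {}".format(
        shlex.quote(text), tee, shlex.quote(file_path)
    )
```

The returned string could then be handed to `_exec_command` in place of the formatted `echo -e` pipeline.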