from __future__ import annotations
# PKGBUILD related stuff that lilac uses (excluding APIs)
import os
import time
import subprocess
from typing import Dict, List, Optional, Union
from pathlib import Path
from contextlib import suppress
import pyalpm
from .vendor.myutils import safe_overwrite
from .const import _G, OFFICIAL_REPOS
from .cmd import UNTRUSTED_PREFIX
from .typing import PkgVers
# Module-level caches filled in by load_data().
# Official package name -> integer timestamp read from 'packages.txt'.
_official_packages: Dict[str, int] = {}
# Official group name -> integer timestamp read from 'groups.txt'.
_official_groups: Dict[str, int] = {}
# Package name -> version string, taken from the sync database of our own
# repository (only populated when _G has a 'repo' attribute).
_repo_package_versions: Dict[str, str] = {}
class ConflictWithOfficialError(Exception):
    """Raised when a package clashes with the official repositories.

    Attributes:
        groups: group names declared by the package that are also known
            official groups.
        packages: package names the package ``replaces`` that are also
            known official packages.
    """

    def __init__(self, groups: List[str], packages: List[str]) -> None:
        # Forward to Exception so that ``args`` (and therefore ``str()`` and
        # tracebacks) carry the details however the exception is constructed.
        super().__init__(groups, packages)
        self.groups = groups
        self.packages = packages
class DowngradingError(Exception):
    """Raised when a freshly built package is older than the one in the repo.

    Attributes:
        pkgname: name of the offending package.
        built_version: version string of the package that was just built.
        repo_version: version string currently present in the repository.
    """

    def __init__(
        self, pkgname: str, built_version: str, repo_version: str,
    ) -> None:
        # Forward to Exception so that ``args`` (and therefore ``str()`` and
        # tracebacks) carry the details however the exception is constructed.
        super().__init__(pkgname, built_version, repo_version)
        self.pkgname = pkgname
        self.built_version = built_version
        self.repo_version = repo_version
def _load_timed_dict(
    path: os.PathLike, deadline: int,
) -> Dict[str, int]:
    """Load a ``name timestamp`` file, dropping entries older than *deadline*.

    Each non-blank line holds a name and an integer timestamp separated by
    whitespace. Entries whose timestamp is below *deadline* are discarded.

    Args:
        path: file to read; a missing file yields an empty dict.
        deadline: minimum timestamp (inclusive) for an entry to be kept.

    Returns:
        Mapping of name to timestamp for the entries that are still fresh.

    Raises:
        ValueError: if a non-blank line is malformed (no timestamp, or a
            timestamp that is not an integer).
    """
    data: Dict[str, int] = {}
    with suppress(FileNotFoundError), open(path) as f:
        for line in f:
            # Tolerate blank lines (e.g. a stray trailing newline) instead of
            # failing on tuple unpacking; real garbage still raises.
            if not line.strip():
                continue
            name, t_str = line.split(None, 1)
            t = int(t_str)
            if t >= deadline:
                data[name] = t
    return data
def _save_timed_dict(
    path: os.PathLike, data: Dict[str, int],
) -> None:
    """Write *data* to *path* as one ``name timestamp`` line per entry.

    The text is handed to ``safe_overwrite`` in text mode ('w'); the file
    format is the one read back by ``_load_timed_dict``.
    """
    lines = [f'{name} {stamp}\n' for name, stamp in data.items()]
    safe_overwrite(str(path), ''.join(lines), mode='w')
def update_pacmandb(dbpath: Path, pacman_conf: Optional[str] = None,
                    *, quiet: bool = False) -> None:
    """Refresh the pacman databases under *dbpath*.

    Runs ``fakeroot pacman -Sy`` and then ``fakeroot pacman -Fy`` against
    *dbpath*. Each command is attempted up to three times.

    Args:
        dbpath: directory passed to pacman via ``--dbpath``.
        pacman_conf: optional configuration file passed via ``--config``.
        quiet: when true, pacman's standard output is discarded.

    Raises:
        subprocess.CalledProcessError: if a command still fails on its
            third attempt.
    """
    stdout = None
    if quiet:
        stdout = subprocess.DEVNULL

    for update_arg in ('-Sy', '-Fy'):
        cmd: List[Union[str, Path]] = ['fakeroot', 'pacman', update_arg]
        cmd.extend(['--dbpath', dbpath])
        if pacman_conf is not None:
            cmd.extend(['--config', pacman_conf])

        for _attempt in range(3):
            result = subprocess.run(cmd, stdout=stdout)
            if result.returncode == 0:
                break
        else:
            # All three attempts failed: surface the last failure.
            result.check_returncode()
def update_data(dbpath: Path, pacman_conf: Optional[str],
                *, quiet: bool = False) -> None:
    """Refresh pacman databases and the saved official package/group lists.

    After updating the databases, every package and group found in the
    ``OFFICIAL_REPOS`` sync databases is stamped with the current time and
    stored in ``packages.txt`` / ``groups.txt`` under *dbpath*. Previously
    saved entries are kept unless their timestamp is more than 90 days old.

    Args:
        dbpath: pacman database directory; also where the lists are stored.
        pacman_conf: optional pacman configuration file.
        quiet: forwarded to ``update_pacmandb``.
    """
    update_pacmandb(dbpath, pacman_conf, quiet=quiet)

    now = int(time.time())
    deadline = now - 90 * 86400  # 90 days, in seconds
    packages_file = dbpath / 'packages.txt'
    groups_file = dbpath / 'groups.txt'

    pkgs = _load_timed_dict(packages_file, deadline)
    groups = _load_timed_dict(groups_file, deadline)

    handle = pyalpm.Handle('/', str(dbpath))
    for repo in OFFICIAL_REPOS:
        db = handle.register_syncdb(repo, 0)
        for pkg in db.pkgcache:
            pkgs[pkg.name] = now
        for grp in db.grpcache:
            # the first element of each group entry is its name
            groups[grp[0]] = now

    _save_timed_dict(packages_file, pkgs)
    _save_timed_dict(groups_file, groups)
def load_data(dbpath: Path) -> None:
    """Populate the module-level caches from files under *dbpath*.

    Entries from ``packages.txt`` and ``groups.txt`` that are at most 90
    days old are merged into ``_official_packages`` and ``_official_groups``.
    If ``_G`` has a ``repo`` attribute, ``_repo_package_versions`` is
    replaced with the name -> version mapping of that repository's sync
    database.
    """
    global _repo_package_versions

    deadline = int(time.time()) - 90 * 86400  # 90 days, in seconds
    for cache, filename in (
        (_official_packages, 'packages.txt'),
        (_official_groups, 'groups.txt'),
    ):
        cache.update(_load_timed_dict(dbpath / filename, deadline))

    if not hasattr(_G, 'repo'):
        return

    handle = pyalpm.Handle('/', str(dbpath))
    repo_db = handle.register_syncdb(_G.repo.name, 0)
    versions = {}
    for pkg in repo_db.pkgcache:
        versions[pkg.name] = pkg.version
    _repo_package_versions = versions
def check_srcinfo() -> PkgVers:
    """Validate the package in the current directory against repo data.

    Reads the output of ``get_srcinfo()`` and checks that the package does
    not downgrade a version already in our repository, does not declare a
    group known as official, and does not replace an official package.

    Returns:
        The version parsed from the srcinfo output.

    Raises:
        DowngradingError: a declared pkgname has a newer version in the
            repository than the one being built (checked first).
        ConflictWithOfficialError: an official group or a replaced official
            package was found.
    """
    srcinfo = get_srcinfo().decode('utf-8').splitlines()

    conflicting_groups = []
    conflicting_packages = []
    declared_names = []
    for raw_line in srcinfo:
        entry = raw_line.strip()
        if entry.startswith('groups = '):
            group = entry.split()[-1]
            if group in _official_groups:
                conflicting_groups.append(group)
            continue
        if entry.startswith('replaces = '):
            replaced = entry.split()[-1]
            if replaced in _official_packages:
                conflicting_packages.append(replaced)
            continue
        if entry.startswith('pkgname = '):
            declared_names.append(entry.split()[-1])

    pkgvers = _get_package_version(srcinfo)

    # refuse to replace a package in the repo with an older build
    built_version = str(pkgvers)
    for pkgname in declared_names:
        try:
            repo_version = _repo_package_versions[pkgname]
            is_older = pyalpm.vercmp(built_version, repo_version) < 0
            if is_older:
                raise DowngradingError(pkgname, built_version, repo_version)
        except KeyError:
            # not in the repo yet, so there is nothing to downgrade
            continue

    if conflicting_groups or conflicting_packages:
        raise ConflictWithOfficialError(
            conflicting_groups, conflicting_packages)

    return pkgvers
# .SRCINFO retrieval and parsing
def get_srcinfo() -> bytes:
    """Return the raw output of ``makepkg --printsrcinfo`` for the cwd.

    The command is run with ``UNTRUSTED_PREFIX`` prepended (presumably a
    sandbox wrapper; see ``.cmd``), with the current directory bound to
    ``/tmp/<basename>`` and used as the working directory there.

    Raises:
        subprocess.CalledProcessError: if the command exits non-zero.
    """
    cwd = os.getcwd()
    workdir = f'/tmp/{os.path.basename(cwd)}'
    # makepkg wants the *.install file and write permissions even though it
    # only prints info, so expose the package directory at a /tmp path.
    bind_args = ['--bind', cwd, workdir, '--chdir', workdir]
    command = UNTRUSTED_PREFIX + bind_args + ['makepkg', '--printsrcinfo']  # type: ignore
    return subprocess.check_output(command)
def _get_package_version(srcinfo: List[str]) -> PkgVers:
    """Extract epoch, pkgver and pkgrel from srcinfo lines.

    For each field the first ``<field> = <value>`` line wins; later
    occurrences are ignored. ``epoch`` is optional and stays ``None`` when
    absent, whereas ``pkgver`` and ``pkgrel`` must be present.

    Args:
        srcinfo: lines of ``makepkg --printsrcinfo`` output.

    Returns:
        ``PkgVers(epoch, pkgver, pkgrel)`` built from the values found.
    """
    found: Dict[str, Optional[str]] = {
        'epoch': None,
        'pkgver': None,
        'pkgrel': None,
    }
    for raw_line in srcinfo:
        entry = raw_line.strip()
        for field, current in found.items():
            if not current and entry.startswith(f'{field} = '):
                found[field] = entry.split()[-1]
                break

    epoch = found['epoch']
    pkgver = found['pkgver']
    pkgrel = found['pkgrel']
    assert pkgver is not None
    assert pkgrel is not None
    return PkgVers(epoch, pkgver, pkgrel)