from __future__ import annotations
# PKGBUILD related stuff that lilac uses (excluding APIs)
import os
import time
import subprocess
from typing import Dict, List, Optional, Union
from pathlib import Path
from contextlib import suppress
import logging
import pyalpm
from .vendor.myutils import safe_overwrite
from .const import _G, OFFICIAL_REPOS
from .cmd import UNTRUSTED_PREFIX
from .typing import PkgVers
logger = logging.getLogger(__name__)
# Official package name -> integer timestamp, filled by load_data() from
# the 'packages.txt' file that update_data() maintains.
_official_packages: Dict[str, int] = {}
# Official group name -> integer timestamp, filled by load_data() from
# the 'groups.txt' file that update_data() maintains.
_official_groups: Dict[str, int] = {}
# Package name -> version string as found in the sync database named by
# _G.reponame; rebound by load_data() when _G has a 'reponame' attribute.
_repo_package_versions: Dict[str, str] = {}
class ConflictWithOfficialError(Exception):
    """Signals that a package clashes with official groups or packages.

    Attributes are stored as given: ``groups`` for the conflicting group
    names and ``packages`` for the conflicting package names.
    """

    def __init__(self, groups, packages):
        self.groups, self.packages = groups, packages
class DowngradingError(Exception):
    """Signals that a built package is not newer than the one in the repo.

    Carries the package name together with the version that was built and
    the version already present in the repository.
    """

    def __init__(self, pkgname, built_version, repo_version):
        self.pkgname, self.built_version, self.repo_version = (
            pkgname, built_version, repo_version,
        )
def _load_timed_dict(
path: os.PathLike, deadline: int,
) -> Dict[str, int]:
data = {}
with suppress(FileNotFoundError), open(path) as f:
for line in f:
name, t_str = line.split(None, 1)
t = int(t_str)
if t >= deadline:
data[name] = t
return data
def _save_timed_dict(
    path: os.PathLike, data: Dict[str, int],
) -> None:
    """Write *data* to *path* as one ``name timestamp`` line per entry.

    The file content is handed to ``safe_overwrite`` in text mode.
    """
    lines = [f'{name} {stamp}\n' for name, stamp in data.items()]
    safe_overwrite(str(path), ''.join(lines), mode='w')
def update_pacmandb(
    dbpath: Path, pacman_conf: Optional[str] = None, *,
    quiet: bool = False,
    update_pacfiles: bool = False,
) -> None:
    """Refresh the pacman sync and files databases stored under *dbpath*.

    Runs ``pacman -Sy`` and then ``pacman -Fy`` under ``fakeroot``; each
    command is attempted up to three times before its failure is raised.

    Args:
        dbpath: database directory passed to ``--dbpath``.
        pacman_conf: optional config file passed to ``--config``.
        quiet: send the commands' standard output to ``DEVNULL``.
        update_pacfiles: afterwards also run ``pacfiles --update-db``.

    Raises:
        subprocess.CalledProcessError: if a pacman command fails on all
            attempts, or if ``pacfiles`` exits with a non-zero status.
    """
    sink = subprocess.DEVNULL if quiet else None
    max_attempts = 3

    config_args: List[Union[str, Path]] = []
    if pacman_conf is not None:
        config_args = ['--config', pacman_conf]

    for sync_flag in ('-Sy', '-Fy'):
        pacman_cmd: List[Union[str, Path]] = [
            'fakeroot', 'pacman', sync_flag, '--dbpath', dbpath,
            '--disable-sandbox',  # unusable without root
            *config_args,
        ]
        for attempt in range(1, max_attempts + 1):
            result = subprocess.run(pacman_cmd, stdout=sink)
            if result.returncode == 0:
                break
            if attempt == max_attempts:
                # out of retries: raise for the last failed run
                result.check_returncode()

    if not update_pacfiles:
        return
    subprocess.check_call(
        ['pacfiles', '--dbpath', dbpath, '--update-db'], stdout=sink,
    )
def update_data(
    pacman_conf: Optional[str], *,
    quiet: bool = False, update_pacfiles: bool = False,
) -> None:
    """Refresh the pacman databases and the official name records.

    After updating the databases in ``PACMAN_DB_DIR``, every package and
    group found in the ``OFFICIAL_REPOS`` sync databases is stamped with
    the current time in ``packages.txt`` / ``groups.txt``.  Previously
    recorded names older than 90 days are dropped when the files are
    reloaded.

    Args:
        pacman_conf: optional pacman config file, forwarded to
            ``update_pacmandb``.
        quiet: forwarded to ``update_pacmandb``.
        update_pacfiles: forwarded to ``update_pacmandb``.
    """
    from .const import PACMAN_DB_DIR

    update_pacmandb(
        PACMAN_DB_DIR, pacman_conf,
        quiet=quiet, update_pacfiles=update_pacfiles,
    )

    timestamp = int(time.time())
    cutoff = timestamp - 90 * 86400
    packages_file = PACMAN_DB_DIR / 'packages.txt'
    groups_file = PACMAN_DB_DIR / 'groups.txt'
    known_packages = _load_timed_dict(packages_file, cutoff)
    known_groups = _load_timed_dict(groups_file, cutoff)

    handle = pyalpm.Handle('/', str(PACMAN_DB_DIR))
    for repo_name in OFFICIAL_REPOS:
        syncdb = handle.register_syncdb(repo_name, 0)
        for package in syncdb.pkgcache:
            known_packages[package.name] = timestamp
        for group in syncdb.grpcache:
            # the group name is the first item of each grpcache entry
            known_groups[group[0]] = timestamp

    _save_timed_dict(packages_file, known_packages)
    _save_timed_dict(groups_file, known_groups)
def load_data(dbpath: Path) -> None:
    """Populate the module-level lookup tables from *dbpath*.

    Merges the non-expired (newer than 90 days) entries of
    ``packages.txt`` and ``groups.txt`` into ``_official_packages`` and
    ``_official_groups``.  When ``_G`` has a ``reponame`` attribute, the
    versions of all packages in that sync database replace
    ``_repo_package_versions``.
    """
    global _repo_package_versions

    cutoff = int(time.time()) - 90 * 86400
    for filename, table in (
        ('packages.txt', _official_packages),
        ('groups.txt', _official_groups),
    ):
        table.update(_load_timed_dict(dbpath / filename, cutoff))

    if not hasattr(_G, 'reponame'):
        return

    handle = pyalpm.Handle('/', str(dbpath))
    syncdb = handle.register_syncdb(_G.reponame, 0)
    versions = {}
    for package in syncdb.pkgcache:
        versions[package.name] = package.version
    _repo_package_versions = versions
def check_srcinfo() -> PkgVers:
    """Validate the .SRCINFO of the package in the current directory.

    Returns:
        The package version parsed from the srcinfo output.

    Raises:
        DowngradingError: if a built package name already exists in the
            repo with a version that is not older than the built one.
        ConflictWithOfficialError: if a ``groups`` entry names an official
            group or a ``replaces`` entry names an official package.
    """
    srcinfo_lines = get_srcinfo().decode('utf-8').splitlines()

    conflicting_groups = []
    conflicting_packages = []
    built_names = []
    for raw in srcinfo_lines:
        entry = raw.strip()
        if entry.startswith('groups = '):
            group = entry.split()[-1]
            if group in _official_groups:
                conflicting_groups.append(group)
            continue
        if entry.startswith('replaces = '):
            replaced = entry.split()[-1]
            if replaced in _official_packages:
                conflicting_packages.append(replaced)
            continue
        if entry.startswith('pkgname = '):
            built_names.append(entry.split()[-1])

    pkgvers = _get_package_version(srcinfo_lines)

    # make sure the newly built package is strictly newer than whatever
    # the repo already holds under the same name
    built_version = str(pkgvers)
    for name in built_names:
        try:
            repo_version = _repo_package_versions[name]
        except KeyError:
            # the newly built package is not in repos yet - fine
            logger.debug('new package: %s %s', name, built_version)
            continue
        logger.debug('comparing versions: built=%s, repo=%s',
                     built_version, repo_version)
        if pyalpm.vercmp(built_version, repo_version) <= 0:
            raise DowngradingError(name, built_version, repo_version)

    if conflicting_groups or conflicting_packages:
        raise ConflictWithOfficialError(
            conflicting_groups, conflicting_packages)

    return pkgvers
# [docs]
def get_srcinfo() -> bytes:
    """Return the raw output of ``makepkg --printsrcinfo`` for the cwd.

    The command is run behind ``UNTRUSTED_PREFIX`` with the current
    directory bound to, and used as working directory at,
    ``/tmp/<basename of cwd>``.

    Raises:
        subprocess.CalledProcessError: if the command exits non-zero.
    """
    cwd = os.getcwd()
    basename = os.path.basename(cwd)
    mount_point = f'/tmp/{basename}'
    # makepkg wants *.install file and write permissions to simply print out info :-(
    bind_args = ['--bind', cwd, mount_point, '--chdir', mount_point]
    return subprocess.check_output(
        UNTRUSTED_PREFIX + bind_args + ['makepkg', '--printsrcinfo'],  # type: ignore
    )
def _get_package_version(srcinfo: List[str]) -> PkgVers:
    """Build a ``PkgVers`` from the first version fields in *srcinfo*.

    Only the first ``epoch``, ``pkgver`` and ``pkgrel`` entries are used;
    the value is the last whitespace-separated token of the line.
    ``epoch`` stays ``None`` when absent, whereas ``pkgver`` and
    ``pkgrel`` are asserted to be present.
    """
    prefixes = (
        ('epoch', 'epoch = '),
        ('pkgver', 'pkgver = '),
        ('pkgrel', 'pkgrel = '),
    )
    first_seen: Dict[str, str] = {}
    for raw in srcinfo:
        entry = raw.strip()
        for field, prefix in prefixes:
            if field not in first_seen and entry.startswith(prefix):
                first_seen[field] = entry.split()[-1]
                break

    epoch = first_seen.get('epoch')
    pkgver = first_seen.get('pkgver')
    pkgrel = first_seen.get('pkgrel')
    assert pkgver is not None
    assert pkgrel is not None
    return PkgVers(epoch, pkgver, pkgrel)
if __name__ == '__main__':
    import sys

    # A single command-line argument is taken as the pacman config file;
    # with any other argument count no config is passed.
    cli_args = sys.argv[1:]
    conf = cli_args[0] if len(cli_args) == 1 else None
    update_data(conf)