# Source code for lilac2.vendor.myutils

from __future__ import annotations

import os, sys
import re
import datetime
import time
from functools import lru_cache, wraps
import logging
import contextlib
import signal
import hashlib
import base64
import fcntl
from typing import Tuple, Union, Optional, Dict, Any, Generator

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

def safe_overwrite(fname: str, data: Union[bytes, str], *,
                   method: str = 'write', mode: str = 'w', encoding: Optional[str] = None) -> None:
  '''Replace the contents of ``fname`` with ``data`` via a temporary file.

  The data is written to ``fname + '.tmp'``, flushed and fsync'ed, and only
  then moved over ``fname``. If opening or writing fails (disk full etc.),
  ``fname`` keeps its old contents, the temporary file is removed and the
  original exception is re-raised.

  method: name of the file-object method called with ``data``
    (``'write'`` by default; e.g. ``'writelines'`` also works).
  mode, encoding: passed to ``open()`` for the temporary file.
  '''
  # FIXME: directory has no read perm
  # FIXME: symlinks and hard links
  tmpname = fname + '.tmp'
  try:
    # if not using "with", write can fail without exception
    with open(tmpname, mode, encoding=encoding) as f:
      getattr(f, method)(data)
      # see also: https://thunk.org/tytso/blog/2009/03/15/dont-fear-the-fsync/
      f.flush()
      os.fsync(f.fileno())
  except BaseException:
    # do not leave a half-written temporary file behind
    with contextlib.suppress(OSError):
      os.unlink(tmpname)
    raise
  # only reached when the write fully succeeded, so the old data is never
  # replaced by a partial file
  os.replace(tmpname, fname)

UNITS = 'KMGTPEZY'

def filesize(size: int) -> str:
  amt, unit = filesize_ex(size)
  if unit:
    return '%.1f%siB' % (amt, unit)
  else:
    return '%dB' % amt

def filesize_ex(size: int) -> Tuple[Union[float, int], str]:
  left: Union[int, float] = abs(size)
  unit = -1
  n = len(UNITS)
  while left > 1100 and unit < n:
    left = left / 1024
    unit += 1
  if unit == -1:
    return size, ''
  else:
    if size < 0:
      left = -left
    return left, UNITS[unit]

class FileSize(int):
  '''An ``int`` whose ``str()`` is the filesize() text with trailing ``i``/``B`` characters stripped.'''

  def __str__(self) -> str:
    formatted = filesize(self)
    # rstrip removes any run of 'i' and 'B' characters at the end
    return formatted.rstrip('iB')

def parse_filesize(s: str) -> int:
  '''Parse a size string such as ``'1.5K'``, ``'2MiB'`` or ``'300'`` into an int.

  Trailing ``i``/``B`` characters are ignored. A final letter found in UNITS
  multiplies the number by the matching power of 1024; otherwise the whole
  remainder is parsed as a number. Raises ValueError when nothing is left
  after stripping or the numeric part cannot be parsed.
  '''
  number = s.rstrip('iB')
  if number == '':
    raise ValueError(s)

  # find() gives -1 for "not a unit letter", so exponent 0 means "no unit"
  exponent = UNITS.find(number[-1]) + 1
  if exponent == 0:
    return int(float(number))

  scaled = float(number[:-1]) * 1024 ** exponent
  return int(scaled)

def humantime(t: int) -> str:
  '''seconds -> XdXhYmZs

  Zero-valued components are omitted; a duration of zero gives ``'0s'``.
  Negative input is formatted from its absolute value with a leading ``-``.
  '''
  sign = '-' if t < 0 else ''
  remaining = -t if t < 0 else t

  minutes, seconds = divmod(remaining, 60)
  hours, minutes = divmod(minutes, 60)
  days, hours = divmod(hours, 24)

  components = ((days, 'd'), (hours, 'h'), (minutes, 'm'), (seconds, 's'))
  text = ''.join('%d%s' % (amount, suffix) for amount, suffix in components if amount)
  return sign + (text or '0s')

def dehumantime(s: str) -> int:
  '''XdXhYmZs -> seconds

  Every component is optional but they must appear in d, h, m, s order.
  Raises ValueError if ``s`` does not match that pattern.
  '''
  matched = re.match(r'(?:(?P<d>\d+)d)?(?:(?P<h>\d+)h)?(?:(?P<m>\d+)m)?(?:(?P<s>\d+)s)?$', s)
  if matched is None:
    raise ValueError(s)

  # seconds contributed by one unit of each named group
  seconds_per = (('d', 3600 * 24), ('h', 3600), ('m', 60), ('s', 1))
  return sum(int(matched.group(key) or 0) * factor for key, factor in seconds_per)

def _timed_read(file, timeout):
  '''Wait up to ``timeout`` for ``file`` to become readable, then read(1) from it.

  Returns None when select() reports nothing readable within the timeout.
  '''
  from select import select
  readable, _, _ = select([file], [], [], timeout)
  if not readable:
    return None
  return file.read(1)

def getchar(
  prompt: str,
  hidden: bool = False,
  end: str = '\n',
  timeout: Optional[float] = None,
) -> Optional[str]:
  '''Read a single character from stdin after printing ``prompt``.

  prompt: text written to stdout (and flushed) before reading.
  hidden: when stdin is a tty, also turn off terminal echo.
  end: text written to stdout after the read.
  timeout: if not None, the read goes through _timed_read() with this
    timeout, which returns None when nothing becomes readable in time.

  Returns whatever the read produced.
  '''
  import termios
  sys.stdout.write(prompt)
  sys.stdout.flush()
  fd = sys.stdin.fileno()
  ch: Optional[str]

  def _read() -> Optional[str]:
    # blocking read without a timeout, select()-guarded read with one
    ch: Optional[str]
    if timeout is None:
      ch = sys.stdin.read(1)
    else:
      ch = _timed_read(sys.stdin, timeout)
    return ch

  if os.isatty(fd):
    # keep the original settings so they can be restored afterwards
    old = termios.tcgetattr(fd)
    new = termios.tcgetattr(fd)
    # index 3 is the local-mode flags: leave canonical (line-buffered) mode,
    # and additionally disable echo when hidden
    if hidden:
      new[3] = new[3] & ~termios.ICANON & ~termios.ECHO
    else:
      new[3] = new[3] & ~termios.ICANON
    # index 6 is the control-character array: read() returns after 1 byte,
    # with no inter-byte timer
    new[6][termios.VMIN] = 1
    new[6][termios.VTIME] = 0
    try:
      termios.tcsetattr(fd, termios.TCSANOW, new)
      termios.tcsendbreak(fd, 0)
      ch = _read()
    finally:
      # always restore the terminal, even if the read raised
      termios.tcsetattr(fd, termios.TCSAFLUSH, old)
  else:
    # not a terminal (pipe, file, ...): nothing to reconfigure
    ch = _read()

  sys.stdout.write(end)
  return ch

def loadso(fname):
  '''Wrapper around ctypes.CDLL that searches for the file in sys.path.

  Loads the first ``sys.path`` entry joined with ``fname`` that exists;
  raises ImportError if there is none.
  '''
  from ctypes import CDLL

  candidates = (os.path.join(directory, fname) for directory in sys.path)
  found = next((path for path in candidates if os.path.exists(path)), None)
  if found is None:
    raise ImportError('%s not found' % fname)
  return CDLL(found)

def dofile(path):
  '''Execute the Python source in ``path`` and return the globals dict it ran in.'''
  # NOTE: this runs arbitrary code from the file; only use on trusted paths
  namespace = {}
  with open(path) as source_file:
    source = source_file.read()
    exec(source, namespace)
  return namespace

def restart_if_failed(func, max_tries, args=(), kwargs=None, secs=60, sleep=None):
  '''
  re-run when some exception happens, until `max_tries` in `secs`

  Calls ``func(*args, **kwargs)`` and returns its result as soon as a call
  succeeds. Each exception (subclass of Exception) has its traceback printed
  and the call is retried, optionally after sleeping ``sleep`` seconds.
  Retrying stops, and None is returned, once the last ``max_tries`` attempts
  all started within ``secs`` seconds.
  '''
  import traceback
  from collections import deque

  if kwargs is None:
    kwargs = {}

  # start times of the most recent `max_tries` attempts
  dq = deque(maxlen=max_tries)
  while True:
    dq.append(time.time())
    try:
      return func(*args, **kwargs)
    except Exception:
      traceback.print_exc()
      # too many failures in too short a window: give up
      if len(dq) == max_tries and time.time() - dq[0] < secs:
        return None
      if sleep is not None:
        time.sleep(sleep)

def daterange(start, stop=None, step=datetime.timedelta(days=1)):
  '''Yield ``start``, ``start + step``, ... while the value is before ``stop``.

  stop: exclusive upper bound; defaults to today's date, looked up when
    iteration starts rather than once at import time.
  step: increment added after each yielded value (one day by default).
  '''
  if stop is None:
    stop = datetime.date.today()
  d = start
  while d < stop:
    yield d
    d += step

@lru_cache()
def findfont(fontname):
  '''Return the font file path that ``fc-match -v`` reports for ``fontname``.

  The path is the text between the first pair of double quotes on the first
  output line starting with ``file:``; None is returned when there is no
  such line. Results are memoized per ``fontname``.
  '''
  from subprocess import check_output
  report = check_output(['fc-match', '-v', fontname]).decode()
  quoted_paths = (
    line.split('"', 2)[1]
    for line in report.split('\n')
    if line.lstrip().startswith('file:')
  )
  return next(quoted_paths, None)

def debugfunc(logger=logging, *, _id=[0]):
  '''Decorator factory that logs every call and return value at debug level.

  Each call gets a number taken from ``_id[0]``, which is then incremented.
  The default list is deliberately shared, so numbering is global across
  all functions decorated without an explicit ``_id``.
  '''
  def decorate(func):
    @wraps(func)
    def traced(*args, **kwargs):
      call_no = _id[0]
      _id[0] = call_no + 1
      logger.debug('[func %d] %s(%r, %r)', call_no, func.__name__, args, kwargs)
      result = func(*args, **kwargs)
      logger.debug('[func %d] return: %r', call_no, result)
      return result
    return traced
  return decorate

@contextlib.contextmanager
def execution_timeout(timeout: float) -> Generator[None, None, None]:
  '''Context manager that raises TimeoutError in the body when SIGALRM fires.

  Arms ITIMER_REAL for ``timeout`` seconds and installs a SIGALRM handler
  that raises TimeoutError. On exit the previous SIGALRM handler is put
  back, and a previously armed timer is re-armed with its remaining delay
  reduced by the time spent inside the block.
  '''
  def timed_out(signum, sigframe):
    raise TimeoutError

  # setitimer returns the previously configured (delay, interval)
  delay, interval = signal.setitimer(signal.ITIMER_REAL, timeout, 0)
  old_hdl = signal.signal(signal.SIGALRM, timed_out)
  now = time.time()
  try:
    yield
  finally:
    # inner timeout must be smaller, or the timer event will be delayed
    if delay:
      elapsed = time.time() - now
      # a delay of 0 would disarm the timer, so keep a tiny positive value
      # to make an already-expired outer timer still fire
      delay = max(delay - elapsed, 0.000001)
    else:
      delay = 0
    signal.setitimer(signal.ITIMER_REAL, delay, interval)
    signal.signal(signal.SIGALRM, old_hdl)

def find_executables(name, path=None):
  '''find all directory entries called ``name`` in path

  path: a list of directories, an ``os.pathsep``-separated string, or None
    to use the ``PATH`` environment variable. Entries that are not
    directories are skipped.

  Returns the joined paths in search order. Matching is by file name only.
  '''
  if isinstance(path, str):
    directories = path.split(os.pathsep)
  elif path is None:
    directories = os.environ['PATH'].split(os.pathsep)
  else:
    directories = path
  directories = [d for d in directories if os.path.isdir(d)]

  found = []
  for directory in directories:
    for entry in os.listdir(directory):
      if entry == name:
        found.append(os.path.join(directory, entry))
  return found

# The following three are learnt from makepkg
def user_choose(prompt, timeout=None):
  '''Show ``prompt`` with a colored ``::`` prefix and return the single character read by getchar().'''
  # XXX: hard-coded term characters are ok?
  decorated = '\x1b[1;34m::\x1b[1;37m %s\x1b[0m ' % prompt
  answer = getchar(decorated, timeout=timeout)
  return answer

def msg(msg):
  '''Print ``msg`` after a green ``==>`` marker, in bold white.'''
  # XXX: hard-coded term characters are ok?
  line = '\x1b[1;32m==>\x1b[1;37m %s\x1b[0m' % msg
  print(line)

def msg2(msg):
  '''Print ``msg`` after an indented blue ``->`` marker, in bold white.'''
  # XXX: hard-coded term characters are ok?
  line = '\x1b[1;34m  ->\x1b[1;37m %s\x1b[0m' % msg
  print(line)

def is_internal_ip(ip):
  '''Return True if ``ip`` is a loopback, private, reserved or link-local address.

  ``ip`` is parsed with ``ipaddress.ip_address``, which raises ValueError
  for input it cannot parse.
  '''
  import ipaddress
  address = ipaddress.ip_address(ip)
  checks = (
    address.is_loopback,
    address.is_private,
    address.is_reserved,
    address.is_link_local,
  )
  return any(checks)

@contextlib.contextmanager
def at_dir(d: os.PathLike) -> Generator[None, None, None]:
  '''Context manager that runs its body with ``d`` as the working directory.

  The previous working directory is restored on exit, including when the
  body raises.
  '''
  old_dir = os.getcwd()
  os.chdir(d)
  try:
    yield
  finally:
    os.chdir(old_dir)
def firstExistentPath(paths):
  '''Return the first entry of ``paths`` that exists, or None if none does.'''
  for p in paths:
    if os.path.exists(p):
      return p
  return None

def md5sum_of_file(file):
  '''Return the hex MD5 digest of the file at path ``file``, read in 81920-byte chunks.'''
  with open(file, 'rb') as f:
    m = hashlib.md5()
    while True:
      d = f.read(81920)
      if not d:
        break
      m.update(d)
  return m.hexdigest()

def md5(s, encoding='utf-8'):
  '''Return the hex MD5 digest of string ``s`` encoded with ``encoding``.'''
  m = hashlib.md5()
  m.update(s.encode(encoding))
  return m.hexdigest()

def base64_encode(s):
  '''Base64-encode ``s`` (a str is encoded to bytes first) and return ASCII text.'''
  if isinstance(s, str):
    s = s.encode()
  return base64.b64encode(s).decode('ascii')

def lock_file(path: os.PathLike) -> None:
  '''Take an exclusive flock on ``path``, creating the file if needed.

  A non-blocking attempt is made first; if the lock is held elsewhere a
  warning is logged and the call blocks until it is released. The file
  descriptor is neither closed nor returned, so the lock stays held for as
  long as that descriptor remains open.
  '''
  lock = os.open(path, os.O_WRONLY | os.O_CREAT, 0o600)
  try:
    fcntl.flock(lock, fcntl.LOCK_EX|fcntl.LOCK_NB)
  except BlockingIOError:
    logger.warning('Waiting for lock to release...')
    fcntl.flock(lock, fcntl.LOCK_EX)

@contextlib.contextmanager
def file_lock(file: os.PathLike) -> Generator[None, None, None]:
  '''Context manager holding an exclusive flock on ``file`` for the body.

  The file is created if missing; closing the descriptor on exit releases
  the lock.
  '''
  lock = os.open(file, os.O_WRONLY | os.O_CREAT, 0o600)
  try:
    fcntl.flock(lock, fcntl.LOCK_EX)
    yield
  finally:
    os.close(lock)

def dict_bytes_to_str(d: Dict[Any, Any]) -> Dict[Any, Any]:
  '''Return a copy of ``d`` with bytes keys and values decoded to str.

  Nested dicts are converted recursively. In list values the bytes items
  are decoded and other items are kept as they are. Anything that fails
  to decode is left unchanged; for a list that means the whole list stays
  as it was.
  '''
  ret = {}
  for k, v in d.items():
    if isinstance(k, bytes):
      with contextlib.suppress(UnicodeDecodeError):
        k = k.decode()
    if isinstance(v, bytes):
      with contextlib.suppress(UnicodeDecodeError):
        v = v.decode()
    elif isinstance(v, dict):
      v = dict_bytes_to_str(v)
    elif isinstance(v, list):
      with contextlib.suppress(UnicodeDecodeError):
        # only bytes have .decode(); other items must not raise AttributeError
        v = [x.decode() if isinstance(x, bytes) else x for x in v]
    ret[k] = v
  return ret

def xsel(input=None):
  '''Read from or write to the clipboard through the external ``uniclip`` command.

  With no argument, returns the output of ``uniclip``. Otherwise feeds
  ``input`` (a str) to ``uniclip -i`` and returns its exit status.
  '''
  import subprocess
  if input is None:
    return subprocess.getoutput('uniclip')
  else:
    p = subprocess.Popen(['uniclip', '-i'], stdin=subprocess.PIPE)
    p.communicate(input.encode())
    return p.wait()