Verified Commit 3e7f0b7b authored by Karel Koci's avatar Karel Koci 🤘
Browse files

svupdater: fix issues reported by mypy

These are mostly type hints related fixes.
parent 2a41b966
......@@ -22,10 +22,12 @@ def updater_supervised() -> bool:
return _pid_locked()
def run(wait_for_network: bool = False, ensure_run: bool = False, timeout: int = const.PKGUPDATE_TIMEOUT,
def run(wait_for_network: typing.Union[bool, int] = False, ensure_run: bool = False, timeout:
int = const.PKGUPDATE_TIMEOUT,
timeout_kill: int = const.PKGUPDATE_TIMEOUT_KILL,
hooklist: typing.Union[None, typing.Iterable[str]] = None):
"""Run updater.
This call will spawn daemon process and returns. But be aware that at first it checks if some other supervisor is
not running and it takes file lock because of that. If someone messed up that lock then it won't return immediately.
Calling this with timeout is advised for time sensitive applications.
......@@ -33,15 +35,14 @@ def run(wait_for_network: bool = False, ensure_run: bool = False, timeout: int =
You can pass hooks (single line shell scripts) to be run after updater.
"""
if not autorun.enabled():
raise UpdaterDisabledError(
"Can't run. Updater is configured to be disabled.")
raise UpdaterDisabledError("Can't run. Updater is configured to be disabled.")
# Fork to daemon
if _daemonize():
return
# Wait for network if configured
if wait_for_network:
if type(wait_for_network == bool):
wait_for_network = const.PING_TIMEOUT
if isinstance(wait_for_network, bool):
wait_for_network = const.TURRIS_REPO_HEALTH_TIMEOUT
_wait_for_network(wait_for_network)
# And run updater
_run(
......
......@@ -20,7 +20,7 @@ class Supervisor:
def __init__(self, verbose: bool = False):
self.verbose = verbose
self.kill_timeout = 0
self.process = None
self.process: typing.Optional[subprocess.Popen] = None
self.trace = ""
self.trace_lock = Lock()
self._devnull = open(os.devnull, "w")
......
......@@ -3,13 +3,15 @@
This is pretty fragile and info provided by this module should not be considered as strictly correct. There is
possibility that some of the logs might fail to match or that logging to syslog is disabled or even syslog being broken.
"""
import abc
import datetime
import io
import os
import re
import sys
import abc
import time
import typing
import datetime
from . import const
from . import opkg_lock as _opkg_lock
......@@ -158,6 +160,7 @@ class LogReader:
for msg in lr:
print(msg)
"""
_log_line = r"([^ ]+ +[^ ]+ [^ ]+) [^ ]+ updater\[[\d]+\]: [^ :]+:[\d]+ \([^)]*\): (.*)"
_lines_repre = {
r"Target Turris OS: (.*)": MsgTargetTurrisOS,
......@@ -181,12 +184,13 @@ class LogReader:
"""
self.log = log
self.blocking = blocking
self._file = None
self._file: typing.Optional[io.BytesIO] = None
self._re_log_line = re.compile(self._log_line)
self._re_lines = {re.compile(regexp): repre for regexp, repre in self._lines_repre.items()}
def open(self):
"""Open log for reading.
You have to call this before any other method.
"""
if self._file is not None:
......@@ -194,21 +198,20 @@ class LogReader:
self._file = open(self.log, "rb")
def close(self):
"""Close log file.
"""
"""Close log file."""
self._file.close()
self._file = None
@staticmethod
def _parse_date(strdate):
date = datetime.datetime.strptime(strdate, '%b %d %H:%M:%S')
date = datetime.datetime.strptime(strdate, "%b %d %H:%M:%S")
# We do not have year in syslog. We have to somehow identify it. Reasonable expectation is current year but we
# have to cover new year so we have to check month and possibly decrease year by one.
now = datetime.datetime.utcnow()
return date.replace(year=now.year if now.month >= date.month else (now.year - 1))
def _clasify_line(self, line):
line = line.decode(sys.getdefaultencoding(), 'ignore').rstrip('\n')
line = line.decode(sys.getdefaultencoding(), "ignore").rstrip("\n")
match = self._re_log_line.fullmatch(line)
if match is not None:
date = self._parse_date(match.group(1))
......@@ -219,11 +222,13 @@ class LogReader:
return self._re_lines[re_line](date, lmatch)
return None
def seek_latest(self, msgtype: Msg = MsgTargetTurrisOS) -> typing.Optional[Msg]:
"""Seeks latest message of given type. In default that is MsgTargetTurrisOS as that is in general the first
message printed by updater.
It returns that message if it was located or None of not.
def seek_latest(self, msgtype: typing.Type[Msg] = MsgTargetTurrisOS) -> typing.Optional[Msg]:
"""Seeks latest message of given type.
In default that is MsgTargetTurrisOS as that is in general the first message printed by updater.
It returns that message if it was located or None if not.
"""
assert self._file is not None, "Log has to be openned first!"
# We do not want to use blocking so disable it for now
blocking = self.blocking
self.blocking = False
......@@ -240,8 +245,9 @@ class LogReader:
self.blocking = blocking
return latest_msg
def seek_after(self, date: datetime.datetime = datetime.datetime.utcnow(),
msgtype: Msg = MsgTargetTurrisOS) -> typing.Optional[Msg]:
def seek_after(
self, date: datetime.datetime = datetime.datetime.utcnow(), msgtype: typing.Type[Msg] = MsgTargetTurrisOS
) -> typing.Optional[Msg]:
"""Seeks message of given type that happens right after given date. In default message type this looks for is
MsgTargetTurrisOS and date is utcnow.
Note that date has to be in UTC as syslog logs in UTC.
......@@ -262,6 +268,7 @@ class LogReader:
It returns parsed updater's line or None if end of the log was reached.
"""
assert self._file is not None, "Log has to be openned first!"
while True:
line = self._file.readline()
if not line:
......@@ -290,8 +297,9 @@ class LogReader:
return res
def latest_message(msgtype: Msg):
def latest_message(msgtype: typing.Type[Msg]) -> typing.Optional[datetime.datetime]:
"""This function looks for latest message of given type in log and returns its date.
This is unreliable. We might not have that in syslog as reboot happened or someone removed log. This also does not
support and so not reads gzipped logs.
......@@ -307,7 +315,8 @@ def latest_message(msgtype: Msg):
def last_check() -> typing.Optional[datetime.datetime]:
"""This is simple function that checks for date of last updater's execution.
"""Simple function that checks for date of last updater's execution.
This function uses latest_message so same note about unreliable result applies here as well. Not only that but this
is also based on unreliable MsgTargetTurrisOS.
......@@ -317,7 +326,8 @@ def last_check() -> typing.Optional[datetime.datetime]:
def last_run() -> typing.Optional[datetime.datetime]:
"""This is simple function that checks for date of last updater's execution.
"""Simple function that checks for date of the last updater's execution.
This function uses latest_message so same note about unreliable result applies here as well.
It returns datetime or None if time of last update is unknown.
......
......@@ -60,7 +60,7 @@ class Package:
self._fields[name] = value
def __init__(self, rootdir, block: typing.Iterable[str]):
self._fields = dict()
self._fields: dict[str, str] = {}
# TODO this does not support description we just skip space indented lines
for line in block:
name, value = self._parse_field(line)
......
......@@ -29,9 +29,13 @@ def turris_repo_health(address: str = const.TURRIS_REPO_HEALTH_URL) -> bool:
return res.returncode == 0 and res.stdout == "ok\n"
def wait_for_network(max_stall: int) -> typing.Optional[bool]:
def wait_for_network(max_stall: int) -> bool:
"""Wait for ability to access the repo.turris.cz.
The max_stall can be any number of seconds but too small numbers (few seconds) should not be used as it might not be
enough time to actually perform even a single network connection test. For zero and negative numbers this function
behaves the same way as call to the turris_repo_health.
Returns True if connection is successful and False if wait timed out.
"""
......@@ -49,8 +53,8 @@ def wait_for_network(max_stall: int) -> typing.Optional[bool]:
time.sleep(sleep_time)
delay *= 2
if max_stall is None:
return None # None means no stall
if max_stall <= 0:
return turris_repo_health()
process = multiprocessing.Process(target=network_test)
process.start()
process.join(max_stall)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment