Skip to content
Snippets Groups Projects
Verified Commit dafb01e1 authored by Karel Koci's avatar Karel Koci :metal:
Browse files

nsfarm/cli: use only one thread for FDLogging

The FDLogging uses poll to propagate data from one feed to the other.
This is not limited to only two file descriptors but technically any
number of them. Thus it is way more efficient to simply add new feeds to
existing thread over spawning the new one.
parent 2e6bb481
Branches
Tags
1 merge request!38Draft: Fix and improve CLI handling and few other fixes
......@@ -302,27 +302,35 @@ class LineBytesAggregate:
class FDLogging:
"""Live logging with data passtrough.
This is stream logging that logs communication comming from and to file descript trough socket. This intended use is
This is stream logging that logs communication comming from and to file descript trough socket. The intended use is
for direct output to be visible live in logs.
This has one primary limitation and that is output only in lines. Log is created only when new line character is
located not before that.
located not before that. The reason for this is readibility of logs.
"""
_thread: typing.Optional[threading.Thread] = None
_lock: threading.Lock = threading.Lock()
_poll: select.poll = select.poll()
_output: dict[int, int] = {}
_propagation: dict[int, bool] = {}
_aggregate: dict[int, LineBytesAggregate] = {}
def __init__(self, fileno: int, logger: logging.Logger, in_level=logging.INFO, out_level=logging.DEBUG):
self._logger = logger
self._in_level = in_level
self._out_level = out_level
self._fileno = fileno
self._fileno = fileno if isinstance(fileno, int) else fileno.fileno()
self._our_sock, self._user_sock = socket.socketpair()
self._propagate = True
self._orig_filestatus = fcntl.fcntl(self._fileno, fcntl.F_GETFL)
fcntl.fcntl(self._fileno, fcntl.F_SETFL, self._orig_filestatus | os.O_NONBLOCK)
self._our_sock.setblocking(False)
self._thread = threading.Thread(target=self._thread_func, daemon=True)
self._thread.start()
self._add_socket(
self._fileno,
LineBytesAggregate(lambda line: self._log_line("> ", line, in_level)),
self._our_sock.fileno(),
LineBytesAggregate(lambda line: self._log_line("< ", line, out_level)),
)
@property
def socket(self):
......@@ -333,15 +341,20 @@ class FDLogging:
"""Configures if input should be propagated to socket or not. Output is still propagated to file but input read
from file is simply logged and dropped.
"""
self._propagate = propagate
self._set_propagation(self._fileno, propagate)
@classmethod
def _set_propagation(cls, fileno: int, propagate: bool):
with cls._lock:
cls._propagation[fileno] = propagate
def close(self):
"""Close socket and stop logging."""
if self._our_sock is None:
return
self._del_socket(self._fileno, self._our_sock.fileno())
self._our_sock.close()
self._our_sock = None
self._thread.join()
fcntl.fcntl(self._fileno, fcntl.F_SETFL, self._orig_filestatus)
def __del__(self):
......@@ -350,28 +363,49 @@ class FDLogging:
def _log_line(self, prefix, line, level):
self._logger.log(level, prefix + repr(line.expandtabs())[2:-1])
def _thread_func(self):
aggregates = {
self._fileno: LineBytesAggregate(lambda line: self._log_line("> ", line, self._in_level)),
self._our_sock.fileno(): LineBytesAggregate(lambda line: self._log_line("< ", line, self._out_level)),
}
output = {
self._fileno: self._our_sock.fileno(),
self._our_sock.fileno(): self._fileno,
}
poll = select.poll()
poll.register(self._fileno, select.POLLIN)
poll.register(self._our_sock.fileno(), select.POLLIN | select.POLLNVAL)
while True:
for poll_event in poll.poll():
@classmethod
def _add_socket(
cls, fileno_in: int, aggregate_in: LineBytesAggregate, fileno_out: int, aggregate_out: LineBytesAggregate
):
with cls._lock:
cls._output.update({fileno_in: fileno_out, fileno_out: fileno_in})
cls._propagation.update({fileno_in: True, fileno_out: True})
cls._aggregate.update({fileno_in: aggregate_in, fileno_out: aggregate_out})
cls._poll.register(fileno_in, select.POLLIN)
cls._poll.register(fileno_out, select.POLLIN | select.POLLNVAL)
if cls._thread is None:
cls._thread = threading.Thread(target=cls._thread_func, daemon=True)
if not cls._thread.is_alive():
cls._thread.start()
@classmethod
def _del_socket(cls, fileno_in: int, fileno_out: int):
with cls._lock:
cls._poll.unregister(fileno_in)
cls._poll.unregister(fileno_out)
del cls._output[fileno_in]
del cls._output[fileno_out]
del cls._propagation[fileno_in]
del cls._propagation[fileno_out]
del cls._aggregate[fileno_in]
del cls._aggregate[fileno_out]
@classmethod
def _thread_func(cls):
while cls._output: # We run until we have some output then we can terminate
for poll_event in cls._poll.poll():
fileno, event = poll_event
if event == select.POLLNVAL:
return
continue
data = os.read(fileno, io.DEFAULT_BUFFER_SIZE)
if fileno != self._fileno or self._propagate:
os.write(output[fileno], data)
aggregates[fileno].add(data)
with cls._lock:
if fileno not in cls._output:
# This covers race condition with _del_socket as it might win lock over us and remove fileno in
# the meantime we were waiting for the lock.
continue
if cls._propagation[fileno]:
os.write(cls._output[fileno], data)
cls._aggregate[fileno].add(data)
class PexpectLogging:
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment