diff --git a/src/borg/archiver/__init__.py b/src/borg/archiver/__init__.py index 69f14d5ccb..f9cad61798 100644 --- a/src/borg/archiver/__init__.py +++ b/src/borg/archiver/__init__.py @@ -482,8 +482,9 @@ def run(self, args): func = get_func(args) # do not use loggers before this! is_serve = func == self.do_serve - self.log_json = args.log_json or is_serve - setup_logging(level=args.log_level, json=self.log_json) + self.log_json = args.log_json and not is_serve + func_name = getattr(func, "__name__", "none") + setup_logging(level=args.log_level, is_serve=is_serve, log_json=self.log_json, func=func_name) args.progress |= is_serve self._setup_implied_logging(vars(args)) self._setup_topic_debugging(args) diff --git a/src/borg/helpers/__init__.py b/src/borg/helpers/__init__.py index e42d784151..1a745c49a2 100644 --- a/src/borg/helpers/__init__.py +++ b/src/borg/helpers/__init__.py @@ -33,7 +33,7 @@ from .process import daemonize, daemonizing from .process import signal_handler, raising_signal_handler, sig_int, ignore_sigint, SigHup, SigTerm from .process import popen_with_error_handling, is_terminal, prepare_subprocess_env, create_filter_process -from .progress import ProgressIndicatorPercent, ProgressIndicatorEndless, ProgressIndicatorMessage +from .progress import ProgressIndicatorPercent, ProgressIndicatorMessage from .time import parse_timestamp, timestamp, safe_timestamp, safe_s, safe_ns, MAX_S, SUPPORT_32BIT_PLATFORMS from .time import format_time, format_timedelta, OutputTimestamp, archive_ts_now from .yes_no import yes, TRUISH, FALSISH, DEFAULTISH diff --git a/src/borg/helpers/progress.py b/src/borg/helpers/progress.py index 782c7fb340..8d6f8caff0 100644 --- a/src/borg/helpers/progress.py +++ b/src/borg/helpers/progress.py @@ -1,28 +1,15 @@ import logging import json -import sys import time -from shutil import get_terminal_size from ..logger import create_logger logger = create_logger() -from .parseformat import ellipsis_truncate - - -def justify_to_terminal_size(message): - terminal_space = get_terminal_size(fallback=(-1, -1))[0] - # justify only if we are outputting to a terminal - if terminal_space != -1: - return message.ljust(terminal_space) - return message - class ProgressIndicatorBase: LOGGER = "borg.output.progress" JSON_TYPE: str = None - json = False operation_id_counter = 0 @@ -33,73 +20,27 @@ def operation_id(cls): return cls.operation_id_counter def __init__(self, msgid=None): - self.handler = None self.logger = logging.getLogger(self.LOGGER) self.id = self.operation_id() self.msgid = msgid - # If there are no handlers, set one up explicitly because the - # terminator and propagation needs to be set. If there are, - # they must have been set up by BORG_LOGGING_CONF: skip setup. - if not self.logger.handlers: - self.handler = logging.StreamHandler(stream=sys.stderr) - self.handler.setLevel(logging.INFO) - logger = logging.getLogger("borg") - # Some special attributes on the borg logger, created by setup_logging - # But also be able to work without that - try: - formatter = logger.formatter - terminator = "\n" if logger.json else "\r" - self.json = logger.json - except AttributeError: - terminator = "\r" - else: - self.handler.setFormatter(formatter) - self.handler.terminator = terminator - - self.logger.addHandler(self.handler) - if self.logger.level == logging.NOTSET: - self.logger.setLevel(logging.WARN) - self.logger.propagate = False - - # If --progress is not set then the progress logger level will be WARN - # due to setup_implied_logging (it may be NOTSET with a logging config file, - # but the interactions there are generally unclear), so self.emit becomes - # False, which is correct. - # If --progress is set then the level will be INFO as per setup_implied_logging; - # note that this is always the case for serve processes due to a "args.progress |= is_serve". - # In this case self.emit is True. - self.emit = self.logger.getEffectiveLevel() == logging.INFO - - def __del__(self): - if self.handler is not None: - self.logger.removeHandler(self.handler) - self.handler.close() - - def output_json(self, *, finished=False, **kwargs): - assert self.json - if not self.emit: - return + def make_json(self, *, finished=False, **kwargs): kwargs.update( dict(operation=self.id, msgid=self.msgid, type=self.JSON_TYPE, finished=finished, time=time.time()) ) - print(json.dumps(kwargs), file=sys.stderr, flush=True) + return json.dumps(kwargs) def finish(self): - if self.json: - self.output_json(finished=True) - else: - self.output("") + j = self.make_json(message="", finished=True) + self.logger.info(j) class ProgressIndicatorMessage(ProgressIndicatorBase): JSON_TYPE = "progress_message" def output(self, msg): - if self.json: - self.output_json(message=msg) - else: - self.logger.info(justify_to_terminal_size(msg)) + j = self.make_json(message=msg) + self.logger.info(j) class ProgressIndicatorPercent(ProgressIndicatorBase): @@ -141,58 +82,11 @@ def show(self, current=None, increase=1, info=None): """ pct = self.progress(current, increase) if pct is not None: - # truncate the last argument, if no space is available if info is not None: - if not self.json: - from ..platform import swidth # avoid circular import - - # no need to truncate if we're not outputting to a terminal - terminal_space = get_terminal_size(fallback=(-1, -1))[0] - if terminal_space != -1: - space = terminal_space - swidth(self.msg % tuple([pct] + info[:-1] + [""])) - info[-1] = ellipsis_truncate(info[-1], space) - return self.output(self.msg % tuple([pct] + info), justify=False, info=info) - - return self.output(self.msg % pct) - - def output(self, message, justify=True, info=None): - if self.json: - self.output_json(message=message, current=self.counter, total=self.total, info=info) - else: - if justify: - message = justify_to_terminal_size(message) - self.logger.info(message) - - -class ProgressIndicatorEndless: - def __init__(self, step=10, file=None): - """ - Progress indicator (long row of dots) - - :param step: every Nth call, call the func - :param file: output file, default: sys.stderr - """ - self.counter = 0 # call counter - self.triggered = 0 # increases 1 per trigger event - self.step = step # trigger every calls - if file is None: - file = sys.stderr - self.file = file - - def progress(self): - self.counter += 1 - trigger = self.counter % self.step == 0 - if trigger: - self.triggered += 1 - return trigger - - def show(self): - trigger = self.progress() - if trigger: - return self.output(self.triggered) - - def output(self, triggered): - print(".", end="", file=self.file, flush=True) + return self.output(self.msg % tuple([pct] + info), info=info) + else: + return self.output(self.msg % pct) - def finish(self): - print(file=self.file) + def output(self, message, info=None): + j = self.make_json(message=message, current=self.counter, total=self.total, info=info) + self.logger.info(j) diff --git a/src/borg/logger.py b/src/borg/logger.py index 6788ff1180..f158aa7979 100644 --- a/src/borg/logger.py +++ b/src/borg/logger.py @@ -36,9 +36,64 @@ import logging.config import logging.handlers # needed for handlers defined there being configurable in logging.conf file import os +import queue +import sys +import time +from typing import Optional import warnings +logging_debugging_path: Optional[str] = None # if set, write borg.logger debugging log to path/borg-*.log + configured = False +borg_serve_log_queue: queue.SimpleQueue = queue.SimpleQueue() + + +class BorgQueueHandler(logging.handlers.QueueHandler): + """borg serve writes log record dicts to a borg_serve_log_queue""" + + def prepare(self, record: logging.LogRecord) -> dict: + return dict( + # kwargs needed for LogRecord constructor: + name=record.name, + level=record.levelno, + pathname=record.pathname, + lineno=record.lineno, + msg=record.msg, + args=record.args, + exc_info=record.exc_info, + func=record.funcName, + sinfo=record.stack_info, + ) + + +class StderrHandler(logging.StreamHandler): + """ + This class is like a StreamHandler using sys.stderr, but always uses + whatever sys.stderr is currently set to rather than the value of + sys.stderr at handler construction time. + """ + + def __init__(self, stream=None): + logging.Handler.__init__(self) + + @property + def stream(self): + return sys.stderr + + +class TextProgressFormatter(logging.Formatter): + def format(self, record: logging.LogRecord) -> str: + # record.msg contains json (because we always do json for progress log) + j = json.loads(record.msg) + # inside the json, the text log line can be found under "message" + return f"{j['message']}" + + +class JSONProgressFormatter(logging.Formatter): + def format(self, record: logging.LogRecord) -> str: + # record.msg contains json (because we always do json for progress log) + return f"{record.msg}" + # use something like this to ignore warnings: # warnings.filterwarnings('ignore', r'... regex for warning message to ignore ...') @@ -53,7 +108,22 @@ def _log_warning(message, category, filename, lineno, file=None, line=None): logger.warning(msg) -def setup_logging(stream=None, conf_fname=None, env_var="BORG_LOGGING_CONF", level="info", json=False): +def remove_handlers(logger): + for handler in logger.handlers[:]: + handler.flush() + handler.close() + logger.removeHandler(handler) + + +def teardown_logging(): + global configured + logging.shutdown() + configured = False + + +def setup_logging( + stream=None, conf_fname=None, env_var="BORG_LOGGING_CONF", level="info", is_serve=False, log_json=False, func=None +): """setup logging module according to the arguments provided if conf_fname is given (or the config file name can be determined via @@ -61,6 +131,8 @@ def setup_logging(stream=None, conf_fname=None, env_var="BORG_LOGGING_CONF", lev otherwise, set up a stream handler logger on stderr (by default, if no stream is provided). + + is_serve: are we setting up the logging for "borg serve"? """ global configured err_msg = None @@ -77,25 +149,56 @@ def setup_logging(stream=None, conf_fname=None, env_var="BORG_LOGGING_CONF", lev logging.config.fileConfig(f) configured = True logger = logging.getLogger(__name__) - borg_logger = logging.getLogger("borg") - borg_logger.json = json logger.debug(f'using logging configuration read from "{conf_fname}"') warnings.showwarning = _log_warning return None except Exception as err: # XXX be more precise err_msg = str(err) + # if we did not / not successfully load a logging configuration, fallback to this: - logger = logging.getLogger("") - handler = logging.StreamHandler(stream) + level = level.upper() fmt = "%(message)s" - formatter = JsonFormatter(fmt) if json else logging.Formatter(fmt) + formatter = JsonFormatter(fmt) if log_json else logging.Formatter(fmt) + SHandler = StderrHandler if stream is None else logging.StreamHandler + handler = BorgQueueHandler(borg_serve_log_queue) if is_serve else SHandler(stream) handler.setFormatter(formatter) - borg_logger = logging.getLogger("borg") - borg_logger.formatter = formatter - borg_logger.json = json - logger.addHandler(handler) - logger.setLevel(level.upper()) + logger = logging.getLogger() + remove_handlers(logger) + logger.setLevel(level) + + if logging_debugging_path is not None: + # add an addtl. root handler for debugging purposes + log_fname = os.path.join(logging_debugging_path, f"borg-{'serve' if is_serve else 'client'}-root.log") + handler2 = logging.StreamHandler(open(log_fname, "a")) + handler2.setFormatter(formatter) + logger.addHandler(handler2) + logger.warning(f"--- {func} ---") # only handler2 shall get this + + logger.addHandler(handler) # do this late, so handler is not added while debug handler is set up + + bop_formatter = JSONProgressFormatter() if log_json else TextProgressFormatter() + bop_handler = BorgQueueHandler(borg_serve_log_queue) if is_serve else SHandler(stream) + bop_handler.setFormatter(bop_formatter) + bop_logger = logging.getLogger("borg.output.progress") + remove_handlers(bop_logger) + bop_logger.setLevel("INFO") + bop_logger.propagate = False + + if logging_debugging_path is not None: + # add an addtl. progress handler for debugging purposes + log_fname = os.path.join(logging_debugging_path, f"borg-{'serve' if is_serve else 'client'}-progress.log") + bop_handler2 = logging.StreamHandler(open(log_fname, "a")) + bop_handler2.setFormatter(bop_formatter) + bop_logger.addHandler(bop_handler2) + json_dict = dict( + message=f"--- {func} ---", operation=0, msgid="", type="progress_message", finished=False, time=time.time() + ) + bop_logger.warning(json.dumps(json_dict)) # only bop_handler2 shall get this + + bop_logger.addHandler(bop_handler) # do this late, so bop_handler is not added while debug handler is set up + configured = True + logger = logging.getLogger(__name__) if err_msg: logger.warning(f'setup_logging for "{conf_fname}" failed with "{err_msg}".') diff --git a/src/borg/remote.py b/src/borg/remote.py index d70ebada38..4976e6c7fa 100644 --- a/src/borg/remote.py +++ b/src/borg/remote.py @@ -1,9 +1,9 @@ import errno import functools import inspect -import json import logging import os +import queue import select import shlex import shutil @@ -15,6 +15,7 @@ import traceback from subprocess import Popen, PIPE +import borg.logger from . import __version__ from .compress import Compressor from .constants import * # NOQA @@ -26,7 +27,7 @@ from .helpers import format_file_size from .helpers import safe_unlink from .helpers import prepare_subprocess_env, ignore_sigint -from .logger import create_logger +from .logger import create_logger, borg_serve_log_queue from .helpers import msgpack from .repository import Repository from .version import parse_version, format_version @@ -36,7 +37,7 @@ logger = create_logger(__name__) BORG_VERSION = parse_version(__version__) -MSGID, MSG, ARGS, RESULT = "i", "m", "a", "r" +MSGID, MSG, ARGS, RESULT, LOG = "i", "m", "a", "r", "l" MAX_INFLIGHT = 100 @@ -125,6 +126,7 @@ class RepositoryServer: # pragma: no cover "scan", "negotiate", "open", + "close", "info", "put", "rollback", @@ -151,29 +153,41 @@ def filter_args(self, f, kwargs): known = set(inspect.signature(f).parameters) return {name: kwargs[name] for name in kwargs if name in known} + def send_queued_log(self): + while True: + try: + # lr_dict contents see BorgQueueHandler + lr_dict = borg_serve_log_queue.get_nowait() + except queue.Empty: + break + else: + msg = msgpack.packb({LOG: lr_dict}) + os_write(self.stdout_fd, msg) + def serve(self): - stdin_fd = sys.stdin.fileno() - stdout_fd = sys.stdout.fileno() - stderr_fd = sys.stdout.fileno() - os.set_blocking(stdin_fd, False) - os.set_blocking(stdout_fd, True) - os.set_blocking(stderr_fd, True) + self.stdin_fd = sys.stdin.fileno() + self.stdout_fd = sys.stdout.fileno() + os.set_blocking(self.stdin_fd, False) + os.set_blocking(self.stdout_fd, True) unpacker = get_limited_unpacker("server") + shutdown_serve = False while True: - r, w, es = select.select([stdin_fd], [], [], 10) + # before processing any new RPCs, send out all pending log output + self.send_queued_log() + + if shutdown_serve: + # shutdown wanted! get out of here after sending all log output. + if self.repository is not None: + self.repository.close() + return + + # process new RPCs + r, w, es = select.select([self.stdin_fd], [], [], 10) if r: - data = os.read(stdin_fd, BUFSIZE) + data = os.read(self.stdin_fd, BUFSIZE) if not data: - if self.repository is not None: - self.repository.close() - else: - os_write( - stderr_fd, - "Borg {}: Got connection close before repository was opened.\n".format( - __version__ - ).encode(), - ) - return + shutdown_serve = True + continue unpacker.feed(data) for unpacked in unpacker: if isinstance(unpacked, dict): @@ -234,12 +248,12 @@ def serve(self): } ) - os_write(stdout_fd, msg) + os_write(self.stdout_fd, msg) else: - os_write(stdout_fd, msgpack.packb({MSGID: msgid, RESULT: res})) + os_write(self.stdout_fd, msgpack.packb({MSGID: msgid, RESULT: res})) if es: - self.repository.close() - return + shutdown_serve = True + continue def negotiate(self, client_data): if isinstance(client_data, dict): @@ -301,6 +315,12 @@ def open( self.repository.__enter__() # clean exit handled by serve() method return self.repository.id + def close(self): + if self.repository is not None: + self.repository.__exit__(None, None, None) + borg.logger.teardown_logging() + self.send_queued_log() + def inject_exception(self, kind): s1 = "test string" s2 = "test string2" @@ -726,10 +746,18 @@ def handle_error(unpacked): self.rx_bytes += len(data) self.unpacker.feed(data) for unpacked in self.unpacker: - if isinstance(unpacked, dict): - msgid = unpacked[MSGID] - else: + if not isinstance(unpacked, dict): raise UnexpectedRPCDataFormatFromServer(data) + + lr_dict = unpacked.get(LOG) + if lr_dict is not None: + # Re-emit remote log messages locally. + _logger = logging.getLogger(lr_dict["name"]) + if _logger.isEnabledFor(lr_dict["level"]): + _logger.handle(logging.LogRecord(**lr_dict)) + continue + + msgid = unpacked[MSGID] if msgid in self.ignore_responses: self.ignore_responses.remove(msgid) # async methods never return values, but may raise exceptions. @@ -755,8 +783,14 @@ def handle_error(unpacked): if lines and not lines[-1].endswith((b"\r", b"\n")): self.stderr_received = lines.pop() # now we have complete lines in and any partial line in self.stderr_received. + _logger = logging.getLogger() for line in lines: - handle_remote_line(line.decode()) # decode late, avoid partial utf-8 sequences + # borg serve (remote/server side) should not emit stuff on stderr, + # but e.g. the ssh process (local/client side) might output errors there. + assert line.endswith((b"\r", b"\n")) + # something came in on stderr, log it to not lose it. + # decode late, avoid partial utf-8 sequences. + _logger.warning("stderr: " + line.decode().strip()) if w: while ( (len(self.to_send) <= maximum_to_send) @@ -872,6 +906,7 @@ def break_lock(self): """actual remoting is done via self.call in the @api decorator""" def close(self): + self.call("close", {}, wait=True) if self.p: self.p.stdin.close() self.p.stdout.close() @@ -886,57 +921,6 @@ def preload(self, ids): self.preload_ids += ids -def handle_remote_line(line): - """ - Handle a remote log line. - - This function is remarkably complex because it handles multiple wire formats. - """ - assert line.endswith(("\r", "\n")) - if line.startswith("{"): - msg = json.loads(line) - - if msg["type"] not in ("progress_message", "progress_percent", "log_message"): - logger.warning("Dropped remote log message with unknown type %r: %s", msg["type"], line) - return - - if msg["type"] == "log_message": - # Re-emit log messages on the same level as the remote to get correct log suppression and verbosity. - level = getattr(logging, msg["levelname"], logging.CRITICAL) - assert isinstance(level, int) - target_logger = logging.getLogger(msg["name"]) - msg["message"] = "Remote: " + msg["message"] - # In JSON mode, we manually check whether the log message should be propagated. - if logging.getLogger("borg").json and level >= target_logger.getEffectiveLevel(): - sys.stderr.write(json.dumps(msg) + "\n") - else: - target_logger.log(level, "%s", msg["message"]) - elif msg["type"].startswith("progress_"): - # Progress messages are a bit more complex. - # First of all, we check whether progress output is enabled. This is signalled - # through the effective level of the borg.output.progress logger - # (also see ProgressIndicatorBase in borg.helpers). - progress_logger = logging.getLogger("borg.output.progress") - if progress_logger.getEffectiveLevel() == logging.INFO: - # When progress output is enabled, we check whether the client is in - # --log-json mode, as signalled by the "json" attribute on the "borg" logger. - if logging.getLogger("borg").json: - # In --log-json mode we re-emit the progress JSON line as sent by the server, - # with the message, if any, prefixed with "Remote: ". - if "message" in msg: - msg["message"] = "Remote: " + msg["message"] - sys.stderr.write(json.dumps(msg) + "\n") - elif "message" in msg: - # In text log mode we write only the message to stderr and terminate with \r - # (carriage return, i.e. move the write cursor back to the beginning of the line) - # so that the next message, progress or not, overwrites it. This mirrors the behaviour - # of local progress displays. - sys.stderr.write("Remote: " + msg["message"] + "\r") - else: - # We don't know what priority the line had. - logging.getLogger("").warning("stderr/remote: " + line.strip()) - - class RepositoryNoCache: """A not caching Repository wrapper, passes through to repository. diff --git a/src/borg/testsuite/archiver/__init__.py b/src/borg/testsuite/archiver/__init__.py index 932dc0f9c6..56b66a4391 100644 --- a/src/borg/testsuite/archiver/__init__.py +++ b/src/borg/testsuite/archiver/__init__.py @@ -21,8 +21,8 @@ from ...helpers import Location from ...helpers import EXIT_SUCCESS from ...helpers import bin_to_hex +from ...logger import teardown_logging from ...manifest import Manifest -from ...logger import setup_logging from ...remote import RemoteRepository from ...repository import Repository from .. import has_lchflags @@ -82,7 +82,10 @@ def exec_cmd(*args, archiver=None, fork=False, exe=None, input=b"", binary_outpu except SystemExit as e: output_text.flush() return e.code, output.getvalue() if binary_output else output.getvalue().decode() - ret = archiver.run(args) + try: + ret = archiver.run(args) + finally: + teardown_logging() # usually done via atexit, but we do not exit here output_text.flush() return ret, output.getvalue() if binary_output else output.getvalue().decode() finally: @@ -155,7 +158,6 @@ def tearDown(self): os.chdir(self._old_wd) # note: ignore_errors=True as workaround for issue #862 shutil.rmtree(self.tmpdir, ignore_errors=True) - setup_logging() def cmd(self, *args, **kw): exit_code = kw.pop("exit_code", 0) diff --git a/src/borg/testsuite/archiver/check_cmd.py b/src/borg/testsuite/archiver/check_cmd.py index 9669ee7ac5..390199baf0 100644 --- a/src/borg/testsuite/archiver/check_cmd.py +++ b/src/borg/testsuite/archiver/check_cmd.py @@ -1,4 +1,3 @@ -import logging import shutil import unittest from unittest.mock import patch @@ -26,8 +25,6 @@ def test_check_usage(self): self.assert_in("Starting repository check", output) self.assert_in("Starting archive consistency check", output) self.assert_in("Checking segments", output) - # reset logging to new process default to avoid need for fork=True on next check - logging.getLogger("borg.output.progress").setLevel(logging.NOTSET) output = self.cmd(f"--repo={self.repository_location}", "check", "-v", "--repository-only", exit_code=0) self.assert_in("Starting repository check", output) self.assert_not_in("Starting archive consistency check", output) diff --git a/src/borg/testsuite/helpers.py b/src/borg/testsuite/helpers.py index eba51b311a..3dee5755f5 100644 --- a/src/borg/testsuite/helpers.py +++ b/src/borg/testsuite/helpers.py @@ -34,7 +34,7 @@ from ..helpers import StableDict, bin_to_hex from ..helpers import parse_timestamp, ChunkIteratorFileWrapper, ChunkerParams from ..helpers import archivename_validator, text_validator -from ..helpers import ProgressIndicatorPercent, ProgressIndicatorEndless +from ..helpers import ProgressIndicatorPercent from ..helpers import swidth_slice from ..helpers import chunkit from ..helpers import safe_ns, safe_s, SUPPORT_32BIT_PLATFORMS @@ -998,65 +998,36 @@ def test_yes_env_output(capfd, monkeypatch): assert "yes" in err -def test_progress_percentage_sameline(capfd, monkeypatch): - # run the test as if it was in a 4x1 terminal - monkeypatch.setenv("COLUMNS", "4") - monkeypatch.setenv("LINES", "1") +def test_progress_percentage(capfd): pi = ProgressIndicatorPercent(1000, step=5, start=0, msg="%3.0f%%") pi.logger.setLevel("INFO") pi.show(0) out, err = capfd.readouterr() - assert err == " 0%\r" + assert err == " 0%\n" pi.show(420) pi.show(680) out, err = capfd.readouterr() - assert err == " 42%\r 68%\r" + assert err == " 42%\n 68%\n" pi.show(1000) out, err = capfd.readouterr() - assert err == "100%\r" + assert err == "100%\n" pi.finish() out, err = capfd.readouterr() - assert err == " " * 4 + "\r" - - -@pytest.mark.skipif(is_win32, reason="no working swidth() implementation on this platform") -def test_progress_percentage_widechars(capfd, monkeypatch): - st = "スター・トレック" # "startrek" :-) - assert swidth(st) == 16 - path = "/カーク船長です。" # "Captain Kirk" - assert swidth(path) == 17 - spaces = " " * 4 # to avoid usage of "..." - width = len("100%") + 1 + swidth(st) + 1 + swidth(path) + swidth(spaces) - monkeypatch.setenv("COLUMNS", str(width)) - monkeypatch.setenv("LINES", "1") - pi = ProgressIndicatorPercent(100, step=5, start=0, msg=f"%3.0f%% {st} %s") - pi.logger.setLevel("INFO") - pi.show(0, info=[path]) - out, err = capfd.readouterr() - assert err == f" 0% {st} {path}{spaces}\r" - pi.show(100, info=[path]) - out, err = capfd.readouterr() - assert err == f"100% {st} {path}{spaces}\r" - pi.finish() - out, err = capfd.readouterr() - assert err == " " * width + "\r" + assert err == "\n" -def test_progress_percentage_step(capfd, monkeypatch): - # run the test as if it was in a 4x1 terminal - monkeypatch.setenv("COLUMNS", "4") - monkeypatch.setenv("LINES", "1") +def test_progress_percentage_step(capfd): pi = ProgressIndicatorPercent(100, step=2, start=0, msg="%3.0f%%") pi.logger.setLevel("INFO") pi.show() out, err = capfd.readouterr() - assert err == " 0%\r" + assert err == " 0%\n" pi.show() out, err = capfd.readouterr() assert err == "" # no output at 1% as we have step == 2 pi.show() out, err = capfd.readouterr() - assert err == " 2%\r" + assert err == " 2%\n" def test_progress_percentage_quiet(capfd): @@ -1073,35 +1044,6 @@ def test_progress_percentage_quiet(capfd): assert err == "" -def test_progress_endless(capfd): - pi = ProgressIndicatorEndless(step=1, file=sys.stderr) - pi.show() - out, err = capfd.readouterr() - assert err == "." - pi.show() - out, err = capfd.readouterr() - assert err == "." - pi.finish() - out, err = capfd.readouterr() - assert err == "\n" - - -def test_progress_endless_step(capfd): - pi = ProgressIndicatorEndless(step=2, file=sys.stderr) - pi.show() - out, err = capfd.readouterr() - assert err == "" # no output here as we have step == 2 - pi.show() - out, err = capfd.readouterr() - assert err == "." - pi.show() - out, err = capfd.readouterr() - assert err == "" # no output here as we have step == 2 - pi.show() - out, err = capfd.readouterr() - assert err == "." - - def test_partial_format(): assert partial_format("{space:10}", {"space": " "}) == " " * 10 assert partial_format("{foobar}", {"bar": "wrong", "foobar": "correct"}) == "correct" @@ -1322,19 +1264,19 @@ def os_unlink(_): class TestPassphrase: def test_passphrase_new_verification(self, capsys, monkeypatch): - monkeypatch.setattr(getpass, "getpass", lambda prompt: "12aöäü") + monkeypatch.setattr(getpass, "getpass", lambda prompt: "1234aöäü") monkeypatch.setenv("BORG_DISPLAY_PASSPHRASE", "no") Passphrase.new() out, err = capsys.readouterr() - assert "12" not in out - assert "12" not in err + assert "1234" not in out + assert "1234" not in err monkeypatch.setenv("BORG_DISPLAY_PASSPHRASE", "yes") passphrase = Passphrase.new() out, err = capsys.readouterr() - assert "313261c3b6c3a4c3bc" not in out - assert "313261c3b6c3a4c3bc" in err - assert passphrase == "12aöäü" + assert "3132333461c3b6c3a4c3bc" not in out + assert "3132333461c3b6c3a4c3bc" in err + assert passphrase == "1234aöäü" monkeypatch.setattr(getpass, "getpass", lambda prompt: "1234/@=") Passphrase.new() diff --git a/src/borg/testsuite/repository.py b/src/borg/testsuite/repository.py index 93957584a6..2b33bf9c1d 100644 --- a/src/borg/testsuite/repository.py +++ b/src/borg/testsuite/repository.py @@ -1,4 +1,3 @@ -import io import logging import os import shutil @@ -13,7 +12,7 @@ from ..helpers import IntegrityError from ..helpers import msgpack from ..locking import Lock, LockFailed -from ..remote import RemoteRepository, InvalidRPCMethod, PathNotAllowed, handle_remote_line +from ..remote import RemoteRepository, InvalidRPCMethod, PathNotAllowed from ..repository import Repository, LoggedIO, MAGIC, MAX_DATA_SIZE, TAG_DELETE, TAG_PUT2, TAG_PUT, TAG_COMMIT from ..repoobj import RepoObj from . import BaseTestCase @@ -98,13 +97,12 @@ def test1(self): self.repository.delete(key50) self.assert_raises(Repository.ObjectNotFound, lambda: self.repository.get(key50)) self.repository.commit(compact=False) - self.repository.close() - with self.open() as repository2: - self.assert_raises(Repository.ObjectNotFound, lambda: repository2.get(key50)) - for x in range(100): - if x == 50: - continue - self.assert_equal(pdchunk(repository2.get(H(x))), b"SOMEDATA") + self.reopen() + self.assert_raises(Repository.ObjectNotFound, lambda: self.repository.get(key50)) + for x in range(100): + if x == 50: + continue + self.assert_equal(pdchunk(self.repository.get(H(x))), b"SOMEDATA") def test2(self): """Test multiple sequential transactions""" @@ -159,17 +157,14 @@ def test_single_kind_transactions(self): # put self.repository.put(H(0), fchunk(b"foo")) self.repository.commit(compact=False) - self.repository.close() + self.reopen() # replace - self.repository = self.open() - with self.repository: - self.repository.put(H(0), fchunk(b"bar")) - self.repository.commit(compact=False) + self.repository.put(H(0), fchunk(b"bar")) + self.repository.commit(compact=False) + self.reopen() # delete - self.repository = self.open() - with self.repository: - self.repository.delete(H(0)) - self.repository.commit(compact=False) + self.repository.delete(H(0)) + self.repository.commit(compact=False) def test_list(self): for x in range(100): @@ -276,14 +271,11 @@ def test_flags_persistence(self): # we do not set flags for H(0), so we can later check their default state. self.repository.flags(H(1), mask=0x00000007, value=0x00000006) self.repository.commit(compact=False) - self.repository.close() - - self.repository = self.open() - with self.repository: - # we query all flags to check if the initial flags were all zero and - # only the ones we explicitly set to one are as expected. - self.assert_equal(self.repository.flags(H(0), mask=0xFFFFFFFF), 0x00000000) - self.assert_equal(self.repository.flags(H(1), mask=0xFFFFFFFF), 0x00000006) + self.reopen() + # we query all flags to check if the initial flags were all zero and + # only the ones we explicitly set to one are as expected. + self.assert_equal(self.repository.flags(H(0), mask=0xFFFFFFFF), 0x00000000) + self.assert_equal(self.repository.flags(H(1), mask=0xFFFFFFFF), 0x00000006) class LocalRepositoryTestCase(RepositoryTestCaseBase): @@ -337,12 +329,10 @@ def test_uncommitted_garbage(self): last_segment = self.repository.io.get_latest_segment() with open(self.repository.io.segment_filename(last_segment + 1), "wb") as f: f.write(MAGIC + b"crapcrapcrap") - self.repository.close() + self.reopen() # usually, opening the repo and starting a transaction should trigger a cleanup. - self.repository = self.open() - with self.repository: - self.repository.put(H(0), fchunk(b"bar")) # this may trigger compact_segments() - self.repository.commit(compact=True) + self.repository.put(H(0), fchunk(b"bar")) # this may trigger compact_segments() + self.repository.commit(compact=True) # the point here is that nothing blows up with an exception. @@ -385,11 +375,10 @@ def test_replay_lock_upgrade_old(self): os.unlink(os.path.join(self.repository.path, name)) with patch.object(Lock, "upgrade", side_effect=LockFailed) as upgrade: self.reopen(exclusive=None) # simulate old client that always does lock upgrades - with self.repository: - # the repo is only locked by a shared read lock, but to replay segments, - # we need an exclusive write lock - check if the lock gets upgraded. - self.assert_raises(LockFailed, lambda: len(self.repository)) - upgrade.assert_called_once_with() + # the repo is only locked by a shared read lock, but to replay segments, + # we need an exclusive write lock - check if the lock gets upgraded. + self.assert_raises(LockFailed, lambda: len(self.repository)) + upgrade.assert_called_once_with() def test_replay_lock_upgrade(self): self.add_keys() @@ -398,11 +387,10 @@ def test_replay_lock_upgrade(self): os.unlink(os.path.join(self.repository.path, name)) with patch.object(Lock, "upgrade", side_effect=LockFailed) as upgrade: self.reopen(exclusive=False) # current client usually does not do lock upgrade, except for replay - with self.repository: - # the repo is only locked by a shared read lock, but to replay segments, - # we need an exclusive write lock - check if the lock gets upgraded. - self.assert_raises(LockFailed, lambda: len(self.repository)) - upgrade.assert_called_once_with() + # the repo is only locked by a shared read lock, but to replay segments, + # we need an exclusive write lock - check if the lock gets upgraded. + self.assert_raises(LockFailed, lambda: len(self.repository)) + upgrade.assert_called_once_with() def test_crash_before_deleting_compacted_segments(self): self.add_keys() @@ -932,7 +920,7 @@ def test_hints_behaviour(self): class RemoteRepositoryTestCase(RepositoryTestCase): repository = None # type: RemoteRepository - def open(self, create=False): + def open(self, create=False, exclusive=UNSPECIFIED): return RemoteRepository( Location("ssh://__testsuite__" + os.path.join(self.tmppath, "repository")), exclusive=True, create=create ) @@ -1064,75 +1052,3 @@ def test_repair_missing_commit_segment(self): def test_repair_missing_segment(self): # skip this test, files in RemoteRepository cannot be deleted pass - - -class RemoteLoggerTestCase(BaseTestCase): - def setUp(self): - self.stream = io.StringIO() - self.handler = logging.StreamHandler(self.stream) - logging.getLogger().handlers[:] = [self.handler] - logging.getLogger("borg.repository").handlers[:] = [] - logging.getLogger("borg.repository.foo").handlers[:] = [] - # capture stderr - sys.stderr.flush() - self.old_stderr = sys.stderr - self.stderr = sys.stderr = io.StringIO() - - def tearDown(self): - sys.stderr = self.old_stderr - - def test_stderr_messages(self): - handle_remote_line("unstructured stderr message\n") - self.assert_equal(self.stream.getvalue(), "stderr/remote: unstructured stderr message\n") - self.assert_equal(self.stderr.getvalue(), "") - - def test_post11_format_messages(self): - self.handler.setLevel(logging.DEBUG) - logging.getLogger().setLevel(logging.DEBUG) - - msg = ( - """{"type": "log_message", "levelname": "INFO", "name": "borg.repository", "msgid": 42,""" - """ "message": "borg >= 1.1 format message"}\n""" - ) - handle_remote_line(msg) - self.assert_equal(self.stream.getvalue(), "Remote: borg >= 1.1 format message\n") - self.assert_equal(self.stderr.getvalue(), "") - - def test_remote_messages_screened(self): - # default borg config for root logger - self.handler.setLevel(logging.WARNING) - logging.getLogger().setLevel(logging.WARNING) - - msg = ( - """{"type": "log_message", "levelname": "INFO", "name": "borg.repository", "msgid": 42,""" - """ "message": "new format info message"}\n""" - ) - handle_remote_line(msg) - self.assert_equal(self.stream.getvalue(), "") - self.assert_equal(self.stderr.getvalue(), "") - - def test_info_to_correct_local_child(self): - logging.getLogger("borg.repository").setLevel(logging.INFO) - logging.getLogger("borg.repository.foo").setLevel(logging.INFO) - # default borg config for root logger - self.handler.setLevel(logging.WARNING) - logging.getLogger().setLevel(logging.WARNING) - - child_stream = io.StringIO() - child_handler = logging.StreamHandler(child_stream) - child_handler.setLevel(logging.INFO) - logging.getLogger("borg.repository").handlers[:] = [child_handler] - foo_stream = io.StringIO() - foo_handler = logging.StreamHandler(foo_stream) - foo_handler.setLevel(logging.INFO) - logging.getLogger("borg.repository.foo").handlers[:] = [foo_handler] - - msg = ( - """{"type": "log_message", "levelname": "INFO", "name": "borg.repository", "msgid": 42,""" - """ "message": "new format child message"}\n""" - ) - handle_remote_line(msg) - self.assert_equal(foo_stream.getvalue(), "") - self.assert_equal(child_stream.getvalue(), "Remote: new format child message\n") - self.assert_equal(self.stream.getvalue(), "") - self.assert_equal(self.stderr.getvalue(), "")