From 0a484ebc07edb5605a6fa976c2de05ecb6936b0a Mon Sep 17 00:00:00 2001 From: TNeitzel Date: Thu, 10 Feb 2022 22:45:17 +0100 Subject: [PATCH] Add proof of concept for pull mechanism Added a small proof of concept for a pull mechanism implementation. Works fine for simple test cases, but probably needs many improvements to be useful for more complex usage scenarios. --- src/borg/archiver.py | 9 ++++- src/borg/helpers/parseformat.py | 10 +++++ src/borg/remote.py | 68 ++++++++++++++++++++++----------- 3 files changed, 63 insertions(+), 24 deletions(-) diff --git a/src/borg/archiver.py b/src/borg/archiver.py index 35ea637efb..cd442fe643 100644 --- a/src/borg/archiver.py +++ b/src/borg/archiver.py @@ -174,10 +174,11 @@ def wrapper(self, args, **kwargs): make_parent_dirs = getattr(args, 'make_parent_dirs', False) if argument(args, fake) ^ invert_fake: return method(self, args, repository=None, **kwargs) - elif location.proto == 'ssh': + + elif location.proto == 'ssh' or location.proto == 'serve': repository = RemoteRepository(location.omit_archive(), create=create, exclusive=argument(args, exclusive), lock_wait=self.lock_wait, lock=lock, append_only=append_only, - make_parent_dirs=make_parent_dirs, args=args) + make_parent_dirs=make_parent_dirs, args=args, serve=(location.proto == 'serve')) else: repository = Repository(location.path, create=create, exclusive=argument(args, exclusive), lock_wait=self.lock_wait, lock=lock, append_only=append_only, @@ -296,6 +297,7 @@ def do_serve(self, args): restrict_to_repositories=args.restrict_to_repositories, append_only=args.append_only, storage_quota=args.storage_quota, + pull_command=args.pull_command ).serve() def do_version(self, args): @@ -5252,6 +5254,9 @@ def diff_sort_spec_validator(s): 'When a new repository is initialized, sets the storage quota on the new ' 'repository as well. 
Default: no quota.') + subparser.add_argument('--pull-command', metavar='cmd', dest='pull_command', + help='command to use for pulling from a borg server started in serve:// mode') + # borg umount umount_epilog = process_epilog(""" This command unmounts a FUSE filesystem that was mounted with ``borg mount``. diff --git a/src/borg/helpers/parseformat.py b/src/borg/helpers/parseformat.py index cf5e117e55..67c2cd0629 100644 --- a/src/borg/helpers/parseformat.py +++ b/src/borg/helpers/parseformat.py @@ -381,6 +381,10 @@ class Location: (?::(?P<port>\d+))? # :port (optional) """ + abs_path_re + optional_archive_re, re.VERBOSE) # path or path::archive + serve_re = re.compile(r""" + (?P<proto>serve):// # serve:// + """ + abs_path_re + optional_archive_re, re.VERBOSE) # path or path::archive + file_re = re.compile(r""" (?P<proto>file):// # file:// """ + file_path_re + optional_archive_re, re.VERBOSE) # servername/path, path or path::archive @@ -460,6 +464,12 @@ def normpath_special(p): self.path = normpath_special(m.group('path')) self.archive = m.group('archive') return True + m = self.serve_re.match(text) + if m: + self.proto = m.group('proto') + self.path = normpath_special(m.group('path')) + self.archive = m.group('archive') + return True m = self.file_re.match(text) if m: self.proto = m.group('proto') diff --git a/src/borg/remote.py b/src/borg/remote.py index 2e20190db2..50e5421e1e 100644 --- a/src/borg/remote.py +++ b/src/borg/remote.py @@ -176,7 +176,7 @@ class RepositoryServer: # pragma: no cover 'inject_exception', ) - def __init__(self, restrict_to_paths, restrict_to_repositories, append_only, storage_quota): + def __init__(self, restrict_to_paths, restrict_to_repositories, append_only, storage_quota, pull_command=None): self.repository = None self.restrict_to_paths = restrict_to_paths self.restrict_to_repositories = restrict_to_repositories @@ -187,6 +187,7 @@ def __init__(self, restrict_to_paths, restrict_to_repositories, append_only, sto self.append_only = append_only
self.storage_quota = storage_quota self.client_version = parse_version('1.0.8') # fallback version if client is too old to send version information + self.pull_command = pull_command def positional_to_named(self, method, argv): """Translate from positional protocol to named protocol.""" @@ -206,13 +207,27 @@ def filter_args(self, f, kwargs): return {name: kwargs[name] for name in kwargs if name in known} def serve(self): - stdin_fd = sys.stdin.fileno() - stdout_fd = sys.stdout.fileno() - stderr_fd = sys.stdout.fileno() - os.set_blocking(stdin_fd, False) - os.set_blocking(stdout_fd, True) - os.set_blocking(stderr_fd, True) + + if self.pull_command: + self.p = Popen(shlex.split(self.pull_command), bufsize=0, stdin=PIPE, stdout=PIPE, stderr=PIPE) + + stdin_fd = self.p.stdout.fileno() + stdout_fd = self.p.stdin.fileno() + stderr_fd = sys.stderr.fileno() + os.set_blocking(stdin_fd, False) + os.set_blocking(stdout_fd, False) + os.set_blocking(stderr_fd, False) + + else: + stdin_fd = sys.stdin.fileno() + stdout_fd = sys.stdout.fileno() + stderr_fd = sys.stdout.fileno() + os.set_blocking(stdin_fd, False) + os.set_blocking(stdout_fd, True) + os.set_blocking(stderr_fd, True) + unpacker = get_limited_unpacker('server') + while True: r, w, es = select.select([stdin_fd], [], [], 10) if r: @@ -549,7 +564,7 @@ def required_version(self): dictFormat = False # outside of __init__ for testing of legacy free protocol def __init__(self, location, create=False, exclusive=False, lock_wait=None, lock=True, append_only=False, - make_parent_dirs=False, args=None): + make_parent_dirs=False, args=None, serve=False): self.location = self._location = location self.preload_ids = [] self.msgid = 0 @@ -571,20 +586,29 @@ def __init__(self, location, create=False, exclusive=False, lock_wait=None, lock testing = location.host == '__testsuite__' # when testing, we invoke and talk to a borg process directly (no ssh). 
# when not testing, we invoke the system-installed ssh binary to talk to a remote borg. - env = prepare_subprocess_env(system=not testing) - borg_cmd = self.borg_cmd(args, testing) - if not testing: - borg_cmd = self.ssh_cmd(location) + borg_cmd - logger.debug('SSH command line: %s', borg_cmd) - # we do not want the ssh getting killed by Ctrl-C/SIGINT because it is needed for clean shutdown of borg. - # borg's SIGINT handler tries to write a checkpoint and requires the remote repo connection. - self.p = Popen(borg_cmd, bufsize=0, stdin=PIPE, stdout=PIPE, stderr=PIPE, env=env, preexec_fn=ignore_sigint) - self.stdin_fd = self.p.stdin.fileno() - self.stdout_fd = self.p.stdout.fileno() - self.stderr_fd = self.p.stderr.fileno() - os.set_blocking(self.stdin_fd, False) - os.set_blocking(self.stdout_fd, False) - os.set_blocking(self.stderr_fd, False) + if serve: + self.stdin_fd = sys.stdout.fileno() + self.stdout_fd = sys.stdin.fileno() + self.stderr_fd = sys.stderr.fileno() + + os.set_blocking(self.stdin_fd, True) + os.set_blocking(self.stdout_fd, False) + + else: + env = prepare_subprocess_env(system=not testing) + borg_cmd = self.borg_cmd(args, testing) + if not testing: + borg_cmd = self.ssh_cmd(location) + borg_cmd + logger.debug('SSH command line: %s', borg_cmd) + # we do not want the ssh getting killed by Ctrl-C/SIGINT because it is needed for clean shutdown of borg. + # borg's SIGINT handler tries to write a checkpoint and requires the remote repo connection. + self.p = Popen(borg_cmd, bufsize=0, stdin=PIPE, stdout=PIPE, stderr=PIPE, env=env, preexec_fn=ignore_sigint) + self.stdin_fd = self.p.stdin.fileno() + self.stdout_fd = self.p.stdout.fileno() + self.stderr_fd = self.p.stderr.fileno() + os.set_blocking(self.stdin_fd, False) + os.set_blocking(self.stdout_fd, False) + os.set_blocking(self.stderr_fd, False) self.r_fds = [self.stdout_fd, self.stderr_fd] self.x_fds = [self.stdin_fd, self.stdout_fd, self.stderr_fd]