Skip to content

Commit b3462eb

Browse files
authored
Add support for custom pg_rewind options (#3484)
### Summary This PR introduces the `postgresql.rewind` configuration parameter, allowing users to pass custom command-line options to `pg_rewind`. This provides flexibility for the PostgreSQL users, and is also a necessary enhancement for supporting TDE (Transparent Data Encryption) on proprietary Postgres, like EPAS (EDB Postgres Advanced Server), which requires passing custom flags during the rewind process. ### Key Changes * **Refactor:** The `process_user_options` function was first moved from `bootstrap.py` to `utils.py` to make it a generic helper. Its unit tests were moved, and a minor bug in its error reporting was fixed. * **Feature:** `patroni/postgresql/rewind.py` now reads the new `postgresql.rewind` configuration parameter, parses it using the `process_user_options` helper, and appends the resulting flags to the `pg_rewind` command. * **Testing:** New unit tests have been added to `test_rewind.py` to validate the new feature, including checking for disallowed options. * **Docs:** The new `postgresql.rewind` parameter is documented in `yaml_configuration.rst` with a usage example. * **Validation:** The configuration schema in `validator.py` is updated to include the new parameter. --------- Signed-off-by: Israel Barth Rubio <israel.barth@enterprisedb.com>
1 parent 024930e commit b3462eb

File tree

8 files changed

+220
-141
lines changed

8 files changed

+220
-141
lines changed

docs/yaml_configuration.rst

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,16 @@ PostgreSQL
320320
- **- mapname1 systemname2 pguser2**
321321
- **pg\_ctl\_timeout**: How long should pg_ctl wait when doing ``start``, ``stop`` or ``restart``. Default value is 60 seconds.
322322
- **use\_pg\_rewind**: try to use pg\_rewind on the former leader when it joins cluster as a replica. Either the cluster must be initialized with ``data page checksums`` (``--data-checksums`` option for ``initdb``) and/or ``wal_log_hints`` must be set to ``on``, or ``pg_rewind`` will not work.
323+
- **rewind**: (optional) custom options to pass to the ``pg_rewind`` command. Can be specified as a list of strings and/or single key-value dictionaries. Not allowed options include: ``target-pgdata``, ``source-pgdata``, ``source-server``, ``write-recovery-conf``, ``dry-run``, ``restore-target-wal``, ``config-file``, ``no-ensure-shutdown``, ``version``, and ``help``. Example usage:
324+
325+
.. code:: YAML
326+
327+
postgresql:
328+
rewind:
329+
- debug
330+
- progress
331+
- sync-method: fsync
332+
323333
- **remove\_data\_directory\_on\_rewind\_failure**: If this option is enabled, Patroni will remove the PostgreSQL data directory and recreate the replica. Otherwise it will try to follow the new leader. Default value is **false**.
324334
- **remove\_data\_directory\_on\_diverged\_timelines**: Patroni will remove the PostgreSQL data directory and recreate the replica if it notices that timelines are diverging and the former primary can not start streaming from the new primary. This option is useful when ``pg_rewind`` can not be used. While performing timelines divergence check on PostgreSQL v10 and older Patroni will try to connect with replication credential to the "postgres" database. Hence, such access should be allowed in the pg_hba.conf. Default value is **false**.
325335
- **replica\_method**: for each create_replica_methods other than basebackup, you would add a configuration section of the same name. At a minimum, this should include "command" with a full path to the actual script to be executed. Other configuration parameters will be passed along to the script in the form "parameter=value".

patroni/postgresql/bootstrap.py

Lines changed: 4 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@
44
import tempfile
55
import time
66

7-
from typing import Any, Callable, cast, Dict, List, Optional, Tuple, TYPE_CHECKING, Union
7+
from typing import Any, Dict, List, Optional, TYPE_CHECKING, Union
88

99
from ..async_executor import CriticalTask
1010
from ..collections import EMPTY_DICT
1111
from ..dcs import Leader, Member, RemoteMember
1212
from ..psycopg import quote_ident, quote_literal
13-
from ..utils import deep_compare, unquote
13+
from ..utils import deep_compare, process_user_options
1414
from .misc import PostgresqlState
1515

1616
if TYPE_CHECKING: # pragma: no cover
@@ -33,95 +33,14 @@ def running_custom_bootstrap(self) -> bool:
3333
def keep_existing_recovery_conf(self) -> bool:
3434
return self._running_custom_bootstrap and self._keep_existing_recovery_conf
3535

36-
@staticmethod
37-
def process_user_options(tool: str, options: Any,
38-
not_allowed_options: Tuple[str, ...],
39-
error_handler: Callable[[str], None]) -> List[str]:
40-
"""Format *options* in a list or dictionary format into command line long form arguments.
41-
42-
.. note::
43-
The format of the output of this method is to prepare arguments for use in the ``initdb``
44-
method of `self._postgres`.
45-
46-
:Example:
47-
48-
The *options* can be defined as a dictionary of key, values to be converted into arguments:
49-
>>> Bootstrap.process_user_options('foo', {'foo': 'bar'}, (), print)
50-
['--foo=bar']
51-
52-
Or as a list of single string arguments
53-
>>> Bootstrap.process_user_options('foo', ['yes'], (), print)
54-
['--yes']
55-
56-
Or as a list of key, value options
57-
>>> Bootstrap.process_user_options('foo', [{'foo': 'bar'}], (), print)
58-
['--foo=bar']
59-
60-
Or a combination of single and key, values
61-
>>> Bootstrap.process_user_options('foo', ['yes', {'foo': 'bar'}], (), print)
62-
['--yes', '--foo=bar']
63-
64-
Options that contain spaces are passed as is to ``subprocess.call``
65-
>>> Bootstrap.process_user_options('foo', [{'foo': 'bar baz'}], (), print)
66-
['--foo=bar baz']
67-
68-
Options that are quoted will be unquoted, so the quotes aren't interpreted
69-
literally by the postgres command
70-
>>> Bootstrap.process_user_options('foo', [{'foo': '"bar baz"'}], (), print)
71-
['--foo=bar baz']
72-
73-
.. note::
74-
The *error_handler* is called when any of these conditions are met:
75-
76-
* Key, value dictionaries in the list form contains multiple keys.
77-
* If a key is listed in *not_allowed_options*.
78-
* If the options list is not in the required structure.
79-
80-
:param tool: The name of the tool used in error reports to *error_handler*
81-
:param options: Options to parse as a list of key, values or single values, or a dictionary
82-
:param not_allowed_options: List of keys that cannot be used in the list of key, value formatted options
83-
:param error_handler: A function which will be called when an error condition is encountered
84-
:returns: List of long form arguments to pass to the named tool
85-
"""
86-
user_options: List[str] = []
87-
88-
def option_is_allowed(name: str) -> bool:
89-
ret = name not in not_allowed_options
90-
if not ret:
91-
error_handler('{0} option for {1} is not allowed'.format(name, tool))
92-
return ret
93-
94-
if isinstance(options, dict):
95-
for key, val in cast(Dict[str, str], options).items():
96-
if key and val:
97-
user_options.append('--{0}={1}'.format(key, unquote(val)))
98-
elif isinstance(options, list):
99-
for opt in cast(List[Any], options):
100-
if isinstance(opt, str) and option_is_allowed(opt):
101-
user_options.append('--{0}'.format(opt))
102-
elif isinstance(opt, dict):
103-
args = cast(Dict[str, Any], opt)
104-
keys = list(args.keys())
105-
if len(keys) == 1 and isinstance(args[keys[0]], str) and option_is_allowed(keys[0]):
106-
user_options.append('--{0}={1}'.format(keys[0], unquote(args[keys[0]])))
107-
else:
108-
error_handler('Error when parsing {0} key-value option {1}: only one key-value is allowed'
109-
' and value should be a string'.format(tool, args[keys[0]]))
110-
else:
111-
error_handler('Error when parsing {0} option {1}: value should be string value'
112-
' or a single key-value pair'.format(tool, opt))
113-
else:
114-
error_handler('{0} options must be list or dict'.format(tool))
115-
return user_options
116-
11736
def _initdb(self, config: Any) -> bool:
11837
self._postgresql.set_state(PostgresqlState.INITDB)
11938
not_allowed_options = ('pgdata', 'nosync', 'pwfile', 'sync-only', 'version')
12039

12140
def error_handler(e: str) -> None:
12241
raise Exception(e)
12342

124-
options = self.process_user_options('initdb', config or [], not_allowed_options, error_handler)
43+
options = process_user_options('initdb', config or [], not_allowed_options, error_handler)
12544
pwfile = None
12645

12746
if self._postgresql.config.superuser:
@@ -341,7 +260,7 @@ def basebackup(self, conn_url: str, env: Dict[str, str], options: Dict[str, Any]
341260
ret = 1
342261
not_allowed_options = ('pgdata', 'format', 'wal-method', 'xlog-method', 'gzip',
343262
'version', 'compress', 'dbname', 'host', 'port', 'username', 'password')
344-
user_options = self.process_user_options('basebackup', options, not_allowed_options, logger.error)
263+
user_options = process_user_options('basebackup', options, not_allowed_options, logger.error)
345264
cmd = [
346265
self._postgresql.pgcommand("pg_basebackup"),
347266
"--pgdata=" + self._postgresql.data_dir,

patroni/postgresql/rewind.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from ..async_executor import CriticalTask
1313
from ..collections import EMPTY_DICT
1414
from ..dcs import Leader, RemoteMember
15+
from ..utils import process_user_options
1516
from . import Postgresql
1617
from .connection import get_connection_cursor
1718
from .misc import format_lsn, fsync_dir, parse_history, parse_lsn, PostgresqlRole
@@ -443,6 +444,10 @@ def pg_rewind(self, conn_kwargs: Dict[str, Any]) -> bool:
443444
444445
:returns: ``True`` if ``pg_rewind`` finished successfully, ``False`` otherwise.
445446
"""
447+
options = self._postgresql.config.get('rewind', [])
448+
not_allowed_options = ('target-pgdata', 'source-pgdata', 'source-server', 'write-recovery-conf', 'dry-run',
449+
'restore-target-wal', 'config-file', 'no-ensure-shutdown', 'version', 'help')
450+
user_options = process_user_options('rewind', options, not_allowed_options, logger.error)
446451
# prepare pg_rewind connection string
447452
env = self._postgresql.config.write_pgpass(conn_kwargs)
448453
env.update(LANG='C', LC_ALL='C', PGOPTIONS='-c statement_timeout=0')
@@ -466,6 +471,7 @@ def pg_rewind(self, conn_kwargs: Dict[str, Any]) -> bool:
466471
cmd.append('--config-file={0}'.format(self._postgresql.config.postgresql_conf))
467472

468473
cmd.extend(['-D', self._postgresql.data_dir, '--source-server', dsn])
474+
cmd.extend(user_options)
469475

470476
while True:
471477
results: Dict[str, bytes] = {}

patroni/utils.py

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1280,3 +1280,86 @@ def get_major_version(bin_dir: Optional[str] = None, bin_name: str = 'postgres')
12801280
"""
12811281
full_version = get_postgres_version(bin_dir, bin_name)
12821282
return re.sub(r'\.\d+$', '', full_version)
1283+
1284+
1285+
def process_user_options(tool: str, options: Any,
1286+
not_allowed_options: Tuple[str, ...],
1287+
error_handler: Callable[[str], None]) -> List[str]:
1288+
"""Format *options* in a list or dictionary format into command line long form arguments.
1289+
1290+
:Example:
1291+
1292+
The *options* can be defined as a dictionary of key, values to be converted into arguments:
1293+
>>> process_user_options('foo', {'foo': 'bar'}, (), print)
1294+
['--foo=bar']
1295+
1296+
Or as a list of single string arguments
1297+
>>> process_user_options('foo', ['yes'], (), print)
1298+
['--yes']
1299+
1300+
Or as a list of key, value options
1301+
>>> process_user_options('foo', [{'foo': 'bar'}], (), print)
1302+
['--foo=bar']
1303+
1304+
Or a combination of single and key, values
1305+
>>> process_user_options('foo', ['yes', {'foo': 'bar'}], (), print)
1306+
['--yes', '--foo=bar']
1307+
1308+
Options that contain spaces are passed as is to ``subprocess.call``
1309+
>>> process_user_options('foo', [{'foo': 'bar baz'}], (), print)
1310+
['--foo=bar baz']
1311+
1312+
Options that are quoted will be unquoted, so the quotes aren't interpreted
1313+
literally by the postgres command
1314+
>>> process_user_options('foo', [{'foo': '"bar baz"'}], (), print)
1315+
['--foo=bar baz']
1316+
1317+
.. note::
1318+
The *error_handler* is called when any of these conditions are met:
1319+
1320+
* Key, value dictionaries in the list form contains multiple keys.
1321+
* If a key is listed in *not_allowed_options*.
1322+
* If the options list is not in the required structure.
1323+
1324+
:param tool: The name of the tool used in error reports to *error_handler*
1325+
:param options: Options to parse as a list of key, values or single values, or a dictionary
1326+
:param not_allowed_options: List of keys that cannot be used in the list of key, value formatted options
1327+
:param error_handler: A function which will be called when an error condition is encountered
1328+
:returns: List of long form arguments to pass to the named tool
1329+
"""
1330+
user_options: List[str] = []
1331+
1332+
def option_is_allowed(name: str) -> bool:
1333+
ret = name not in not_allowed_options
1334+
if not ret:
1335+
error_handler('{0} option for {1} is not allowed'.format(name, tool))
1336+
return ret
1337+
1338+
if isinstance(options, dict):
1339+
for key, val in cast(Dict[str, str], options).items():
1340+
if key and val:
1341+
user_options.append('--{0}={1}'.format(key, unquote(val)))
1342+
elif isinstance(options, list):
1343+
for opt in cast(List[Any], options):
1344+
if isinstance(opt, str):
1345+
# This if needs to be nested, otherwise we confuse the user by logging two errors -- one issued by
1346+
# option_is_allowed and another by the else clause below.
1347+
if option_is_allowed(opt):
1348+
user_options.append('--{0}'.format(opt))
1349+
elif isinstance(opt, dict):
1350+
args = cast(Dict[str, Any], opt)
1351+
keys = list(args.keys())
1352+
if len(keys) == 1 and isinstance(args[keys[0]], str):
1353+
# This if needs to be nested, otherwise we confuse the user by logging two errors -- one issued by
1354+
# option_is_allowed and another by the else clause below.
1355+
if option_is_allowed(keys[0]):
1356+
user_options.append('--{0}={1}'.format(keys[0], unquote(args[keys[0]])))
1357+
else:
1358+
error_handler('Error when parsing {0} key-value option {1}: only one key-value is allowed'
1359+
' and value should be a string'.format(tool, args[keys[0]]))
1360+
else:
1361+
error_handler('Error when parsing {0} option {1}: value should be string value'
1362+
' or a single key-value pair'.format(tool, opt))
1363+
else:
1364+
error_handler('{0} options must be list or dict'.format(tool))
1365+
return user_options

patroni/validator.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1058,6 +1058,8 @@ def validate_watchdog_mode(value: Any) -> None:
10581058
Optional("max_worker_processes"): IntValidator(0, 262143, raise_assert=True),
10591059
},
10601060
Optional("use_pg_rewind"): bool,
1061+
Optional("rewind"): [Or(str, dict)],
1062+
Optional("basebackup"): [Or(str, dict)],
10611063
Optional("pg_hba"): [str],
10621064
Optional("pg_ident"): [str],
10631065
Optional("pg_ctl_timeout"): IntValidator(min=0, raise_assert=True),
@@ -1178,7 +1180,9 @@ def validate_watchdog_mode(value: Any) -> None:
11781180
Optional("pg_hba"): [str],
11791181
Optional("pg_ident"): [str],
11801182
Optional("pg_ctl_timeout"): IntValidator(min=0, raise_assert=True),
1181-
Optional("use_pg_rewind"): bool
1183+
Optional("use_pg_rewind"): bool,
1184+
Optional("rewind"): [Or(str, dict)],
1185+
Optional("basebackup"): [Or(str, dict)],
11821186
},
11831187
Optional("watchdog"): {
11841188
Optional("mode"): validate_watchdog_mode,

tests/test_bootstrap.py

Lines changed: 0 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import os
2-
import sys
32

43
from unittest.mock import Mock, patch, PropertyMock
54

@@ -115,58 +114,6 @@ def test__initdb(self):
115114
self.assertRaises(Exception, self.b.bootstrap, {'initdb': [1]})
116115
self.assertRaises(Exception, self.b.bootstrap, {'initdb': 1})
117116

118-
def test__process_user_options(self):
119-
def error_handler(msg):
120-
raise Exception(msg)
121-
122-
self.assertEqual(self.b.process_user_options('initdb', ['string'], (), error_handler), ['--string'])
123-
self.assertEqual(
124-
self.b.process_user_options(
125-
'initdb',
126-
[{'key': 'value'}],
127-
(), error_handler
128-
),
129-
['--key=value'])
130-
if sys.platform != 'win32':
131-
self.assertEqual(
132-
self.b.process_user_options(
133-
'initdb',
134-
[{'key': 'value with spaces'}],
135-
(), error_handler
136-
),
137-
["--key=value with spaces"])
138-
self.assertEqual(
139-
self.b.process_user_options(
140-
'initdb',
141-
[{'key': "'value with spaces'"}],
142-
(), error_handler
143-
),
144-
["--key=value with spaces"])
145-
self.assertEqual(
146-
self.b.process_user_options(
147-
'initdb',
148-
{'key': 'value with spaces'},
149-
(), error_handler
150-
),
151-
["--key=value with spaces"])
152-
self.assertEqual(
153-
self.b.process_user_options(
154-
'initdb',
155-
{'key': "'value with spaces'"},
156-
(), error_handler
157-
),
158-
["--key=value with spaces"])
159-
# not allowed options in list of dicts/strs are filtered out
160-
self.assertEqual(
161-
self.b.process_user_options(
162-
'pg_basebackup',
163-
[{'checkpoint': 'fast'}, {'dbname': 'dbname=postgres'}, 'gzip', {'label': 'standby'}, 'verbose'],
164-
('dbname', 'verbose'),
165-
print
166-
),
167-
['--checkpoint=fast', '--gzip', '--label=standby'],
168-
)
169-
170117
@patch.object(CancellableSubprocess, 'call', Mock())
171118
@patch.object(Postgresql, 'is_running', Mock(return_value=True))
172119
@patch.object(Postgresql, 'data_directory_empty', Mock(return_value=False))

0 commit comments

Comments
 (0)