Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## 1.5.1-dev

* [FEAT] add `DecodeOptions.strict_merge` for Node `qs` 6.15 `strictMerge` parity
* [FIX] align `decode` `list_limit` semantics with Node `qs` `arrayLimit` as a maximum element count
* [FIX] combine bracket-array duplicate assignments regardless of `DecodeOptions.duplicates`
* [FIX] align `decode` with Node `qs` 6.15.2 by normalizing dotted keys before preserving `depth=0` input
* [FIX] align `encode` with Node `qs` 6.15.2 by using the configured delimiter after `charset_sentinel`

Expand Down
42 changes: 37 additions & 5 deletions docs/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,38 @@ change the behavior when duplicate keys are encountered
qs.DecodeOptions(duplicates=qs.Duplicates.LAST),
) == {'foo': 'baz'}

Keys written in bracket-array notation (``key[]``) are always combined into a ``list``, regardless of the duplicate strategy:

.. code:: python

import qs_codec as qs

assert qs.decode(
'a=1&a=2&b[]=1&b[]=2',
qs.DecodeOptions(duplicates=qs.Duplicates.LAST),
) == {'a': '2', 'b': ['1', '2']}

When a key appears as both an object and a scalar,
:py:attr:`strict_merge <qs_codec.models.decode_options.DecodeOptions.strict_merge>` wraps the conflicting values in a
``list`` by default:

.. code:: python

import qs_codec as qs

assert qs.decode('a[b]=c&a=d') == {'a': [{'b': 'c'}, 'd']}

Set ``strict_merge`` to ``False`` to restore the legacy behavior, where a non-empty string scalar is added to the object as a key whose value is ``True``:

.. code:: python

import qs_codec as qs

assert qs.decode(
'a[b]=c&a=d',
qs.DecodeOptions(strict_merge=False),
) == {'a': {'b': 'c', 'd': True}}

If you have to deal with legacy browsers or services, there’s also
support for decoding percent-encoded octets as :py:attr:`LATIN1 <qs_codec.enums.charset.Charset.LATIN1>`:

Expand Down Expand Up @@ -310,11 +342,11 @@ Note that an empty ``str``\ing is also a value and will be preserved:
assert qs.decode('a[0]=b&a[1]=&a[2]=c') == {'a': ['b', '', 'c']}

:py:attr:`decode <qs_codec.decode>` will also limit specifying indices
in a ``list`` to a maximum index of ``20``. Any ``list`` members with an
index of greater than ``20`` will instead be converted to a ``dict`` with
the index as the key. This is needed to handle cases when someone sent,
for example, ``a[999999999]`` and it will take significant time to iterate
over this huge ``list``.
in a ``list`` to a maximum element count of ``20``. Index ``19`` is the
last index that can create a default ``list``; index ``20`` and higher
are converted to a ``dict`` with the index as the key. This guards against
input such as ``a[999999999]``, which would otherwise force the decoder to
allocate and iterate over a huge sparse ``list``.

.. code:: python

Expand Down
75 changes: 46 additions & 29 deletions src/qs_codec/decode.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@
from .enums.duplicates import Duplicates
from .enums.sentinel import Sentinel
from .models.decode_options import DecodeOptions
from .models.overflow_dict import OverflowDict
from .models.overflow_dict import CommaOverflowDict, OverflowDict
from .models.structured_key_scan import StructuredKeyScan
from .models.undefined import UNDEFINED
from .utils.decode_utils import DecodeUtils
from .utils.utils import Utils


def _list_limit_exceeded_message(limit: int) -> str:
return f"List limit exceeded: Only {limit} element{'' if limit == 1 else 's'} allowed in a list."


def decode(
value: t.Optional[t.Union[str, Mapping[str, t.Any]]],
options: t.Optional[DecodeOptions] = None,
Expand Down Expand Up @@ -86,9 +90,7 @@ def decode(
parse_lists_effective = False

if decode_from_string:
temp_obj: t.Optional[t.Dict[str, t.Any]] = _parse_query_string_values(
str_value, opts, parse_lists=parse_lists_effective
)
temp_obj: t.Optional[t.Dict[str, t.Any]] = _parse_query_string_values(str_value, opts)
else:
temp_obj = dict(mapping_value)
if not temp_obj:
Expand Down Expand Up @@ -221,20 +223,26 @@ def _interpret_numeric_entities(value: str) -> str:
return re.sub(r"&#(\d+);", lambda match: chr(int(match.group(1))), value)


def _parse_array_value(value: t.Any, options: DecodeOptions, current_list_length: int) -> t.Any:
def _parse_array_value(
value: t.Any,
options: DecodeOptions,
current_list_length: int,
*,
enforce_comma_limit: bool = True,
) -> t.Any:
"""Post-process a raw scalar for list semantics and enforce ``list_limit``.

Behavior
--------
- If ``comma=True`` and ``value`` is a string that contains commas, split into a list.
When ``enforce_comma_limit`` is ``True``, over-limit comma values raise or degrade to an ``OverflowDict`` here.
Raw query-string parsing passes ``False`` so the caller can account for bracket-array key context first.
- Otherwise, enforce the per-list length limit by comparing ``current_list_length`` to ``options.list_limit``.
When ``raise_on_limit_exceeded=True``, violations raise ``ValueError``.
- When ``list_limit`` is negative:
* if ``raise_on_limit_exceeded=True``, **any** list-growth operation here (e.g., comma-splitting)
raises immediately;
* if ``raise_on_limit_exceeded=False`` (default), comma-splitting still returns a list; numeric
bracket indices are handled later by ``_parse_object`` (where negative ``list_limit`` disables
numeric-index parsing only).
- When ``list_limit`` is negative, any non-empty comma split exceeds the limit: raising mode raises,
while non-raising mode degrades to an ``OverflowDict``/``CommaOverflowDict``. Raw query-string
parsing temporarily returns the split list when ``enforce_comma_limit=False`` so the caller can
apply bracket-array wrapping before the final limit check.
Comment thread
techouse marked this conversation as resolved.

Returns
-------
Expand All @@ -243,23 +251,19 @@ def _parse_array_value(value: t.Any, options: DecodeOptions, current_list_length
"""
if isinstance(value, str) and value and options.comma and "," in value:
split_val: t.List[str] = value.split(",")
if options.raise_on_limit_exceeded and len(split_val) > options.list_limit:
raise ValueError(
f"List limit exceeded: Only {options.list_limit} element{'' if options.list_limit == 1 else 's'} allowed in a list."
)
if enforce_comma_limit and len(split_val) > options.list_limit:
if options.raise_on_limit_exceeded:
raise ValueError(_list_limit_exceeded_message(options.list_limit))
return CommaOverflowDict({str(i): item for i, item in enumerate(split_val)})
return split_val

if options.raise_on_limit_exceeded and current_list_length >= options.list_limit:
raise ValueError(
f"List limit exceeded: Only {options.list_limit} element{'' if options.list_limit == 1 else 's'} allowed in a list."
)
raise ValueError(_list_limit_exceeded_message(options.list_limit))

Comment thread
techouse marked this conversation as resolved.
return value


def _parse_query_string_values(
value: str, options: DecodeOptions, *, parse_lists: t.Optional[bool] = None
) -> t.Dict[str, t.Any]:
def _parse_query_string_values(value: str, options: DecodeOptions) -> t.Dict[str, t.Any]:
"""Tokenize a raw query string into a flat ``Dict[str, Any]``.

Responsibilities
Expand All @@ -273,7 +277,7 @@ def _parse_query_string_values(
* Decode key/value via ``options.decoder`` (default: percent-decoding using the selected ``charset``).
Keys are passed with ``kind=DecodeKind.KEY`` and values with ``kind=DecodeKind.VALUE``; a custom decoder
may return the raw token or ``None``.
* Apply comma-split list logic to values (handled here). Index-based list growth from bracket segments is applied later in ``_parse_object``. When ``list_limit < 0`` and ``raise_on_limit_exceeded=True``, any comma-split that would increase the list length raises immediately; otherwise the split proceeds.
* Apply comma-split list logic to values (handled here). Index-based list growth from bracket segments is applied later in ``_parse_object``. When ``list_limit < 0``, comma-split values always exceed the limit: they raise under ``raise_on_limit_exceeded=True`` and degrade to overflow dictionaries otherwise.
* Interpret numeric entities for Latin-1 when requested.
* Handle empty brackets ``[]`` as list markers (wrapping exactly once).
* Merge duplicate keys according to ``duplicates`` policy.
Expand All @@ -282,7 +286,6 @@ def _parse_query_string_values(
``_parse_keys`` / ``_parse_object``.
"""
obj: t.Dict[str, t.Any] = {}
parse_lists_enabled = options.parse_lists if parse_lists is None else parse_lists

clean_str: str = value.replace("?", "", 1) if options.ignore_query_prefix else value
# Normalize %5B/%5D to literal brackets before splitting (case-insensitive).
Expand Down Expand Up @@ -354,9 +357,11 @@ def _parse_query_string_values(
continue
bracket_equals_pos: int = part.find("]=")
pos: int = part.find("=") if bracket_equals_pos == -1 else (bracket_equals_pos + 1)
bracket_array_assignment = pos != -1 and "[]=" in part

Comment thread
techouse marked this conversation as resolved.
# Decode key and value with a key-aware decoder; skip pairs whose key decodes to None
raw_key = ""
list_limit_exceeded = False
if pos == -1:
key_decoded = decoder_fn(part, charset, kind=DecodeKind.KEY)
if key_decoded is None:
Expand All @@ -377,7 +382,9 @@ def _parse_query_string_values(
part[pos + 1 :],
options,
len(obj[key]) if key in obj and isinstance(obj[key], (list, tuple)) else 0,
enforce_comma_limit=False,
)
list_limit_exceeded = isinstance(parsed_value, (list, tuple)) and len(parsed_value) > options.list_limit
if isinstance(parsed_value, (list, tuple)):
Comment thread
techouse marked this conversation as resolved.
val = [decoder_fn(v, charset, kind=DecodeKind.VALUE) for v in parsed_value]
else:
Expand All @@ -390,15 +397,21 @@ def _parse_query_string_values(

# Upstream parity: if token contains "[]=", only wrap values that are already arrays
# (typically produced by comma splitting), preserving list-of-lists semantics.
if parse_lists_enabled and pos != -1 and "[]=" in part and isinstance(val, (list, tuple)):
if bracket_array_assignment and isinstance(val, (list, tuple)):
val = [val]
list_limit_exceeded = len(val) > options.list_limit
if list_limit_exceeded and isinstance(val, (list, tuple)):
if options.raise_on_limit_exceeded:
raise ValueError(_list_limit_exceeded_message(options.list_limit))
val = CommaOverflowDict({str(i): item for i, item in enumerate(val)})

existing: bool = key in obj
part_duplicates = Duplicates.COMBINE if bracket_array_assignment else duplicates

# Combine/overwrite according to the effective duplicates policy; bracket-array assignments (``[]=``) always combine.
if existing and duplicates == Duplicates.COMBINE:
if existing and part_duplicates == Duplicates.COMBINE:
obj[key] = Utils.combine(obj[key], val, options)
elif not existing or duplicates == Duplicates.LAST:
elif not existing or part_duplicates == Duplicates.LAST:
obj[key] = val

return obj
Expand Down Expand Up @@ -518,10 +531,14 @@ def _parse_object(
and root != decoded_root
and str(index) == decoded_root
and parse_lists_enabled
and index <= options.list_limit
):
obj = [UNDEFINED for _ in range(index + 1)]
obj[index] = leaf
if index < options.list_limit:
obj = [UNDEFINED for _ in range(index + 1)]
obj[index] = leaf
elif options.raise_on_limit_exceeded:
raise ValueError(_list_limit_exceeded_message(options.list_limit))
else:
obj[decoded_root] = leaf
else:
# Preserve the literal decoded key for non-array roots (e.g. "[01]" -> "01"),
# matching Node `qs` behavior for leading-zero numeric-like segments.
Expand Down
26 changes: 19 additions & 7 deletions src/qs_codec/models/decode_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,19 @@ class DecodeOptions:
"""Set to ``True`` to allow empty ``list`` values inside ``dict``\\s in the encoded input."""

list_limit: int = 20
"""Maximum number of **indexed** items allowed in a single list (default: ``20``).
"""Maximum number of items allowed in a single decoded list (default: ``20``).

During decoding, keys like ``a[0]``, ``a[1]``, … are treated as list indices. If an
index exceeds this limit, the container is treated as a ``dict`` instead, with the
numeric index kept as a string key (e.g., ``{"999": "x"}``) to prevent creation of
massive sparse lists (e.g., ``a[999999999]``).

This limit also applies to comma–split lists when ``comma=True``. Set a larger value if
you explicitly need more items, or set a smaller one to harden against abuse.
index is greater than or equal to this limit, the container is treated as a ``dict``
instead, with the numeric index kept as a string key (e.g., ``{"999": "x"}``) to
prevent creation of massive sparse lists (e.g., ``a[999999999]``). With the default
limit, index ``19`` is the last index that can create a list; index ``20`` already
overflows to a ``dict``.

This limit also applies to decoded list growth from comma-split values when ``comma=True``.
For bracket-array assignments such as ``foo[]=1,2,3``, the comma-split payload is wrapped
as a single outer list element, so the inner payload may contain more values than
``list_limit`` while still respecting the outer container limit.
"""

charset: Charset = Charset.UTF8
Expand Down Expand Up @@ -140,6 +144,14 @@ class DecodeOptions:
Prefer ``decoder`` which may optionally accept a ``kind`` argument. When both are supplied,
``decoder`` takes precedence (mirroring Kotlin/C#/Swift/Dart behavior)."""

strict_merge: bool = True
"""Wrap object/scalar conflicts in a list.

When ``True`` (default), input such as ``a[b]=c&a=d`` decodes to ``{"a": [{"b": "c"}, "d"]}``.
When ``False``, the decoder restores the legacy behavior and adds non-empty string scalars as object keys,
e.g. ``{"a": {"b": "c", "d": True}}``.
"""

def __post_init__(self) -> None:
"""Post-initialization."""
# Default `decode_dot_in_keys` first, then mirror into `allow_dots` when unspecified.
Expand Down
10 changes: 7 additions & 3 deletions src/qs_codec/models/overflow_dict.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,20 @@ class OverflowDict(dict):

def copy(self) -> "OverflowDict":
"""Return an OverflowDict copy to preserve the overflow marker."""
return OverflowDict(super().copy())
return self.__class__(super().copy())

def __copy__(self) -> "OverflowDict":
"""Return an OverflowDict copy to preserve the overflow marker."""
return OverflowDict(super().copy())
return self.__class__(super().copy())

def __deepcopy__(self, memo: dict[int, object]) -> "OverflowDict":
"""Return an OverflowDict deepcopy to preserve the overflow marker."""
copied = OverflowDict()
copied = self.__class__()
memo[id(self)] = copied
for key, value in self.items():
copied[copy.deepcopy(key, memo)] = copy.deepcopy(value, memo)
return copied


class CommaOverflowDict(OverflowDict):
    """Marker subclass of ``OverflowDict`` for comma-split values that exceeded ``list_limit``."""
Loading