#############################
# Fork: https://github.com/HessamLa/recursivenamespace
# %%
from __future__ import annotations
import contextlib
import dataclasses
import functools
import json
import logging
import re
import sys
import warnings
from copy import deepcopy
from pathlib import Path
from types import SimpleNamespace
from typing import (
Any,
Callable,
Dict,
Generator,
Iterator,
List,
Optional,
TypeVar,
Union,
)
# Conditional import for TOML support
try:
import tomllib # type: ignore[import-not-found] # Python 3.11+
except ImportError:
try:
import tomli as tomllib # type: ignore[import-not-found]
except ImportError:
tomllib = None
from . import utils
from .errors import GetChainKeyError, SerializationError, SetChainKeyError
T = TypeVar("T")
_KEY_NORMALIZE_RE = re.compile(r"[.\-\s]")
__all__ = [
"recursivenamespace",
"GetChainKeyError",
"SerializationError",
"SetChainKeyError",
]
# Phase 1 of the method-proxy migration: public methods will eventually
# live exclusively under ``obj._.<method>(...)``. For now the class still
# exposes them directly as shims that warn and delegate to _StaticImpl,
# the single source of truth.
_DEPRECATION_TEMPLATE = (
"Calling '{name}' directly on an RNS instance is deprecated and "
"will be removed in a future major release. Use 'obj._.{name}(...)' "
"instead."
)
_SHADOW_TEMPLATE = (
"Data field '{name}' shadows the deprecated method '{name}'. The "
"value is stored and reachable via obj['{name}'] / obj.{name}; "
"call the method via 'obj._.{name}(...)'."
)
def _deprecated(func: Callable[..., Any]) -> Callable[..., Any]:
"""Mark a class-level shim as deprecated in favor of ``obj._.method(...)``.
Emits DeprecationWarning at call time using the wrapped function's
name, then forwards. Remove these decorators (and the shims they
decorate) in Phase 3 of the migration.
"""
msg = _DEPRECATION_TEMPLATE.format(name=func.__name__)
@functools.wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> Any:
warnings.warn(msg, DeprecationWarning, stacklevel=2)
return func(*args, **kwargs)
return wrapper
[docs]
class recursivenamespace(SimpleNamespace):
__HASH__ = "#"
_logger_ = logging.getLogger(__name__)
# ``_`` is bound to a data descriptor after the class is defined,
# see ``recursivenamespace._ = _Descriptor()`` below.
[docs]
def __init__(
self,
data: Optional[Dict[str, Any]] = None,
accepted_iter_types: Optional[List[type]] = None,
use_raw_key: bool = False,
**kwargs: Any,
) -> None:
if data is None:
data = {}
if accepted_iter_types is None:
accepted_iter_types = []
self._key_ = ""
self._use__raw_key_ = use_raw_key
self._supported__types_ = list(
dict.fromkeys([list, tuple, set] + accepted_iter_types)
)
self._protected__keys_: set[str] = set() # init attr in __dict__
self._protected__keys_ = (
set(self.__dict__.keys()) | _HARD_PROTECTED_CLASS_ATTRS
)
if isinstance(data, dict):
kwargs.update(data)
for key, val in kwargs.items():
key = self._re_(key)
if isinstance(val, dict):
val = recursivenamespace(val, accepted_iter_types, use_raw_key)
_StaticImpl.set_key(val, key)
elif isinstance(val, recursivenamespace):
_StaticImpl.set_key(val, key)
else:
val = self._process_(val)
self[key] = val
def _process_(
self,
val: Any,
accepted_iter_types: Optional[List[type]] = None,
use_raw_key: bool = False,
) -> Any:
if isinstance(val, dict):
return recursivenamespace(val, accepted_iter_types, use_raw_key)
elif isinstance(val, str):
return val
elif hasattr(val, "__iter__") and type(val) in self._supported__types_:
lst = [
self._process_(v, accepted_iter_types, use_raw_key) for v in val
]
try:
return type(val)(lst)
except Exception as e:
print(
f"Failed to make iterable object of type {type(val)}",
e,
file=sys.stderr,
)
return val
else:
return val
def _re_(self, key: str) -> str:
return key if self._use__raw_key_ else _KEY_NORMALIZE_RE.sub("_", key)
def _remove_protected_key_(self, key: str) -> None: # NOSONAR
"""Use with be-careful!"""
self._protected__keys_.remove(key)
self.__dict__.pop(key)
# ── Dunders ───────────────────────────────────────────────────
[docs]
def __eq__(self, other: object) -> bool:
if isinstance(other, recursivenamespace):
return vars(self) == vars(other)
elif isinstance(other, dict):
return vars(self) == other
return False
[docs]
def __repr__(self) -> str:
s = ""
for k, v in _StaticImpl.items(self):
s += f"{k}={v}, "
if len(s) > 0:
s = s[:-2]
return f"RNS({s})"
def __str__(self) -> str:
return self.__repr__()
[docs]
def __len__(self) -> int:
return sum(1 for k in self.__dict__ if k not in self._protected__keys_)
def __delattr__(self, key: str) -> None:
key = self._re_(key)
if key in self._protected__keys_:
raise AttributeError(
f"The key '{key}' is protected — reserved method proxy"
if key == "_"
else f"The key '{key}' is protected."
)
del self.__dict__[key]
[docs]
def __setitem__(self, key: str, value: Any) -> None:
key = self._re_(key)
if key in self._protected__keys_:
raise KeyError(f"The key '{key}' is protected.")
if key in _DEPRECATED_PUBLIC_METHODS:
# FutureWarning (not DeprecationWarning) so the signal is
# visible under Python's default filter. The shadow is an
# end-user-data event — the caller's data collides with a
# library method name — and they need to see it without
# opting in via -W default.
warnings.warn(
_SHADOW_TEMPLATE.format(name=key),
FutureWarning,
stacklevel=2,
)
setattr(self, key, value)
[docs]
def __getitem__(self, key: str) -> Any:
key = self._re_(key)
if key in self._protected__keys_:
raise KeyError(f"The key '{key}' is protected.")
return getattr(self, key)
[docs]
def __delitem__(self, key: str) -> None:
key = self._re_(key)
delattr(self, key)
[docs]
def __contains__(self, key: str) -> bool:
key = self._re_(key)
return key in self.__dict__
def __copy__(self) -> "recursivenamespace":
cls = self.__class__
result = cls.__new__(cls)
result.__dict__.update(self.__dict__)
return result
def __deepcopy__(self, memo: Dict[int, Any]) -> "recursivenamespace":
cls = self.__class__
result = cls.__new__(cls)
memo[id(self)] = result
for k, v in self.__dict__.items():
setattr(result, k, deepcopy(v, memo))
return result
def __iter__(self) -> Iterator[str]:
if sys._getframe(1).f_code.co_name == "dict":
return iter(_StaticImpl.to_dict(self))
return iter(_StaticImpl.keys(self))
# ── Private chain-key helpers ────────────────────────────────
def _iter_to_dict_(self, iterable: Any) -> Any:
elements = []
for val in iterable:
if isinstance(val, recursivenamespace):
elements.append(_StaticImpl.to_dict(val))
elif isinstance(val, dict):
elements.append(val)
elif (
hasattr(val, "__iter__")
and type(val) in self._supported__types_
):
elements.append(self._iter_to_dict_(val))
else:
elements.append(val)
return type(iterable)(elements)
def _chain_set_array_(self, key: str, subs: List[str], value: Any) -> None:
target = self._get_or_create_list_target_(key)
if not subs:
raise KeyError(
f"Invalid array key '{key}'. Required the 'index' as well, "
f"e.g.: key[].#"
)
head, *rest = subs
if head == self.__HASH__:
self._array_append_(target, rest, value)
else:
self._array_set_at_(target, key, int(head), rest, value)
def _get_or_create_list_target_(self, key: str) -> List[Any]:
# Use ``key in self`` (which checks ``__dict__``) rather than
# ``hasattr`` — ``hasattr`` resolves class attributes too, so a
# deprecated method name like ``items`` would falsely look
# "present" and the bound method would be returned below.
if key not in self:
self[key] = []
target = self[key]
if not isinstance(target, list):
raise KeyError(
f"Invalid array key '{key}'. It is required a list, but "
f"got {type(target)}"
)
return target
def _array_append_(
self, target: List[Any], sub_keys: List[str], value: Any
) -> None:
if not sub_keys:
target.append(value)
return
new_item = recursivenamespace(
None, self._supported__types_, self._use__raw_key_
)
_StaticImpl.val_set(new_item, utils.join_key(sub_keys), value)
target.append(new_item)
def _array_set_at_(
self,
target: List[Any],
key: str,
index: int,
sub_keys: List[str],
value: Any,
) -> None:
if not sub_keys:
target[index] = value
return
child = target[index]
sub_key = utils.join_key(sub_keys)
if isinstance(child, recursivenamespace):
_StaticImpl.val_set(child, sub_key, value)
else:
raise SetChainKeyError(child, f"{key}[{index}]", sub_key)
def _chain_set_value_(self, key: str, subs: List[str], value: Any) -> None:
# See note in ``_get_or_create_list_target_``: ``hasattr`` would
# find class-level deprecated method shims and skip auto-vivify.
if key not in self:
self[key] = recursivenamespace(
None, self._supported__types_, self._use__raw_key_
)
target = self[key]
sub_key = utils.join_key(subs)
if isinstance(target, recursivenamespace):
_StaticImpl.val_set(target, sub_key, value)
else:
raise SetChainKeyError(target, key, sub_key)
def _chain_get_array_(self, key: str, subs: List[str]) -> Any:
# Without this guard, a missing data field whose name matches a
# class method (e.g. ``items``) would resolve to a bound method
# via attribute lookup and hit the type-mismatch branch below
# with a confusing "method" type in the message.
if key not in self:
raise GetChainKeyError(None, key, utils.join_key(subs))
target = self[key]
subs_len = len(subs)
if not isinstance(target, list):
raise KeyError(
f"Invalid array key '{key}'. It is required a list, but got {type(target)}"
)
if subs_len == 0:
raise KeyError(
f"Invalid array key '{key}'. Required the 'index' as well, e.g.: key[].#"
)
index = -1 if subs[0] == self.__HASH__ else int(subs[0])
subs = subs[1:]
subs_len -= 1
if subs_len == 0:
return target[index]
target = target[index]
sub_key = utils.join_key(subs)
if isinstance(target, recursivenamespace):
return _StaticImpl.val_get(target, sub_key)
elif subs_len == 1:
return getattr(target, sub_key)
else:
raise GetChainKeyError(target, key, sub_key)
def _chain_get_value_(self, key: str, subs: List[str]) -> Any:
sub_key = utils.join_key(subs)
# See ``_chain_get_array_``: guard against class-method shadow.
if key not in self:
raise GetChainKeyError(None, key, sub_key)
target = self[key]
if isinstance(target, recursivenamespace):
return _StaticImpl.val_get(target, sub_key)
elif len(subs) == 1:
return getattr(target, sub_key)
else:
raise GetChainKeyError(target, key, sub_key)
@staticmethod
def _toml_escape_str_(s: str) -> str:
return s.replace("\\", "\\\\").replace('"', '\\"')
@staticmethod
def _toml_format_scalar_(value: Any) -> str:
"""Render a primitive TOML value (caller already handled None)."""
if isinstance(value, bool):
return str(value).lower()
if isinstance(value, str):
return f'"{recursivenamespace._toml_escape_str_(value)}"'
return str(value)
@staticmethod
def _toml_format_array_(key: str, value: Any) -> str:
"""Render ``key = [v1, v2, ...]`` or a comment if not serializable."""
if not all(isinstance(v, (str, int, float, bool)) for v in value):
return f"# {key} = [complex array - not serialized]"
parts = [recursivenamespace._toml_format_scalar_(v) for v in value]
return f"{key} = [{', '.join(parts)}]"
@staticmethod
def _toml_format_line_(key: str, value: Any) -> Optional[str]:
"""Render a single ``key = value`` line, or None to skip the entry."""
if value is None:
return None
if isinstance(value, (str, int, float, bool)):
return f"{key} = {recursivenamespace._toml_format_scalar_(value)}"
if isinstance(value, (list, tuple)):
return recursivenamespace._toml_format_array_(key, value)
return f"# {key} = [type {type(value).__name__} not supported]"
@staticmethod
def _dict_to_toml_(data: Dict[str, Any], prefix: str = "") -> str:
"""Convert dict to TOML format."""
lines: List[str] = []
tables: List[tuple[str, Dict[str, Any]]] = []
for key, value in data.items():
full_key = f"{prefix}.{key}" if prefix else key
if isinstance(value, dict):
tables.append((full_key, value))
continue
line = recursivenamespace._toml_format_line_(key, value)
if line is not None:
lines.append(line)
for table_key, table_value in tables:
lines.append("")
lines.append(f"[{table_key}]")
lines.append(recursivenamespace._dict_to_toml_(table_value, ""))
return "\n".join(lines)
# ── Classmethod factories (kept on the class, no deprecation) ─
[docs]
@classmethod
def from_json(
cls,
json_str: str,
accepted_iter_types: Optional[List[type]] = None,
use_raw_key: bool = False,
) -> "recursivenamespace":
try:
data = json.loads(json_str)
if not isinstance(data, dict):
raise SerializationError(
f"JSON must represent a dict, got {type(data)}"
)
return cls(data, accepted_iter_types, use_raw_key)
except json.JSONDecodeError as e:
raise SerializationError(f"Invalid JSON: {e}")
except Exception as e:
raise SerializationError(f"Failed to parse JSON: {e}")
[docs]
@classmethod
def load_json(
cls,
filepath: Union[str, Path],
accepted_iter_types: Optional[List[type]] = None,
use_raw_key: bool = False,
) -> "recursivenamespace":
try:
with open(filepath, "r", encoding="utf-8") as f:
return cls.from_json(f.read(), accepted_iter_types, use_raw_key)
except FileNotFoundError:
raise
except Exception as e:
raise SerializationError(f"Failed to load JSON file: {e}")
[docs]
@classmethod
def from_toml(
cls,
toml_str: str,
accepted_iter_types: Optional[List[type]] = None,
use_raw_key: bool = False,
) -> "recursivenamespace":
if tomllib is None:
raise ImportError(
"TOML support requires Python 3.11+ or 'tomli' package. "
"Install with: pip install tomli"
)
try:
data = tomllib.loads(toml_str)
return cls(data, accepted_iter_types, use_raw_key)
except Exception as e:
raise SerializationError(f"Failed to parse TOML: {e}")
[docs]
@classmethod
def load_toml(
cls,
filepath: Union[str, Path],
accepted_iter_types: Optional[List[type]] = None,
use_raw_key: bool = False,
) -> "recursivenamespace":
if tomllib is None:
raise ImportError(
"TOML support requires Python 3.11+ or 'tomli' package. "
"Install with: pip install tomli"
)
try:
with open(filepath, "rb") as f:
data = tomllib.load(f)
return cls(data, accepted_iter_types, use_raw_key)
except FileNotFoundError:
raise
except Exception as e:
raise SerializationError(f"Failed to load TOML file: {e}")
# ── Public-method shims (warn + delegate to _StaticImpl) ──────
[docs]
@_deprecated
def set_key(self, key: str) -> None:
return _StaticImpl.set_key(self, key)
[docs]
@_deprecated
def get_key(self) -> str:
return _StaticImpl.get_key(self)
[docs]
@_deprecated
def update(self, data: Union[Dict[str, Any], "recursivenamespace"]) -> None:
return _StaticImpl.update(self, data)
[docs]
@_deprecated
def copy(self) -> "recursivenamespace":
return _StaticImpl.copy(self)
[docs]
@_deprecated
def deepcopy(self) -> "recursivenamespace":
return _StaticImpl.deepcopy(self)
[docs]
@_deprecated
def pop(self, key: str, default: Optional[T] = None) -> Union[Any, T]:
return _StaticImpl.pop(self, key, default)
[docs]
@_deprecated
def items(self) -> List[tuple[str, Any]]:
return _StaticImpl.items(self)
[docs]
@_deprecated
def keys(self) -> List[str]:
return _StaticImpl.keys(self)
[docs]
@_deprecated
def values(self) -> List[Any]:
return _StaticImpl.values(self)
[docs]
@_deprecated
def to_dict(self, flatten_sep: Union[str, bool] = False) -> Dict[str, Any]:
return _StaticImpl.to_dict(self, flatten_sep)
[docs]
@_deprecated
def val_set(self, key: str, value: Any) -> None:
return _StaticImpl.val_set(self, key, value)
[docs]
@_deprecated
def val_get(self, key: str) -> Any:
return _StaticImpl.val_get(self, key)
[docs]
@_deprecated
def get_or_else(
self, key: str, or_else: Optional[T] = None, show_log: bool = False
) -> Union[Any, T]:
return _StaticImpl.get_or_else(self, key, or_else, show_log)
[docs]
@_deprecated
def as_schema(self, schema_cls: type[T], /, **kwargs: Any) -> T:
return _StaticImpl.as_schema(self, schema_cls, **kwargs)
[docs]
@_deprecated
def temporary(
self,
) -> contextlib.AbstractContextManager["recursivenamespace"]:
return _StaticImpl.temporary(self)
[docs]
@_deprecated
def overlay(
self, overrides: Dict[str, Any]
) -> contextlib.AbstractContextManager["recursivenamespace"]:
return _StaticImpl.overlay(self, overrides)
[docs]
@_deprecated
def to_json(
self,
indent: Optional[int] = 2,
sort_keys: bool = False,
ensure_ascii: bool = True,
**kwargs: Any,
) -> str:
return _StaticImpl.to_json(
self, indent, sort_keys, ensure_ascii, **kwargs
)
[docs]
@_deprecated
def save_json(
self,
filepath: Union[str, Path],
indent: Optional[int] = 2,
**kwargs: Any,
) -> None:
return _StaticImpl.save_json(self, filepath, indent, **kwargs)
[docs]
@_deprecated
def to_toml(self) -> str:
return _StaticImpl.to_toml(self)
[docs]
@_deprecated
def save_toml(self, filepath: Union[str, Path]) -> None:
return _StaticImpl.save_toml(self, filepath)
# ──────────────────────────────────────────────────────────────────
# _StaticImpl: single source of truth for the 20 public methods.
# Every method takes the RNS instance explicitly as the first argument.
# Internal cross-method recursion uses _StaticImpl.<other>(rns_ins, ...)
# directly — never the shim — so no deprecation warnings fire on hops
# the user did not make.
# ──────────────────────────────────────────────────────────────────
class _StaticImpl:
"""Real implementation of RNS public instance methods.
All callers (the class shims, the ``_BoundProxy`` for ``obj._``,
direct ``RNS._.method(obj, ...)`` calls, and internal recursion)
converge on these staticmethods.
"""
@staticmethod
def set_key(rns_ins: "recursivenamespace", key: str) -> None:
rns_ins._key_ = rns_ins._re_(key)
@staticmethod
def get_key(rns_ins: "recursivenamespace") -> str:
return rns_ins._key_
@staticmethod
def update(
rns_ins: "recursivenamespace",
data: Union[Dict[str, Any], "recursivenamespace"],
) -> None:
try:
if not isinstance(data, recursivenamespace):
data = recursivenamespace(
data,
rns_ins._supported__types_,
rns_ins._use__raw_key_,
)
except Exception as e:
raise TypeError(
f"Failed to update with data of type {type(data)}"
) from e
for key, val in _StaticImpl.items(data):
rns_ins[key] = val
@staticmethod
def copy(rns_ins: "recursivenamespace") -> "recursivenamespace":
return rns_ins.__copy__()
@staticmethod
def deepcopy(rns_ins: "recursivenamespace") -> "recursivenamespace":
memo: Dict[int, Any] = {}
return rns_ins.__deepcopy__(memo)
@staticmethod
def pop(
rns_ins: "recursivenamespace",
key: str,
default: Optional[T] = None,
) -> Union[Any, T]:
key = rns_ins._re_(key)
if key in rns_ins._protected__keys_:
raise KeyError(f"The key '{key}' is protected.")
if key in rns_ins.__dict__:
val = rns_ins.__dict__[key]
del rns_ins.__dict__[key]
return val
return default
@staticmethod
def items(rns_ins: "recursivenamespace") -> List[tuple[str, Any]]:
return [
(k, v)
for k, v in rns_ins.__dict__.items()
if k not in rns_ins._protected__keys_
]
@staticmethod
def keys(rns_ins: "recursivenamespace") -> List[str]:
return [
k
for k in rns_ins.__dict__.keys()
if k not in rns_ins._protected__keys_
]
@staticmethod
def values(rns_ins: "recursivenamespace") -> List[Any]:
return [
v
for k, v in rns_ins.__dict__.items()
if k not in rns_ins._protected__keys_
]
@staticmethod
def to_dict(
rns_ins: "recursivenamespace",
flatten_sep: Union[str, bool] = False,
) -> Dict[str, Any]:
"""Convert RNS to dict. If flatten_sep is set, flatten keys."""
pairs = []
for k, v in _StaticImpl.items(rns_ins):
if isinstance(v, recursivenamespace):
pairs.append((k, _StaticImpl.to_dict(v)))
elif isinstance(v, dict):
pairs.append((k, v))
elif (
hasattr(v, "__iter__") and type(v) in rns_ins._supported__types_
):
pairs.append((k, rns_ins._iter_to_dict_(v)))
else:
pairs.append((k, v))
d = dict(pairs)
if flatten_sep:
sep = flatten_sep if isinstance(flatten_sep, str) else "."
d = dict(utils.flatten_as_dict(d, sep=sep))
return d
@staticmethod
def val_set(rns_ins: "recursivenamespace", key: str, value: Any) -> None:
"""Set the value by key. Supports chain-keys and arrays.
Patterns: ``a.b.c``, ``a.b.c[].<i>``, ``a.b.c[].#``, etc.
"""
key, *subs = utils.split_key(key)
key = utils.unescape_key(key)
subs_len = len(subs)
is_array = key[-2:] == utils.KEY_ARRAY
if subs_len == 0 and not is_array:
rns_ins[key] = value
return
if is_array:
rns_ins._chain_set_array_(key[:-2], subs, value)
else:
rns_ins._chain_set_value_(key, subs, value)
@staticmethod
def val_get(rns_ins: "recursivenamespace", key: str) -> Any:
"""Get the value by key. Supports chain-keys and arrays."""
key, *subs = utils.split_key(key)
key = utils.unescape_key(key)
subs_len = len(subs)
is_array = key[-2:] == utils.KEY_ARRAY
if subs_len == 0 and not is_array:
return rns_ins[key]
if is_array:
return rns_ins._chain_get_array_(key[:-2], subs)
return rns_ins._chain_get_value_(key, subs)
@staticmethod
def get_or_else(
rns_ins: "recursivenamespace",
key: str,
or_else: Optional[T] = None,
show_log: bool = False,
) -> Union[Any, T]:
try:
return _StaticImpl.val_get(rns_ins, key)
except Exception:
if show_log:
rns_ins._logger_.warning(f"KeyNotFound - {key}", exc_info=True)
return or_else
@staticmethod
def as_schema(
rns_ins: "recursivenamespace",
schema_cls: type[T],
/,
**kwargs: Any,
) -> T:
if not dataclasses.is_dataclass(schema_cls):
raise TypeError("The 'schema_cls' must be a DataClass type.")
fields = dataclasses.fields(schema_cls)
for field in fields:
name = field.name
kwargs[name] = rns_ins[name]
return schema_cls(**kwargs)
@staticmethod
@contextlib.contextmanager
def temporary(
rns_ins: "recursivenamespace",
) -> Generator["recursivenamespace", None, None]:
"""Yield a deep copy; the original is untouched."""
yield _StaticImpl.deepcopy(rns_ins)
@staticmethod
@contextlib.contextmanager
def overlay(
rns_ins: "recursivenamespace", overrides: Dict[str, Any]
) -> Generator["recursivenamespace", None, None]:
"""Temporarily apply *overrides*, restore on exit."""
originals: Dict[str, Any] = {}
added_keys: List[str] = []
for key, value in overrides.items():
nk = rns_ins._re_(key)
if nk in rns_ins.__dict__ and nk not in rns_ins._protected__keys_:
originals[nk] = rns_ins.__dict__[nk]
else:
added_keys.append(nk)
rns_ins[key] = value
try:
yield rns_ins
finally:
for k, v in originals.items():
rns_ins.__dict__[k] = v
for k in added_keys:
rns_ins.__dict__.pop(k, None)
@staticmethod
def to_json(
rns_ins: "recursivenamespace",
indent: Optional[int] = 2,
sort_keys: bool = False,
ensure_ascii: bool = True,
**kwargs: Any,
) -> str:
try:
return json.dumps(
_StaticImpl.to_dict(rns_ins),
indent=indent,
sort_keys=sort_keys,
ensure_ascii=ensure_ascii,
**kwargs,
)
except (TypeError, ValueError) as e:
raise SerializationError(f"Failed to serialize to JSON: {e}")
@staticmethod
def save_json(
rns_ins: "recursivenamespace",
filepath: Union[str, Path],
indent: Optional[int] = 2,
**kwargs: Any,
) -> None:
try:
filepath = Path(filepath)
filepath.parent.mkdir(parents=True, exist_ok=True)
with open(filepath, "w", encoding="utf-8") as f:
f.write(_StaticImpl.to_json(rns_ins, indent=indent, **kwargs))
except Exception as e:
raise SerializationError(f"Failed to save JSON file: {e}")
@staticmethod
def to_toml(rns_ins: "recursivenamespace") -> str:
try:
return recursivenamespace._dict_to_toml_(
_StaticImpl.to_dict(rns_ins)
)
except Exception as e:
raise SerializationError(f"Failed to serialize to TOML: {e}")
@staticmethod
def save_toml(
rns_ins: "recursivenamespace", filepath: Union[str, Path]
) -> None:
try:
filepath = Path(filepath)
filepath.parent.mkdir(parents=True, exist_ok=True)
with open(filepath, "w", encoding="utf-8") as f:
f.write(_StaticImpl.to_toml(rns_ins))
except Exception as e:
raise SerializationError(f"Failed to save TOML file: {e}")
# ──────────────────────────────────────────────────────────────────
# Bound proxy + descriptor for ``obj._``
# ──────────────────────────────────────────────────────────────────
class _BoundProxy:
"""Curries the owner into ``_StaticImpl`` calls so ``obj._.to_dict()``
works as a normal bound-method call."""
__slots__ = ("_owner",)
def __init__(self, owner: "recursivenamespace") -> None:
object.__setattr__(self, "_owner", owner)
def __getattr__(self, name: str) -> Any:
if name.startswith("_"):
raise AttributeError(name)
attr = getattr(_StaticImpl, name, None)
if attr is None or not callable(attr):
raise AttributeError(name)
return functools.partial(attr, self._owner)
def __setattr__(self, name: str, value: Any) -> None:
raise AttributeError("RNS '_' proxy is read-only")
def __dir__(self) -> List[str]:
return [n for n in dir(_StaticImpl) if not n.startswith("_")]
def __repr__(self) -> str:
return f"<RNS method proxy for 0x{id(self._owner):x}>"
class _Descriptor:
"""Data descriptor exposing ``recursivenamespace._``.
Class access (``RNS._``) returns the static container so callers
can write ``RNS._.to_dict(obj)``. Instance access (``obj._``)
returns a fresh ``_BoundProxy`` so callers can write
``obj._.to_dict()`` bound-method style. The descriptor's
``__set__`` / ``__delete__`` make it a *data* descriptor that
can't be shadowed by instance ``__dict__`` assignments.
"""
def __get__(
self,
instance: Optional["recursivenamespace"],
owner: type,
) -> Any:
if instance is None:
return _StaticImpl
return _BoundProxy(instance)
def __set__(self, instance: "recursivenamespace", value: Any) -> None:
raise AttributeError("Cannot assign to '_' — reserved method proxy")
def __delete__(self, instance: "recursivenamespace") -> None:
raise AttributeError("Cannot delete '_' — reserved method proxy")
# Bind the descriptor and compute the protected-attribute set.
# Use setattr so static type checkers don't flag the dynamic attribute.
setattr(recursivenamespace, "_", _Descriptor())
# Two-tier protection. Hard-protected names load-bearing for internal
# logic raise KeyError on data collision. Soft-protected public method
# names (deprecated direct-call shims + classmethod factories) emit
# FutureWarning and let the data win — callers can still reach the
# methods via ``obj._.<name>(...)``. FutureWarning (vs DeprecationWarning
# on the @_deprecated call shims) because shadow events are caused by
# end-user data and must surface under Python's default warning filter.
# All single-underscore class attrs — the ``_`` proxy plus every
# private helper (_re_, _process_, _chain_*_, _iter_to_dict_, etc.)
# and ``_logger_``. Excludes Python dunders.
_HARD_PROTECTED_CLASS_ATTRS: frozenset[str] = frozenset(
name
for name in dir(recursivenamespace)
if not name.startswith("__") and name.startswith("_")
)
_DEPRECATED_PUBLIC_METHODS: frozenset[str] = frozenset(
name for name in dir(recursivenamespace) if not name.startswith("_")
)
# %%
def _rns_normalize_return_(
ret_val: Any, use_chain_key: bool, props: str
) -> Any:
"""Convert a decorated function's return value into ``data`` for RNS.
Recognised shapes: a list of ``KV_Pair`` (chain-key mode), a dict,
a dataclass instance, or any other scalar (wrapped under ``props``).
"""
if (
use_chain_key
and isinstance(ret_val, list)
and (not ret_val or isinstance(ret_val[0], utils.KV_Pair))
):
return ret_val
if isinstance(ret_val, dict):
return ret_val
if dataclasses.is_dataclass(ret_val):
return dataclasses.asdict(ret_val) # type: ignore[arg-type]
return {props: ret_val}
def _rns_build_from_data_(
data: Any,
accepted_iter_types_list: List[type],
use_raw_key: bool,
use_chain_key: bool,
) -> "recursivenamespace":
if not use_chain_key:
return recursivenamespace(data, accepted_iter_types_list, use_raw_key)
ret = recursivenamespace(None, accepted_iter_types_list, use_raw_key)
items = data.items() if isinstance(data, dict) else data
for key, value in items:
_StaticImpl.val_set(ret, key, value)
return ret
[docs]
def rns(
accepted_iter_types: Optional[List[type]] = None,
use_raw_key: bool = False,
use_chain_key: bool = False,
props: str = "props",
) -> Callable[[Callable[..., Any]], Callable[..., recursivenamespace]]:
"""Create RNS object"""
accepted_iter_types_list: List[type] = (
[] if accepted_iter_types is None else accepted_iter_types
)
def fn_wrapper(
func: Callable[..., Any],
) -> Callable[..., recursivenamespace]: # NOSONAR
@functools.wraps(func)
def create_rns(*args: Any, **kwargs: Any) -> recursivenamespace:
ret_val = func(*args, **kwargs)
data = _rns_normalize_return_(ret_val, use_chain_key, props)
return _rns_build_from_data_(
data, accepted_iter_types_list, use_raw_key, use_chain_key
)
return create_rns
return fn_wrapper