"""Loader and Dumper to serialize FancyDicts"""
import re
import urllib.request
import urllib.parse
from pathlib import Path
from io import IOBase
import yaml
from fancy_dict.errors import NoLoaderForSourceAvailable
from fancy_dict import merger, conditions
from fancy_dict.annotations import Annotations
class AnnotationsDecoder:
    """Interface to decode annotations"""

    @classmethod
    def decode(cls, key=None, value=None):
        """Decodes an annotation from a key/value-pair

        Args:
            key: can be used to decode annotation
            value: can be used to decode annotation
        Returns:
            dict with the following keys:
                * key: decoded key
                * value: decoded value
                * annotations: decoded annotations
        Raises:
            NotImplementedError: always, concrete decoders must override
        """
        raise NotImplementedError()
class AnnotationsEncoder:
    """Interface to encode annotations"""

    @classmethod
    def encode(cls, annotation, key=None, value=None):
        """Encodes an annotation into a key/value-pair

        Args:
            annotation: annotation to encode
            key: initial key value to encode annotation into
            value: initial value to encode annotation into
        Returns:
            dict with the following keys:
                * key: key with annotation encoded
                * value: value with annotation encoded
        Raises:
            NotImplementedError: always, concrete encoders must override
        """
        raise NotImplementedError()
class KeyAnnotationsConverter(AnnotationsEncoder, AnnotationsDecoder):
    """Encodes/Decodes Annotations from the key

    The following format is used to encode and decode annotation strings:
        ?(key)[add]

    * The first character defines the condition (optional)
    * Followed by the key name
    * if the key is in round brackets, the key gets finalized (optional)
    * at the end the merge method can be specified in square brackets
    """
    # name used inside the square brackets -> merge method
    MERGE_METHODS = {
        "add": merger.add,
        "overwrite": merger.overwrite,
        "update": merger.update,
    }
    # leading marker character -> condition
    CONDITIONS = {
        "#": conditions.always,
        "?": conditions.if_existing,
        "+": conditions.if_not_existing,
    }

    @classmethod
    def decode(cls, key=None, value=None):
        """Decodes the annotations encoded in a key

        Args:
            key: annotated key; keys which are not strings cannot carry
                annotations and are passed through unchanged
            value: returned unchanged
        Returns:
            dict with the keys "key" (key without annotation markers),
            "value" and "annotations"
        Raises:
            KeyError: if the merge method in square brackets is unknown
            AttributeError: if no key name can be extracted from a string
        """
        if isinstance(key, str):
            annotations = Annotations(
                merge_method=cls._parse_merge_method(key),
                condition=cls._parse_condition(key),
                finalized=cls._parse_finalized(key),
            )
            key = cls._parse_key(key)
        else:
            # e.g. integer keys from YAML: nothing to parse, so use the same
            # annotations a plain string key without markers would get
            annotations = Annotations(
                merge_method=None,
                condition=None,
                finalized=False,
            )
        return {
            "key": key,
            "value": value,
            "annotations": annotations
        }

    @classmethod
    def _parse_finalized(cls, annotated_key):
        """Returns True if the key name is wrapped in round brackets"""
        key = cls._parse_key(annotated_key)
        # literal containment check: the key is arbitrary text and must not
        # be interpreted as a regular expression ("c++", "a(b", ...)
        return "({})".format(key) in annotated_key

    @classmethod
    def _parse_key(cls, annotated_key):
        """Extracts the key name without condition, brackets and method"""
        # markers are escaped so they are always literals in the char class
        condition_marker = re.escape("".join(cls.CONDITIONS))
        key_pattern = r"^[{}]?\({{0,1}}(?P<key>[^)[]+)\){{0,1}}".format(
            condition_marker
        )
        return re.match(key_pattern, annotated_key).group("key")

    @classmethod
    def _parse_merge_method(cls, annotated_key):
        """Returns the merge method named in trailing square brackets

        Returns None if the key does not end with a square bracket section.
        """
        method_pattern = r"\[(?P<name>.+)\]$"
        match = re.search(method_pattern, annotated_key)
        if match:
            return cls.MERGE_METHODS[match.group("name")]
        return None

    @classmethod
    def _parse_condition(cls, annotated_key):
        """Returns the condition of the leading marker or None"""
        for marker, condition in cls.CONDITIONS.items():
            if annotated_key.startswith(marker):
                return condition
        return None

    @classmethod
    def encode(cls, annotation, key=None, value=None):
        """Encodes the annotation into the key

        Args:
            annotation: annotation to encode
            key: plain key name the annotation markers get added to
            value: returned unchanged
        Returns:
            dict with the keys "key" (annotated key) and "value"
        """
        return {
            "key": cls._to_string(annotation).format(key),
            "value": value
        }

    @classmethod
    def _to_string(cls, annotation):
        """Builds a format template for the annotated key

        The template contains a single "{}" placeholder for the key name.
        """
        annotated_key = ""
        if annotation.get("condition"):
            reversed_conditions = {v: k for k, v in cls.CONDITIONS.items()}
            annotated_key += reversed_conditions[annotation.condition]
        if annotation.get("finalized"):
            annotated_key += "({})"
        else:
            annotated_key += "{}"
        if annotation.get("merge_method"):
            reversed_merge_methods \
                = {v: k for k, v in cls.MERGE_METHODS.items()}
            method_name = reversed_merge_methods[annotation.merge_method]
            annotated_key += "[{}]".format(method_name)
        return annotated_key
class LoaderInterface:
    """Interface for a FancyDict Loader"""

    def __init__(self, output_type):
        """
        Args:
            output_type: type the loader instantiates for loaded data
        """
        self.type = output_type

    @classmethod
    def can_load(cls, source):
        """Checks if the loader can load the given source

        Args:
            source: source to load
        Returns:
            True if Loader can load the source else False
        Raises:
            NotImplementedError: always, concrete loaders must override
        """
        raise NotImplementedError()

    def load(self, source, annotations_decoder=None):
        """Loads a FancyDict from a given source

        If an annotations_decoder is given, Annotations can be decoded from
        the source data.

        Args:
            source: source to load from
            annotations_decoder: Decoder to decode annotations from source data
        Returns:
            FancyDict
        Raises:
            NotImplementedError: always, concrete loaders must override
        """
        raise NotImplementedError()
class DictLoader(LoaderInterface):
    """Loads a dict as FancyDict"""

    @classmethod
    def can_load(cls, source):
        """Returns True if the source is a dict"""
        return isinstance(source, dict)

    def load(self, source, annotations_decoder=None):
        """Loads the dict into a new instance of the output type

        Args:
            source: dict to load
            annotations_decoder: optional decoder applied to every key/value
                pair of the dict and of its nested dicts
        Returns:
            instance of the output type
        """
        # NOTE(review): presumably constructing the output type from the
        # intermediate result is what applies the stored annotations --
        # confirm against the FancyDict implementation
        return self.type(
            self._load_without_running_annotations(source, annotations_decoder)
        )

    def _load_without_running_annotations(self, dct, annotations_decoder=None):
        """Recursively converts dct into the output type

        Keys and values get decoded and the decoded annotations are attached
        to the resulting dict if an annotations_decoder is given.
        """
        loaded_dict = self.type()
        for key, value in dct.items():
            if annotations_decoder:
                key, value = self._annotate(loaded_dict, key, value,
                                            annotations_decoder)
            if isinstance(value, dict):
                value = self._load_without_running_annotations(
                    value, annotations_decoder=annotations_decoder
                )
            if isinstance(value, list):
                # dicts inside of lists are converted to the output type
                # directly, their keys are not passed to the decoder
                value = [self.type(item) if isinstance(item, dict) else item
                         for item in value]
            loaded_dict[key] = value
        return loaded_dict

    @staticmethod
    def _annotate(dct, key, value, annotations_decoder):
        """Decodes a key/value pair and annotates dct for the decoded key

        Returns:
            tuple of decoded key and decoded value
        """
        decoded = annotations_decoder.decode(key=key, value=value)
        key = decoded["key"]
        value = decoded["value"]
        dct.annotate(key, decoded["annotations"])
        return key, value
class IoLoader(DictLoader):
    """Loads a FancyDict from an IO-like object"""

    @classmethod
    def can_load(cls, source):
        """Returns True if the source is an IOBase instance"""
        return isinstance(source, IOBase)

    @staticmethod
    def _load_dict(source):
        """Parses YAML/JSON data from a stream, str or bytes

        Returns:
            the parsed data, an empty dict for an empty document
        """
        # safe_load instead of yaml.load: the source may be untrusted (files,
        # HTTP responses) and must not be able to construct arbitrary python
        # objects; yaml.load without Loader also fails with PyYAML >= 6
        data = yaml.safe_load(source)
        if data is None:
            # empty document
            return {}
        return data

    def load(self, source, annotations_decoder=None):
        """Parses the source and loads the result like a dict

        Args:
            source: YAML/JSON data to parse
            annotations_decoder: optional decoder for the annotations
        Returns:
            instance of the output type
        """
        data = self._load_dict(source)
        return super().load(data, annotations_decoder=annotations_decoder)
class FileLoader(IoLoader):
    """Loads a FancyDict from a YAML/JSON file

    Looks up included files in the given base directories.
    Supports a special include key to include other files.
    """
    DEFAULT_INCLUDE_PATHS = ('.',)

    def __init__(self, output_type,
                 include_paths=DEFAULT_INCLUDE_PATHS, include_key=None):
        """
        Args:
            output_type: type the loader instantiates for loaded data
            include_paths: base directories searched for included files
            include_key: key whose value lists the files to include
        """
        super().__init__(output_type)
        self._include_paths = include_paths
        self._include_key = include_key

    @classmethod
    def can_load(cls, source):
        """Returns True if the source is an existing path"""
        return cls._path_exists(source)

    def load(self, source, annotations_decoder=None):
        """Loads the file and all the files it includes

        The included files are loaded first in the listed order, afterwards
        the result gets updated with the content of the file itself.

        Args:
            source: path of the file to load
            annotations_decoder: optional decoder for the annotations
        Returns:
            instance of the output type
        Raises:
            FileNotFoundError: if an included file is in none of the
                include paths
        """
        dct = self._load_fancy_dict(source, annotations_decoder)
        base_dict = self._build_base_dict_with_includes(
            dct.pop(self._include_key, ()), annotations_decoder
        )
        base_dict.update(dct)
        return base_dict

    @staticmethod
    def _path_exists(path):
        """Checks if path exists without raising for unusable sources

        Handles OSError on Windows if URL is given as path, TypeError for
        sources which are not path-like and ValueError for strings which
        cannot be a path (e.g. embedded null bytes).
        """
        try:
            return Path(path).exists()
        except (OSError, TypeError, ValueError):
            return False

    @staticmethod
    def _find_filepath(filename, include_paths):
        """Returns the first existing path of filename in include_paths

        Raises:
            FileNotFoundError: if filename is in none of the include paths
        """
        for base_dir in include_paths:
            full_path = Path(base_dir) / filename
            if FileLoader._path_exists(full_path):
                return full_path
        raise FileNotFoundError(filename)

    def _load_fancy_dict(self, full_path, annotations_decoder):
        """Parses the file without resolving the includes"""
        with open(full_path, "r") as data_file:
            return super()._load_without_running_annotations(
                super()._load_dict(data_file),
                annotations_decoder=annotations_decoder
            )

    def _build_base_dict_with_includes(self, includes, annotations_decoder):
        """Loads all includes and merges them in the given order"""
        base_dict = self.type()
        for include in includes:
            full_path = self._find_filepath(include, self._include_paths)
            # recursive: included files can include other files
            base_dict.update(
                self.load(full_path, annotations_decoder=annotations_decoder)
            )
        return base_dict
class HttpLoader(IoLoader):
    """Loads YAML/JSON files from an URL"""

    @classmethod
    def can_load(cls, source):
        """Returns True if the source is a http or https URL string"""
        # urlparse raises for sources which are neither str nor bytes
        if not isinstance(source, str):
            return False
        return urllib.parse.urlparse(source).scheme in ["http", "https"]

    def load(self, source, annotations_decoder=None):
        """Downloads the URL and loads its content

        Args:
            source: URL to load from
            annotations_decoder: optional decoder for the annotations
        Returns:
            instance of the output type
        """
        # context manager closes the connection once the body has been read
        with urllib.request.urlopen(source) as response:
            content = response.read()
        return super().load(content, annotations_decoder=annotations_decoder)
class CompositeLoader(LoaderInterface):
    """Composition of different Loader

    Selects the right Loader for the source.
    Can load from dicts, IO-like objects, yaml/json files and URLs.
    """
    # checked in this order, the first loader accepting the source is used
    LOADER = [
        DictLoader,
        IoLoader,
        FileLoader,
        HttpLoader,
    ]

    def __init__(self, output_type, **loader_args):
        """
        Args:
            output_type: type the selected loader instantiates
            **loader_args: keyword arguments passed to the constructor of
                the selected loader
        """
        super().__init__(output_type)
        self.loader_args = loader_args

    @classmethod
    def can_load(cls, source):
        """Returns True if any of the loaders can load the source"""
        return cls._select_loader_type(source) is not None

    def load(self, source, annotations_decoder=None):
        """Loads the source with the first loader which accepts it

        Args:
            source: source to load from
            annotations_decoder: optional decoder for the annotations
        Returns:
            result of the selected loader
        Raises:
            NoLoaderForSourceAvailable: if no loader accepts the source
        """
        loader_type = self._select_loader_type(source)
        if loader_type is None:
            raise NoLoaderForSourceAvailable(source)
        return loader_type(self.type, **self.loader_args).load(
            source, annotations_decoder
        )

    @classmethod
    def _select_loader_type(cls, source):
        """Returns the first loader type accepting the source or None"""
        for loader in cls.LOADER:
            if loader.can_load(source):
                return loader
        return None