Module hat.json
JSON data manipulation and validation
Expand source code
"""JSON data manipulation and validation"""
import collections
import enum
import io
import itertools
import json
import pathlib
import typing
import urllib.parse
import jsonpatch
import jsonschema.validators
import yaml
# JSON array and object aliases; 'Data' is a forward reference resolved
# by the Data alias below
Array: typing.Type = typing.List['Data']
Object: typing.Type = typing.Dict[str, 'Data']
Data: typing.Type = typing.Union[None, bool, int, float, str, Array, Object]
"""JSON data type identifier."""
# supported serialization formats
Format = enum.Enum('Format', ['JSON', 'YAML'])
"""Encoding format"""
# a path element is a dict key (str), a list index (int) or an arbitrarily
# nested list of paths (flattened before use)
Path: typing.Type = typing.Union[int, str, typing.List['Path']]
"""Data path"""
def equals(a: 'Data',
           b: 'Data'
           ) -> bool:
    """Equality comparison of json serializable data.

    Tests for equality of data according to JSON format. Notably, ``bool``
    values are not considered equal to numeric values in any case. This is
    different from default equality comparison, which considers `False`
    equal to `0` and `0.0`; and `True` equal to `1` and `1.0`.

    Example::

        assert equals(0, 0.0) is True
        assert equals({'a': 1, 'b': 2}, {'b': 2, 'a': 1}) is True
        assert equals(1, True) is False

    """
    # bools compare equal to 0/1 under Python's ==, so reject mixed
    # bool/number pairs before any further comparison
    if isinstance(a, bool) is not isinstance(b, bool):
        return False
    if a != b:
        return False
    # shallow == passed; recurse into containers to catch nested
    # bool/number mismatches that Python's == would accept
    if isinstance(a, dict):
        return all(equals(item, b[key]) for key, item in a.items())
    if isinstance(a, list):
        return all(equals(x, y) for x, y in zip(a, b))
    return True
def flatten(data: 'Data'
            ) -> 'typing.Iterable[Data]':
    """Flatten JSON data

    If `data` is array, this generator recursively yields result of `flatten`
    call with each element of input list. For other `Data` types, input data
    is yielded.

    Example::

        data = [1, [], [2], {'a': [3]}]
        result = [1, 2, {'a': [3]}]
        assert list(flatten(data)) == result

    """
    # only lists are flattened - dicts and scalars pass through unchanged
    if not isinstance(data, list):
        yield data
        return
    for item in data:
        yield from flatten(item)
def get(data: 'Data',
        path: 'Path',
        default: 'typing.Optional[Data]' = None
        ) -> 'Data':
    """Get data element referenced by path

    Returns `default` when the path does not reference an existing element.

    Example::

        data = {'a': [1, 2, [3, 4]]}
        path = ['a', 2, 0]
        assert get(data, path) == 3

        data = [1, 2, 3]
        assert get(data, 0) == 1
        assert get(data, 5) is None
        assert get(data, 5, default=123) == 123

    """
    # expand nested path lists in place (depth-first, left to right)
    segments = collections.deque([path])
    while segments:
        segment = segments.popleft()
        if isinstance(segment, list):
            segments.extendleft(reversed(segment))
        elif isinstance(segment, str):
            if not isinstance(data, dict) or segment not in data:
                return default
            data = data[segment]
        elif isinstance(segment, int) and not isinstance(segment, bool):
            # bool is excluded explicitly since it is a subclass of int
            if not isinstance(data, list):
                return default
            try:
                data = data[segment]
            except IndexError:
                return default
        else:
            raise ValueError('invalid path')
    return data
def set_(data: Data,
         path: Path,
         value: Data
         ) -> Data:
    """Create new data by setting data path element value

    Input `data` is not modified - a new structure is returned with the
    element referenced by `path` replaced by `value`. Missing intermediate
    containers are created: dicts for string path elements, None-padded
    lists for integer path elements.

    Example::

        data = [1, {'a': 2, 'b': 3}, 4]
        path = [1, 'b']
        result = set_(data, path, 5)
        assert result == [1, {'a': 2, 'b': 5}, 4]
        assert result is not data

        data = [1, 2, 3]
        result = set_(data, 4, 4)
        assert result == [1, 2, 3, None, 4]

    Raises:
        ValueError: if a path element is not str or non-bool int

    """
    # descend phase: walk the path, remembering each (container, key) pair;
    # a missing or type-mismatched container is descended into as None and
    # replaced during the rebuild below
    parents = collections.deque()
    for i in flatten(path):
        parent = data
        if isinstance(i, str):
            data = data.get(i) if isinstance(data, dict) else None
        elif isinstance(i, int) and not isinstance(i, bool):
            # bool is excluded explicitly since it is a subclass of int
            try:
                data = data[i] if isinstance(data, list) else None
            except IndexError:
                data = None
        else:
            raise ValueError('invalid path')
        parents.append((parent, i))
    # ascend phase: rebuild containers bottom-up, copying each level so the
    # original data stays untouched
    while parents:
        parent, i = parents.pop()
        if isinstance(i, str):
            parent = dict(parent) if isinstance(parent, dict) else {}
            parent[i] = value
        elif isinstance(i, int) and not isinstance(i, bool):
            if not isinstance(parent, list):
                parent = []
            if i >= len(parent):
                # pad with None on the right so index i exists
                parent = [*parent,
                          *itertools.repeat(None, i - len(parent) + 1)]
            elif i < 0 and (-i) > len(parent):
                # negative index beyond the start: pad with None on the left
                parent = [*itertools.repeat(None, (-i) - len(parent)),
                          *parent]
            else:
                parent = list(parent)
            parent[i] = value
        else:
            raise ValueError('invalid path')
        value = parent
    return value
def diff(src: Data,
         dst: Data
         ) -> Data:
    """Generate JSON Patch diff.

    Returns the list of JSON Patch operations that transforms `src`
    into `dst`.

    Example::

        src = [1, {'a': 2}, 3]
        dst = [1, {'a': 4}, 3]
        result = diff(src, dst)
        assert result == [{'op': 'replace', 'path': '/1/a', 'value': 4}]

    """
    json_patch = jsonpatch.JsonPatch.from_diff(src, dst)
    return json_patch.patch
def patch(data: Data,
          diff: Data
          ) -> Data:
    """Apply JSON Patch diff.

    Returns new data with the JSON Patch operations from `diff` applied
    to `data`.

    Example::

        data = [1, {'a': 2}, 3]
        d = [{'op': 'replace', 'path': '/1/a', 'value': 4}]
        result = patch(data, d)
        assert result == [1, {'a': 4}, 3]

    """
    patched = jsonpatch.apply_patch(data, diff)
    return patched
def encode(data: Data,
           format: Format = Format.JSON,
           indent: typing.Optional[int] = None
           ) -> str:
    """Encode JSON data.

    Args:
        data: JSON data
        format: encoding format
        indent: indentation size

    Raises:
        ValueError: if `format` is not supported

    """
    if format == Format.YAML:
        # prefer the C-accelerated dumper when libyaml is available
        dumper = getattr(yaml, 'CSafeDumper', yaml.SafeDumper)
        return str(yaml.dump(data, indent=indent, Dumper=dumper))
    if format == Format.JSON:
        return json.dumps(data, indent=indent)
    raise ValueError('unsupported format')
def decode(data_str: str,
           format: Format = Format.JSON
           ) -> Data:
    """Decode JSON data.

    Args:
        data_str: encoded JSON data
        format: encoding format

    Raises:
        ValueError: if `format` is not supported

    """
    if format == Format.YAML:
        # prefer the C-accelerated loader when libyaml is available
        loader = getattr(yaml, 'CSafeLoader', yaml.SafeLoader)
        return yaml.load(io.StringIO(data_str), Loader=loader)
    if format == Format.JSON:
        return json.loads(data_str)
    raise ValueError('unsupported format')
def encode_file(data: Data,
                path: pathlib.PurePath,
                format: typing.Optional[Format] = None,
                indent: typing.Optional[int] = 4):
    """Encode JSON data to file.

    If `format` is ``None``, encoding format is derived from path suffix.

    Args:
        data: JSON data
        path: file path
        format: encoding format
        indent: indentation size

    Raises:
        ValueError: if format can not be determined or is not supported

    """
    if format is None:
        suffix_formats = {'.json': Format.JSON,
                          '.yaml': Format.YAML,
                          '.yml': Format.YAML}
        format = suffix_formats.get(path.suffix)
        if format is None:
            raise ValueError('can not determine format from path suffix')
    with open(path, 'w', encoding='utf-8') as f:
        if format == Format.YAML:
            # prefer the C-accelerated dumper when libyaml is available
            dumper = getattr(yaml, 'CSafeDumper', yaml.SafeDumper)
            yaml.dump(data, f, indent=indent, Dumper=dumper,
                      explicit_start=True, explicit_end=True)
        elif format == Format.JSON:
            json.dump(data, f, indent=indent)
        else:
            raise ValueError('unsupported format')
def decode_file(path: pathlib.PurePath,
                format: typing.Optional[Format] = None
                ) -> Data:
    """Decode JSON data from file.

    If `format` is ``None``, encoding format is derived from path suffix.

    Args:
        path: file path
        format: encoding format

    Raises:
        ValueError: if format can not be determined or is not supported

    """
    if format is None:
        suffix_formats = {'.json': Format.JSON,
                          '.yaml': Format.YAML,
                          '.yml': Format.YAML}
        format = suffix_formats.get(path.suffix)
        if format is None:
            raise ValueError('can not determine format from path suffix')
    with open(path, 'r', encoding='utf-8') as f:
        if format == Format.YAML:
            # prefer the C-accelerated loader when libyaml is available
            loader = getattr(yaml, 'CSafeLoader', yaml.SafeLoader)
            return yaml.load(f, Loader=loader)
        if format == Format.JSON:
            return json.load(f)
        raise ValueError('unsupported format')
class SchemaRepository:
    """JSON Schema repository.

    A repository that holds json schemas and enables validation against them.

    Repository can be initialized with multiple arguments, which can be
    instances of ``pathlib.PurePath``, ``Data`` or ``SchemaRepository``.

    If an argument is of type ``pathlib.PurePath``, and path points to file
    with a suffix '.json', '.yml' or '.yaml', json serializable data is
    decoded from the file. Otherwise, it is assumed that path points to a
    directory, which is recursively searched for json and yaml files. All
    decoded schemas are added to the repository. If a schema with the same
    `id` was previously added, an exception is raised.

    If an argument is of type ``Data``, it should be a json serializable data
    representation of a JSON schema. If a schema with the same `id` was
    previously added, an exception is raised.

    If an argument is of type ``SchemaRepository``, its schemas are added to
    the new repository. Previously added schemas with the same `id` are
    replaced.

    """

    def __init__(self, *args: typing.Union[pathlib.PurePath,
                                           Data,
                                           'SchemaRepository']):
        # schemas indexed as {uri scheme: {uri netloc + path: schema}}
        self._data = {}
        for arg in args:
            if isinstance(arg, pathlib.PurePath):
                self._load_path(arg)
            elif isinstance(arg, SchemaRepository):
                self._load_repository(arg)
            else:
                self._load_schema(arg)

    def validate(self,
                 schema_id: str,
                 data: Data):
        """Validate data against JSON schema.

        Args:
            schema_id: JSON schema identifier (URI, optionally with a
                fragment selecting a sub-schema)
            data: data to be validated

        Raises:
            jsonschema.ValidationError

        """
        uri = urllib.parse.urlparse(schema_id)
        path = uri.netloc + uri.path
        # resolver serves every '$ref' lookup from this repository (one
        # handler per known uri scheme) instead of fetching remotely
        # NOTE(review): RefResolver is deprecated in jsonschema >= 4.18 -
        # confirm the pinned jsonschema version still provides it
        resolver = jsonschema.RefResolver(
            base_uri=f'{uri.scheme}://{path}',
            referrer=self._data[uri.scheme][path],
            handlers={i: self._get_schema
                      for i in self._data.keys()})
        jsonschema.validate(
            instance=data,
            schema=resolver.resolve_fragment(resolver.referrer, uri.fragment),
            resolver=resolver)

    def to_json(self) -> Data:
        """Export repository content as json serializable data.

        Entire repository content is exported as json serializable data.
        New repository can be created from the exported content by using
        :meth:`SchemaRepository.from_json`.

        """
        # internal dict is returned directly (not copied) - callers must
        # not mutate it
        return self._data

    @staticmethod
    def from_json(data: typing.Union[pathlib.PurePath,
                                     Data]
                  ) -> 'SchemaRepository':
        """Create new repository from content exported as json serializable
        data.

        Creates a new repository from content of another repository that was
        exported by using :meth:`SchemaRepository.to_json`.

        Args:
            data: repository data (or path to a file containing it)

        """
        if isinstance(data, pathlib.PurePath):
            data = decode_file(data)
        repo = SchemaRepository()
        # bypass per-schema loading - data is assumed to be already indexed
        # in the internal {scheme: {path: schema}} layout
        repo._data = data
        return repo

    def _get_schema(self, scheme_id):
        # RefResolver handler: resolve a schema id uri to its stored schema
        uri = urllib.parse.urlparse(scheme_id)
        path = uri.netloc + uri.path
        return self._data[uri.scheme][path]

    def _load_path(self, path):
        json_suffixes = {'.json', '.yaml', '.yml'}
        # a file with a known suffix is loaded directly; anything else is
        # treated as a directory and searched recursively
        # NOTE(review): rglob requires a concrete pathlib.Path - a pure
        # PurePath argument would fail here; confirm intended input type
        paths = ([path] if path.suffix in json_suffixes
                 else list(itertools.chain.from_iterable(
                     path.rglob(f'*{i}') for i in json_suffixes)))
        for path in paths:
            schema = decode_file(path)
            self._load_schema(schema)

    def _load_schema(self, schema):
        # normalize '$schema': an unknown or missing meta schema id is
        # replaced with draft-07 (the input schema is copied, not mutated)
        meta_schema_id = urllib.parse.urldefrag(schema.get('$schema', '')).url
        if meta_schema_id not in jsonschema.validators.meta_schemas:
            meta_schema_id = jsonschema.Draft7Validator.META_SCHEMA['$schema']
            schema = dict(schema)
            schema['$schema'] = meta_schema_id
        uri = urllib.parse.urlparse(schema['id'])
        path = uri.netloc + uri.path
        if uri.scheme not in self._data:
            self._data[uri.scheme] = {}
        if path in self._data[uri.scheme]:
            raise Exception(f"duplicate schema id {uri.scheme}://{path}")
        self._data[uri.scheme][path] = schema

    def _load_repository(self, repo):
        # merge the other repository's schemas; on id collision the other
        # repository's schema replaces the existing one
        for k, v in repo._data.items():
            if k not in self._data:
                self._data[k] = v
            else:
                self._data[k].update(v)
# path of the optional pre-built schema repository data file shipped
# alongside this module
_json_schema_repo_path = (pathlib.Path(__file__).parent /
                          'json_schema_repo.json')
# repository is left empty when the bundled data file does not exist
json_schema_repo: SchemaRepository = (
    SchemaRepository.from_json(_json_schema_repo_path)
    if _json_schema_repo_path.exists()
    else SchemaRepository())
"""JSON Schema repository with generic schemas"""
# check upstream changes in jsonpatch and validate performance impact
# def _monkeypatch_jsonpatch():
# """Monkeypatch jsonpatch.
# Patch incorrect value comparison between ``bool`` and numeric values when
# diffing json serializable data.
# Comparing `False` to `0` or `0.0`; and `True` to `1` or `1.0` incorrectly
# results in no change.
# """
# def _compare_values(self, path, key, src, dst):
# if isinstance(src, jsonpatch.MutableMapping) and \
# isinstance(dst, jsonpatch.MutableMapping):
# self._compare_dicts(jsonpatch._path_join(path, key), src, dst)
# elif isinstance(src, jsonpatch.MutableSequence) and \
# isinstance(dst, jsonpatch.MutableSequence):
# self._compare_lists(jsonpatch._path_join(path, key), src, dst)
# elif isinstance(src, bool) == isinstance(dst, bool) and src == dst:
# pass
# else:
# self._item_replaced(path, key, dst)
# jsonpatch.DiffBuilder._compare_values = _compare_values
# _monkeypatch_jsonpatch()
Global variables
var Data : typing.Type
-
JSON data type identifier.
var Path : typing.Type
-
Data path
var json_schema_repo : SchemaRepository
-
JSON Schema repository with generic schemas
Functions
def decode(data_str: str, format: Format = Format.JSON) ‑> typing.Union[NoneType, bool, int, float, str, typing.List[Data], typing.Dict[str, Data]]
-
Decode JSON data.
Args
data_str
- encoded JSON data
format
- encoding format
Expand source code
def decode(data_str: str, format: Format = Format.JSON ) -> Data: """Decode JSON data. Args: data_str: encoded JSON data format: encoding format """ if format == Format.JSON: return json.loads(data_str) if format == Format.YAML: loader = (yaml.CSafeLoader if hasattr(yaml, 'CSafeLoader') else yaml.SafeLoader) return yaml.load(io.StringIO(data_str), Loader=loader) raise ValueError('unsupported format')
def decode_file(path: pathlib.PurePath, format: typing.Union[Format, NoneType] = None) ‑> typing.Union[NoneType, bool, int, float, str, typing.List[Data], typing.Dict[str, Data]]
-
Decode JSON data from file.
If
format
isNone
, encoding format is derived from path suffix.Args
path
- file path
format
- encoding format
Expand source code
def decode_file(path: pathlib.PurePath, format: typing.Optional[Format] = None ) -> Data: """Decode JSON data from file. If `format` is ``None``, encoding format is derived from path suffix. Args: path: file path format: encoding format """ if format is None: if path.suffix == '.json': format = Format.JSON elif path.suffix in ('.yaml', '.yml'): format = Format.YAML else: raise ValueError('can not determine format from path suffix') with open(path, 'r', encoding='utf-8') as f: if format == Format.JSON: return json.load(f) if format == Format.YAML: loader = (yaml.CSafeLoader if hasattr(yaml, 'CSafeLoader') else yaml.SafeLoader) return yaml.load(f, Loader=loader) raise ValueError('unsupported format')
def diff(src: typing.Union[NoneType, bool, int, float, str, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')]], dst: typing.Union[NoneType, bool, int, float, str, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')]]) ‑> typing.Union[NoneType, bool, int, float, str, typing.List[Data], typing.Dict[str, Data]]
-
Generate JSON Patch diff.
Example::
src = [1, {'a': 2}, 3] dst = [1, {'a': 4}, 3] result = diff(src, dst) assert result == [{'op': 'replace', 'path': '/1/a', 'value': 4}]
Expand source code
def diff(src: Data, dst: Data ) -> Data: """Generate JSON Patch diff. Example:: src = [1, {'a': 2}, 3] dst = [1, {'a': 4}, 3] result = diff(src, dst) assert result == [{'op': 'replace', 'path': '/1/a', 'value': 4}] """ return jsonpatch.JsonPatch.from_diff(src, dst).patch
def encode(data: typing.Union[NoneType, bool, int, float, str, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')]], format: Format = Format.JSON, indent: typing.Union[int, NoneType] = None) ‑> str
-
Encode JSON data.
Args
data
- JSON data
format
- encoding format
indent
- indentation size
Expand source code
def encode(data: Data, format: Format = Format.JSON, indent: typing.Optional[int] = None ) -> str: """Encode JSON data. Args: data: JSON data format: encoding format indent: indentation size """ if format == Format.JSON: return json.dumps(data, indent=indent) if format == Format.YAML: dumper = (yaml.CSafeDumper if hasattr(yaml, 'CSafeDumper') else yaml.SafeDumper) return str(yaml.dump(data, indent=indent, Dumper=dumper)) raise ValueError('unsupported format')
def encode_file(data: typing.Union[NoneType, bool, int, float, str, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')]], path: pathlib.PurePath, format: typing.Union[Format, NoneType] = None, indent: typing.Union[int, NoneType] = 4)
-
Encode JSON data to file.
If
format
isNone
, encoding format is derived from path suffix.Args
data
- JSON data
path
- file path
format
- encoding format
indent
- indentation size
Expand source code
def encode_file(data: Data, path: pathlib.PurePath, format: typing.Optional[Format] = None, indent: typing.Optional[int] = 4): """Encode JSON data to file. If `format` is ``None``, encoding format is derived from path suffix. Args: data: JSON data path: file path format: encoding format indent: indentation size """ if format is None: if path.suffix == '.json': format = Format.JSON elif path.suffix in ('.yaml', '.yml'): format = Format.YAML else: raise ValueError('can not determine format from path suffix') with open(path, 'w', encoding='utf-8') as f: if format == Format.JSON: json.dump(data, f, indent=indent) elif format == Format.YAML: dumper = (yaml.CSafeDumper if hasattr(yaml, 'CSafeDumper') else yaml.SafeDumper) yaml.dump(data, f, indent=indent, Dumper=dumper, explicit_start=True, explicit_end=True) else: raise ValueError('unsupported format')
def equals(a: typing.Union[NoneType, bool, int, float, str, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')]], b: typing.Union[NoneType, bool, int, float, str, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')]]) ‑> bool
-
Equality comparison of json serializable data.
Tests for equality of data according to JSON format. Notably,
bool
values are not considered equal to numeric values in any case. This is different from default equality comparison, which considersFalse
equal to0
and0.0
; andTrue
equal to1
and1.0
.Example::
assert equals(0, 0.0) is True assert equals({'a': 1, 'b': 2}, {'b': 2, 'a': 1}) is True assert equals(1, True) is False
Expand source code
def equals(a: Data, b: Data ) -> bool: """Equality comparison of json serializable data. Tests for equality of data according to JSON format. Notably, ``bool`` values are not considered equal to numeric values in any case. This is different from default equality comparison, which considers `False` equal to `0` and `0.0`; and `True` equal to `1` and `1.0`. Example:: assert equals(0, 0.0) is True assert equals({'a': 1, 'b': 2}, {'b': 2, 'a': 1}) is True assert equals(1, True) is False """ if isinstance(a, bool) != isinstance(b, bool): return False if a != b: return False if isinstance(a, dict): return all(equals(a[key], b[key]) for key in a) elif isinstance(a, list): return all(equals(i, j) for i, j in zip(a, b)) else: return True
def flatten(data: typing.Union[NoneType, bool, int, float, str, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')]]) ‑> typing.Iterable[typing.Union[NoneType, bool, int, float, str, typing.List[Data], typing.Dict[str, Data]]]
-
Flatten JSON data
If
data
is array, this generator recursively yields result offlatten()
call with each element of input list. For otherData
types, input data is yielded.Example::
data = [1, [], [2], {'a': [3]}] result = [1, 2, {'a': [3]}] assert list(flatten(data)) == result
Expand source code
def flatten(data: Data ) -> typing.Iterable[Data]: """Flatten JSON data If `data` is array, this generator recursively yields result of `flatten` call with each element of input list. For other `Data` types, input data is yielded. Example:: data = [1, [], [2], {'a': [3]}] result = [1, 2, {'a': [3]}] assert list(flatten(data)) == result """ if isinstance(data, list): for i in data: yield from flatten(i) else: yield data
def get(data: typing.Union[NoneType, bool, int, float, str, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')]], path: typing.Union[int, str, typing.List[ForwardRef('Path')]], default: typing.Union[NoneType, bool, int, float, str, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')]] = None) ‑> typing.Union[NoneType, bool, int, float, str, typing.List[Data], typing.Dict[str, Data]]
-
Get data element referenced by path
Example::
data = {'a': [1, 2, [3, 4]]} path = ['a', 2, 0] assert get(data, path) == 3 data = [1, 2, 3] assert get(data, 0) == 1 assert get(data, 5) is None assert get(data, 5, default=123) == 123
Expand source code
def get(data: Data, path: Path, default: typing.Optional[Data] = None ) -> Data: """Get data element referenced by path Example:: data = {'a': [1, 2, [3, 4]]} path = ['a', 2, 0] assert get(data, path) == 3 data = [1, 2, 3] assert get(data, 0) == 1 assert get(data, 5) is None assert get(data, 5, default=123) == 123 """ for i in flatten(path): if isinstance(i, str): if not isinstance(data, dict) or i not in data: return default data = data[i] elif isinstance(i, int) and not isinstance(i, bool): if not isinstance(data, list): return default try: data = data[i] except IndexError: return default else: raise ValueError('invalid path') return data
def patch(data: typing.Union[NoneType, bool, int, float, str, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')]], diff: typing.Union[NoneType, bool, int, float, str, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')]]) ‑> typing.Union[NoneType, bool, int, float, str, typing.List[Data], typing.Dict[str, Data]]
-
Apply JSON Patch diff.
Example::
data = [1, {'a': 2}, 3] d = [{'op': 'replace', 'path': '/1/a', 'value': 4}] result = patch(data, d) assert result == [1, {'a': 4}, 3]
Expand source code
def patch(data: Data, diff: Data ) -> Data: """Apply JSON Patch diff. Example:: data = [1, {'a': 2}, 3] d = [{'op': 'replace', 'path': '/1/a', 'value': 4}] result = patch(data, d) assert result == [1, {'a': 4}, 3] """ return jsonpatch.apply_patch(data, diff)
def set_(data: typing.Union[NoneType, bool, int, float, str, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')]], path: typing.Union[int, str, typing.List[ForwardRef('Path')]], value: typing.Union[NoneType, bool, int, float, str, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')]]) ‑> typing.Union[NoneType, bool, int, float, str, typing.List[Data], typing.Dict[str, Data]]
-
Create new data by setting data path element value
Example::
data = [1, {'a': 2, 'b': 3}, 4] path = [1, 'b'] result = set_(data, path, 5) assert result == [1, {'a': 2, 'b': 5}, 4] assert result is not data data = [1, 2, 3] result = set_(data, 4, 4) assert result == [1, 2, 3, None, 4]
Expand source code
def set_(data: Data, path: Path, value: Data ) -> Data: """Create new data by setting data path element value Example:: data = [1, {'a': 2, 'b': 3}, 4] path = [1, 'b'] result = set_(data, path, 5) assert result == [1, {'a': 2, 'b': 5}, 4] assert result is not data data = [1, 2, 3] result = set_(data, 4, 4) assert result == [1, 2, 3, None, 4] """ parents = collections.deque() for i in flatten(path): parent = data if isinstance(i, str): data = data.get(i) if isinstance(data, dict) else None elif isinstance(i, int) and not isinstance(i, bool): try: data = data[i] if isinstance(data, list) else None except IndexError: data = None else: raise ValueError('invalid path') parents.append((parent, i)) while parents: parent, i = parents.pop() if isinstance(i, str): parent = dict(parent) if isinstance(parent, dict) else {} parent[i] = value elif isinstance(i, int) and not isinstance(i, bool): if not isinstance(parent, list): parent = [] if i >= len(parent): parent = [*parent, *itertools.repeat(None, i - len(parent) + 1)] elif i < 0 and (-i) > len(parent): parent = [*itertools.repeat(None, (-i) - len(parent)), *parent] else: parent = list(parent) parent[i] = value else: raise ValueError('invalid path') value = parent return value
Classes
class Format (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
An enumeration.
Ancestors
- enum.Enum
Class variables
var JSON
var YAML
class SchemaRepository (*args: typing.Union[pathlib.PurePath, NoneType, bool, int, float, str, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')], ForwardRef('SchemaRepository')])
-
JSON Schema repository.
A repository that holds json schemas and enables validation against them.
Repository can be initialized with multiple arguments, which can be instances of
pathlib.PurePath
,Data
orSchemaRepository
.If an argument is of type
pathlib.PurePath
, and path points to file with a suffix '.json', '.yml' or '.yaml', json serializable data is decoded from the file. Otherwise, it is assumed that path points to a directory, which is recursively searched for json and yaml files. All decoded schemas are added to the repository. If a schema with the sameid
was previously added, an exception is raised. If an argument is of type
Data
, it should be a json serializable data representation of a JSON schema. If a schema with the sameid
was previously added, an exception is raised. If an argument is of type
SchemaRepository
, its schemas are added to the new repository. Previously added schemas with the sameid
are replaced.Expand source code
class SchemaRepository: """JSON Schema repository. A repository that holds json schemas and enables validation against them. Repository can be initialized with multiple arguments, which can be instances of ``pathlib.PurePath``, ``Data`` or ``SchemaRepository``. If an argument is of type ``pathlib.PurePath``, and path points to file with a suffix '.json', '.yml' or '.yaml', json serializable data is decoded from the file. Otherwise, it is assumed that path points to a directory, which is recursively searched for json and yaml files. All decoded schemas are added to the repository. If a schema with the same `id` was previosly added, an exception is raised. If an argument is of type ``Data``, it should be a json serializable data representation of a JSON schema. If a schema with the same `id` was previosly added, an exception is raised. If an argument is of type ``SchemaRepository``, its schemas are added to the new repository. Previously added schemas with the same `id` are replaced. """ def __init__(self, *args: typing.Union[pathlib.PurePath, Data, 'SchemaRepository']): self._data = {} for arg in args: if isinstance(arg, pathlib.PurePath): self._load_path(arg) elif isinstance(arg, SchemaRepository): self._load_repository(arg) else: self._load_schema(arg) def validate(self, schema_id: str, data: Data): """Validate data against JSON schema. Args: schema_id: JSON schema identifier data: data to be validated Raises: jsonschema.ValidationError """ uri = urllib.parse.urlparse(schema_id) path = uri.netloc + uri.path resolver = jsonschema.RefResolver( base_uri=f'{uri.scheme}://{path}', referrer=self._data[uri.scheme][path], handlers={i: self._get_schema for i in self._data.keys()}) jsonschema.validate( instance=data, schema=resolver.resolve_fragment(resolver.referrer, uri.fragment), resolver=resolver) def to_json(self) -> Data: """Export repository content as json serializable data. Entire repository content is exported as json serializable data. 
New repository can be created from the exported content by using :meth:`SchemaRepository.from_json`. """ return self._data @staticmethod def from_json(data: typing.Union[pathlib.PurePath, Data] ) -> 'SchemaRepository': """Create new repository from content exported as json serializable data. Creates a new repository from content of another repository that was exported by using :meth:`SchemaRepository.to_json`. Args: data: repository data """ if isinstance(data, pathlib.PurePath): data = decode_file(data) repo = SchemaRepository() repo._data = data return repo def _get_schema(self, scheme_id): uri = urllib.parse.urlparse(scheme_id) path = uri.netloc + uri.path return self._data[uri.scheme][path] def _load_path(self, path): json_suffixes = {'.json', '.yaml', '.yml'} paths = ([path] if path.suffix in json_suffixes else list(itertools.chain.from_iterable( path.rglob(f'*{i}') for i in json_suffixes))) for path in paths: schema = decode_file(path) self._load_schema(schema) def _load_schema(self, schema): meta_schema_id = urllib.parse.urldefrag(schema.get('$schema', '')).url if meta_schema_id not in jsonschema.validators.meta_schemas: meta_schema_id = jsonschema.Draft7Validator.META_SCHEMA['$schema'] schema = dict(schema) schema['$schema'] = meta_schema_id uri = urllib.parse.urlparse(schema['id']) path = uri.netloc + uri.path if uri.scheme not in self._data: self._data[uri.scheme] = {} if path in self._data[uri.scheme]: raise Exception(f"duplicate schema id {uri.scheme}://{path}") self._data[uri.scheme][path] = schema def _load_repository(self, repo): for k, v in repo._data.items(): if k not in self._data: self._data[k] = v else: self._data[k].update(v)
Static methods
def from_json(data: typing.Union[pathlib.PurePath, NoneType, bool, int, float, str, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')]]) ‑> SchemaRepository
-
Create new repository from content exported as json serializable data.
Creates a new repository from content of another repository that was exported by using :meth:
SchemaRepository.to_json()
.Args
data
- repository data
Expand source code
@staticmethod def from_json(data: typing.Union[pathlib.PurePath, Data] ) -> 'SchemaRepository': """Create new repository from content exported as json serializable data. Creates a new repository from content of another repository that was exported by using :meth:`SchemaRepository.to_json`. Args: data: repository data """ if isinstance(data, pathlib.PurePath): data = decode_file(data) repo = SchemaRepository() repo._data = data return repo
Methods
def to_json(self) ‑> typing.Union[NoneType, bool, int, float, str, typing.List[Data], typing.Dict[str, Data]]
-
Export repository content as json serializable data.
Entire repository content is exported as json serializable data. New repository can be created from the exported content by using :meth:
SchemaRepository.from_json()
.Expand source code
def to_json(self) -> Data: """Export repository content as json serializable data. Entire repository content is exported as json serializable data. New repository can be created from the exported content by using :meth:`SchemaRepository.from_json`. """ return self._data
def validate(self, schema_id: str, data: typing.Union[NoneType, bool, int, float, str, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')]])
-
Validate data against JSON schema.
Args
schema_id
- JSON schema identifier
data
- data to be validated
Raises
jsonschema.ValidationError
Expand source code
def validate(self, schema_id: str, data: Data): """Validate data against JSON schema. Args: schema_id: JSON schema identifier data: data to be validated Raises: jsonschema.ValidationError """ uri = urllib.parse.urlparse(schema_id) path = uri.netloc + uri.path resolver = jsonschema.RefResolver( base_uri=f'{uri.scheme}://{path}', referrer=self._data[uri.scheme][path], handlers={i: self._get_schema for i in self._data.keys()}) jsonschema.validate( instance=data, schema=resolver.resolve_fragment(resolver.referrer, uri.fragment), resolver=resolver)