Source code for eodatasets3.serialise

import uuid
from datetime import datetime
from functools import partial
from pathlib import Path, PurePath
from typing import IO, Dict, Iterable, Mapping, Tuple, Union
from uuid import UUID

import attr
import cattr
import ciso8601
import click
import jsonschema
import numpy
import shapely
import shapely.affinity
import shapely.ops
from affine import Affine
from datacube.model import SCHEMA_PATH as DATACUBE_SCHEMAS_PATH
from datacube.utils import read_documents
from ruamel.yaml import YAML, Representer
from ruamel.yaml.comments import CommentedMap, CommentedSeq
from shapely.geometry import shape
from shapely.geometry.base import BaseGeometry

from eodatasets3.model import ODC_DATASET_SCHEMA_URL, DatasetDoc, Eo3Dict
from eodatasets3.properties import FileFormat


def _format_representer(dumper, data: FileFormat):
    return dumper.represent_scalar("tag:yaml.org,2002:str", f"{data.name}")


def _uuid_representer(dumper, data):
    """
    :type dumper: yaml.representer.BaseRepresenter
    :type data: uuid.UUID
    :rtype: yaml.nodes.Node
    """
    return dumper.represent_scalar("tag:yaml.org,2002:str", f"{data}")


def _represent_datetime(self, data: datetime):
    """
    The default Ruamel representer strips 'Z' suffixes for UTC.

    But we like to be explicit.
    """
    # If there's a non-utc timezone, use it.
    if data.tzinfo is not None and (data.utcoffset().total_seconds() > 0):
        value = data.isoformat(" ")
    else:
        # Otherwise it's UTC (including when tz==null).
        value = data.replace(tzinfo=None).isoformat(" ") + "Z"
    return self.represent_scalar("tag:yaml.org,2002:timestamp", value)


def _represent_numpy_datetime(self, data: numpy.datetime64):
    return _represent_datetime(self, data.astype("M8[ms]").tolist())


def _represent_paths(self, data: PurePath):
    return Representer.represent_str(self, data.as_posix())


def _init_yaml() -> YAML:
    yaml = YAML()

    yaml.representer.add_representer(FileFormat, _format_representer)
    yaml.representer.add_multi_representer(UUID, _uuid_representer)
    yaml.representer.add_representer(datetime, _represent_datetime)
    yaml.representer.add_multi_representer(PurePath, _represent_paths)

    # WAGL spits out many numpy primitives in docs.
    yaml.representer.add_representer(numpy.int8, Representer.represent_int)
    yaml.representer.add_representer(numpy.uint8, Representer.represent_int)
    yaml.representer.add_representer(numpy.int16, Representer.represent_int)
    yaml.representer.add_representer(numpy.uint16, Representer.represent_int)
    yaml.representer.add_representer(numpy.int32, Representer.represent_int)
    yaml.representer.add_representer(numpy.uint32, Representer.represent_int)
    yaml.representer.add_representer(numpy.int64, Representer.represent_int)
    yaml.representer.add_representer(numpy.uint64, Representer.represent_int)
    yaml.representer.add_representer(numpy.float32, Representer.represent_float)
    yaml.representer.add_representer(numpy.float64, Representer.represent_float)
    yaml.representer.add_representer(numpy.ndarray, Representer.represent_list)
    yaml.representer.add_representer(numpy.datetime64, _represent_numpy_datetime)

    # Match yamllint default expectations. (Explicit start/end are recommended to tell if a file is cut off)
    yaml.width = 80
    yaml.explicit_start = True
    yaml.explicit_end = True

    return yaml


def dump_yaml(output_yaml: Path, *docs: Mapping) -> None:
    if not output_yaml.name.lower().endswith(".yaml"):
        raise ValueError(
            f"YAML filename doesn't end in *.yaml (?). Received {output_yaml!r}"
        )

    yaml = _init_yaml()
    with output_yaml.open("w") as stream:
        yaml.dump_all(docs, stream)


def dumps_yaml(stream, *docs: Mapping) -> None:
    """Dump yaml through a stream, using the default serialisation settings."""
    return _init_yaml().dump_all(docs, stream=stream)


def load_yaml(p: Path) -> Dict:
    with p.open() as f:
        return _yaml().load(f)


def _yaml():
    return YAML(typ="safe")


def loads_yaml(stream: Union[str, IO]) -> Iterable[Dict]:
    """Dump yaml through a stream, using the default deserialisation settings."""
    return _yaml().load_all(stream)


[docs]def from_path(path: Path, skip_validation=False) -> DatasetDoc: """ Parse an EO3 document from a filesystem path :param path: Filesystem path :param skip_validation: Optionally disable validation (it's faster, but I hope your doc is structured correctly) """ if path.suffix.lower() not in (".yaml", ".yml"): raise ValueError(f"Unexpected file type {path.suffix}. Expected yaml") return from_doc(load_yaml(path), skip_validation=skip_validation)
class InvalidDataset(Exception): def __init__(self, path: Path, error_code: str, reason: str) -> None: self.path = path self.error_code = error_code self.reason = reason def _is_json_array(checker, instance) -> bool: """ By default, jsonschema only allows a json array to be a Python list. Let's allow it to be a tuple too. """ return isinstance(instance, (list, tuple)) def _load_schema_validator(p: Path) -> jsonschema.Draft6Validator: """ Create a schema instance for the file. (Assumes they are trustworthy. Only local schemas!) """ with p.open() as f: schema = _yaml().load(f) validator = jsonschema.validators.validator_for(schema) validator.check_schema(schema) # Allow schemas to reference other schemas relatively def doc_reference(path): path = p.parent.joinpath(path) if not path.exists(): raise ValueError(f"Reference not found: {path}") referenced_schema = next(iter(read_documents(path)))[1] return referenced_schema ref_resolver = jsonschema.RefResolver.from_schema( schema, handlers={"": doc_reference} ) custom_validator = jsonschema.validators.extend( validator, type_checker=validator.TYPE_CHECKER.redefine("array", _is_json_array) ) return custom_validator(schema, resolver=ref_resolver) DATASET_SCHEMA = _load_schema_validator(Path(__file__).parent / "dataset.schema.yaml") PRODUCT_SCHEMA = _load_schema_validator( DATACUBE_SCHEMAS_PATH / "dataset-type-schema.yaml" ) METADATA_TYPE_SCHEMA = _load_schema_validator( DATACUBE_SCHEMAS_PATH / "metadata-type-schema.yaml" )
[docs]def from_doc( doc: Dict, skip_validation=False, normalise_properties=False ) -> DatasetDoc: """ Parse a dictionary into an EO3 dataset. By default it will validate it against the schema, which will result in far more useful error messages if fields are missing. :param doc: A dictionary, such as is returned from yaml.load or json.load :param skip_validation: Optionally disable validation (it's faster, but I hope your doc is structured correctly) """ if not skip_validation: DATASET_SCHEMA.validate(doc) # TODO: stable cattrs (<1.0) balks at the $schema variable. doc = doc.copy() del doc["$schema"] location = doc.pop("location", None) if location: doc["locations"] = [location] c = cattr.Converter() c.register_structure_hook(uuid.UUID, _structure_as_uuid) c.register_structure_hook(BaseGeometry, _structure_as_shape) c.register_structure_hook( Eo3Dict, partial(_structure_as_stac_props, normalise_properties=normalise_properties), ) c.register_structure_hook(Affine, _structure_as_affine) c.register_unstructure_hook(Eo3Dict, _unstructure_as_stac_props) return c.structure(doc, DatasetDoc)
def _structure_as_uuid(d, t): return uuid.UUID(str(d)) def _structure_as_stac_props(d, t, normalise_properties=False): """ :param normalise_properties: We don't normalise properties by default as we usually want it to reflect the original file. """ return Eo3Dict( # The passed-in dictionary is stored internally, so we want to make a copy of it # so that our serialised output is fully separate from the input. dict(d), normalise_input=normalise_properties, ) def _structure_as_affine(d: Tuple, t): if len(d) not in [6, 9]: raise ValueError(f"Expected 6 or 9 coefficients in transform. Got {d!r}") if len(d) == 9: if tuple(d[-3:]) != (0.0, 0.0, 1.0): raise ValueError( f"Nine-element affine should always end in [0, 0, 1]. Got {d!r}" ) d = [*d[:-3]] return Affine(*d) def _unstructure_as_stac_props(v: Eo3Dict): return v._props def _structure_as_shape(d, t): return shape(d)
[docs]def to_doc(d: DatasetDoc) -> Dict: """ Serialise a DatasetDoc to a dict If you plan to write this out as a yaml file on disk, you're better off with one of our formatted writers: :func:`.to_stream`, :func:`.to_path`. """ doc = attr.asdict( d, recurse=True, dict_factory=dict, # Exclude fields that are the default. filter=lambda attr, value: "doc_exclude" not in attr.metadata and value != attr.default # Exclude any fields set to None. The distinction should never matter in our docs. and value is not None, retain_collection_types=False, ) doc["$schema"] = ODC_DATASET_SCHEMA_URL if d.geometry is not None: doc["geometry"] = shapely.geometry.mapping(d.geometry) doc["id"] = str(d.id) doc["properties"] = dict(d.properties) if len(doc.get("locations", [])) == 1: doc["location"] = doc.pop("locations")[0] return doc
def to_formatted_doc(d: DatasetDoc) -> CommentedMap: """Serialise a DatasetDoc to a yaml-serialisation-ready dict""" doc = prepare_formatting(to_doc(d)) # Add user-readable names for measurements as a comment if present. if d.measurements: for band_name, band_doc in d.measurements.items(): if band_doc.alias and band_name.lower() != band_doc.alias.lower(): doc["measurements"].yaml_add_eol_comment(band_doc.alias, band_name) return doc
[docs]def to_path(path: Path, *ds: DatasetDoc): """ Output dataset(s) as a formatted YAML to a local path (multiple datasets will result in a multi-document yaml file) """ dump_yaml(path, *(to_formatted_doc(d) for d in ds))
[docs]def to_stream(stream, *ds: DatasetDoc): """ Output dataset(s) as a formatted YAML to an output stream (multiple datasets will result in a multi-document yaml file) """ dumps_yaml(stream, *(to_formatted_doc(d) for d in ds))
def _stac_key_order(key: str): """All keys in alphabetical order, but unprefixed keys first.""" if ":" in key: # Tilde comes after all alphanumerics. return f"~{key}" else: return key def _eo3_key_order(keyval: str): """ Order keys in an an EO3 document. Suitable for sorted() func usage. """ key, val = keyval try: i = _EO3_PROPERTY_ORDER.index(key) if i == -1: return 999 return i except ValueError: return 999 # A logical, readable order for properties to be in a dataset document. _EO3_PROPERTY_ORDER = [ "$schema", # Products / Types "name", "license", "metadata_type", "description", "metadata", # EO3 "id", "label", "product", "location", "locations", "crs", "geometry", "grids", "properties", "measurements", "accessories", "lineage", ] def prepare_formatting(d: Mapping) -> CommentedMap: """ Format an eo3 dataset dict for human-readable yaml serialisation. This will order fields, add whitespace, comments, etc. Output is intended for ruamel.yaml. """ # Sort properties for readability. doc = CommentedMap(sorted(d.items(), key=_eo3_key_order)) doc["properties"] = CommentedMap( sorted(doc["properties"].items(), key=_stac_key_order) ) # Whitespace doc.yaml_set_comment_before_after_key("$schema", before="Dataset") if "geometry" in doc: # Set some numeric fields to be compact yaml format. _use_compact_format(doc["geometry"], "coordinates") if "grids" in doc: for grid in doc["grids"].values(): _use_compact_format(grid, "shape", "transform") _add_space_before( doc, "label" if "label" in doc else "id", "crs", "properties", "measurements", "accessories", "lineage", "location", "locations", ) p: CommentedMap = doc["properties"] p.yaml_add_eol_comment("# Ground sample distance (m)", "eo:gsd") return doc def _use_compact_format(d: dict, *keys): """Change the given sequence to compact YAML form""" for key in keys: if key in d: d[key] = CommentedSeq(d[key]) d[key].fa.set_flow_style() def _add_space_before(d: CommentedMap, *keys): """Add an empty line to the document before a section (key)""" for key in keys: d.yaml_set_comment_before_after_key(key, before="\n") class ClickDatetime(click.ParamType): """ Take a datetime parameter, supporting any ISO8601 date/time/timezone combination. """ name = "date" def convert(self, value, param, ctx): if value is None: return value if isinstance(value, datetime): return value try: return ciso8601.parse_datetime(value) except ValueError: self.fail( ( "Invalid date string {!r}. Expected any ISO date/time format " '(eg. "2017-04-03" or "2014-05-14 12:34")'.format(value) ), param, ctx, )