Module lakota.schema

Expand source code
import shlex
from dataclasses import dataclass
from datetime import date, datetime

from numcodecs import registry
from numpy import (asarray, ascontiguousarray, dtype, frombuffer, issubdtype,
                   ndarray)

from .utils import hexdigest


# Numpy dtypes: second-resolution timestamps, 64-bit integers, 64-bit floats,
# unicode strings and generic objects.
# NOTE(review): not referenced by the code visible in this module --
# presumably the list of supported column dtypes; confirm against callers.
DTYPES = [dtype(s) for s in ("datetime64[s]", "int64", "float64", "U", "O")]

# Short names accepted in place of numpy dtype strings. `Codec.__init__`
# looks its `dt` argument up here before handing it to `dtype()`; names that
# are not aliases are passed through unchanged.
ALIASES = {
    "date": "M8[D]",
    "timestamp": "M8[s]",
    "float": "f8",
    "int": "i8",
    "str": "U",
}

# Only `Schema` is exported by `from lakota.schema import *`
__all__ = ["Schema"]

# Provide conversion between numpy and python types.
# First level is the style name (the `style` argument of
# `SchemaColumn.map_dtype`), second level is the column dtype. A value is
# either one `astype` target or a list of targets applied in order; a dtype
# without entry is left untouched by `map_dtype`.
DTYPE_MAP = {
    "default": {
        dtype("M8[D]"): date,
        dtype("M8[s]"): datetime,
        dtype("float64"): float,
        dtype("int64"): int,
    },
    "epoch": {
        # Two steps: switch to second resolution, then convert to int
        dtype("M8[D]"): ["M8[s]", int],
        dtype("M8[s]"): int,
        dtype("float64"): float,
        dtype("int64"): int,
    },
}


class Codec:
    """
    Pair a numpy dtype with the chain of numcodecs codecs used to turn
    arrays of that dtype into an encoded payload and back.
    """

    def __init__(self, dt, *codec_names):
        """
        `dt` is a numpy dtype, a dtype string or one of the `ALIASES`
        keys. `codec_names` are names looked up in the numcodecs
        registry; when none is given a default chain is picked based on
        the dtype.
        """
        # Make sure dtype is valid
        dt = dtype(ALIASES.get(dt, dt))
        self.dt = dt
        # Build list of codecs. Always store a list: explicit names arrive
        # as a tuple (varargs) while defaults are lists, and mixing both
        # made two otherwise identical codecs compare unequal.
        if codec_names:
            self.codec_names = list(codec_names)
        elif dt in (dtype("O"), dtype("U")):
            # Adapt codec_names to dtypes that are not plain buffers
            self.codec_names = ["msgpack2", "zstd"]
        else:
            self.codec_names = ["blosc"]

    def encode(self, arr, with_digest=False):
        """
        Encode `arr` with every codec of the chain, in order. An empty
        input is encoded as `b""`.

        Return the encoded payload, or a `(payload, digest)` pair when
        `with_digest` is true.
        """
        if len(arr) == 0:
            res = b""
        else:
            # encoding may require contiguous memory
            res = ascontiguousarray(arr)
            # convert to proper type
            res = res.astype(self.dt)
            # Apply codecs
            for codec_name in self.codec_names:
                codec = registry.codec_registry[codec_name]
                kw = {}
                if codec_name == "blosc":
                    kw = {
                        "cname": "zstd",
                        "shuffle": codec.BITSHUFFLE,
                    }
                res = codec(**kw).encode(res)
        if not with_digest:
            return res

        # Extra step: compute digest
        if issubdtype(self.dt, "M"):
            # Datetimes are hashed through their int64 representation
            digest = hexdigest(ascontiguousarray(arr.view("i8")))
        elif self.dt in (dtype("O"), dtype("U")):
            # No raw buffer to hash here: use the encoded payload
            digest = hexdigest(res)
        else:
            digest = hexdigest(ascontiguousarray(arr))
        return res, digest

    def decode(self, arr):
        """
        Reverse `encode`: apply the codec chain backward and return an
        array of dtype `self.dt`. An empty payload gives an empty array.
        """
        if len(arr) == 0:
            return asarray([], dtype=self.dt)
        # Apply all codecs
        for name in reversed(self.codec_names):
            codec = registry.codec_registry[name]
            arr = codec().decode(arr)
        # Same dtype test as in `encode`, for consistency
        if self.dt in (dtype("O"), dtype("U")):
            return arr.astype(self.dt)
        return frombuffer(arr, dtype=self.dt)

    def __eq__(self, other):
        if not isinstance(other, Codec):
            # Let python try the reflected comparison instead of failing
            # with an AttributeError
            return NotImplemented
        # Compare names as tuples so that list-vs-tuple storage is not
        # mistaken for a difference
        same_names = tuple(self.codec_names) == tuple(other.codec_names)
        return same_names and self.dt == other.dt

    def __repr__(self):
        names = ", ".join(self.codec_names)
        return f"<Codec {self.dt}:{names}>"


class SchemaColumn:
    """
    One column of a schema: its name, the `Codec` built from its dtype
    and codec names, and whether it belongs to the index.
    """

    def __init__(self, name, dt, codecs, idx):
        self.name = name
        self.codec = Codec(dt, *codecs)
        self.idx = idx

    @classmethod
    def from_ui(cls, name, definition):
        """
        Build a column from a text definition made of a dtype (or
        alias), an optional `*` marking the column as index and an
        optional `|` followed by codec names, for example
        `"timestamp*"` or `"float | blosc"`.

        Raise ValueError when the definition is not a string, is empty
        or contains an unexpected token.
        """
        if not isinstance(definition, str):
            # shlex reads from sys.stdin when given None, which would
            # block instead of reporting the mistake
            raise ValueError(f"Invalid definition for column {name}: {definition!r}")
        parser = shlex.shlex(definition, posix=True, punctuation_chars="|*")
        # Keep dtypes like "M8[s]" in one token
        parser.wordchars += "[]"
        tokens = list(parser)
        if not tokens:
            raise ValueError(f"Empty definition for column {name}")
        dt, *tokens = tokens
        idx = False
        codec_names = []
        state = None
        for tk in tokens:
            if tk == "|":
                # Everything after the pipe (but "*") is a codec name
                state = "codec"
            elif tk == "*":
                idx = True
            elif state == "codec":
                codec_names.append(tk)
            else:
                raise ValueError(f"Unexpected item: {tk}")
        return cls(name, dt, codecs=codec_names, idx=idx)

    def cast(self, arr):
        """
        Return `arr` as a numpy array of the column dtype. An array
        that already matches is returned as-is (no copy).
        """
        target = dtype(self.codec.dt)
        if isinstance(arr, ndarray) and issubdtype(arr.dtype, target):
            # issubdtype does not look at datetime/timedelta units: an
            # M8[D] array "matches" an M8[s] column. Require the exact
            # dtype for those kinds so that the declared unit is honoured.
            if target.kind not in "mM" or arr.dtype == target:
                return arr
        return asarray(arr, dtype=target)

    def map_dtype(self, arr, style="default"):
        """
        Return `arr` (based on numpy types) converted to python
        type. `style` can be `default` or `epoch`.
        """
        mapping = DTYPE_MAP[style]
        dts = mapping.get(self.codec.dt)
        if dts is None:
            return arr
        # An entry is either one target type or a list of successive ones
        dts = dts if isinstance(dts, (list, tuple)) else [dts]
        for dt in dts:
            arr = arr.astype(dt)
        return arr

    def cast_scalar(self, value):
        """
        Convert one value with the scalar type of the column dtype.
        """
        return dtype(self.codec.dt).type(value)

    def dumps(self):
        """
        Return a dict with keys `dt`, `codecs` and `idx`, usable as
        keyword arguments of the constructor (along with the name).
        """
        return {
            "dt": str(self.codec.dt),
            # Hand out a fresh list: callers cannot alter the codec and
            # the type does not depend on how the codec stored its names
            "codecs": list(self.codec.codec_names),
            "idx": self.idx,
        }

    def __eq__(self, other):
        if not isinstance(other, SchemaColumn):
            # Let python try the reflected comparison instead of failing
            # with an AttributeError
            return NotImplemented
        return (
            self.name == other.name
            and self.idx == other.idx
            and self.codec == other.codec
        )

    def zero(self):
        """
        Return `""` for a string column and `0` for any other one.
        """
        return "" if self.codec.dt == "str" else 0


class Schema:
    """
    Ordered set of named columns. Columns flagged as index are
    available through `idx`, the other ones through `non_idx`.
    """

    def __init__(self, **columns):
        """
        Each keyword is a column name, its value is either a
        `SchemaColumn` or a text definition parsed by
        `SchemaColumn.from_ui`. Raise ValueError if no column is an
        index.
        """
        self.kind = None
        self.columns = {}
        for name, definition in columns.items():
            if not isinstance(definition, SchemaColumn):
                definition = SchemaColumn.from_ui(name, definition)
            self.columns[name] = definition

        self.idx = {n: c for n, c in self.columns.items() if c.idx}
        self.non_idx = {n: c for n, c in self.columns.items() if not c.idx}
        if not self.idx:
            raise ValueError("Invalid schema, no index defined")

    def clone(self, *keep):
        """
        Return a new schema limited to the columns named in `keep`
        (all of them when `keep` is empty).
        """
        cols = keep or list(self)
        return Schema(**{c: self[c] for c in cols})

    @classmethod
    def kv(cls, **columns):
        """
        Same as the constructor, with `kind` set to `"kv"`.
        """
        schema = cls(**columns)
        schema.kind = "kv"
        return schema

    @classmethod
    def from_frame(cls, frame, idx_columns=None):
        """
        Instantiate a schema based on the column names and types of the
        given frame (a dict or a dataframe). Every column is an index
        column unless `idx_columns` names the ones to use.
        """
        idx_columns = idx_columns or list(frame)
        col_defs = {}
        for name in frame:
            arr = frame[name]
            col_defs[name] = SchemaColumn(name, arr.dtype, [], name in idx_columns)
        return cls(**col_defs)

    def serialize(self, values):
        """
        Return `values` as a tuple of strings, one per column (extra
        values are ignored). A scalar is handled as a one-item tuple;
        `None` and empty sequences or strings give an empty tuple.
        """
        # Only None and empty containers mean "nothing": a plain
        # truthiness test would also discard legitimate values like 0
        if values is None or (
            isinstance(values, (list, tuple, str, bytes)) and not values
        ):
            return tuple()
        if not isinstance(values, (list, tuple)):
            values = (values,)
        # TODO implement column type based repr
        # zip limits the result to the number of columns
        return tuple(str(val) for _, val in zip(self.columns.values(), values))

    def deserialize(self, values=tuple()):
        """
        Return `values` as a tuple where each item went through
        `cast_scalar` of the column at the same position (extra values
        are ignored). A scalar is handled as a one-item tuple; `None`
        and empty sequences or strings give an empty tuple.
        """
        # Only None and empty containers mean "nothing": a plain
        # truthiness test would also discard legitimate values like 0
        if values is None or (
            isinstance(values, (list, tuple, str, bytes)) and not values
        ):
            return tuple()
        if not isinstance(values, (list, tuple)):
            values = (values,)
        res = tuple(
            col.cast_scalar(val) for col, val in zip(self.columns.values(), values)
        )
        return res

    @classmethod
    def loads(cls, data):
        """
        Rebuild a schema from the dict structure returned by `dumps`.
        """
        columns = {
            name: SchemaColumn(name, **opts) for name, opts in data["columns"].items()
        }
        # A payload without "kind" is treated as a regular schema
        if data.get("kind") == "kv":
            return cls.kv(**columns)
        return cls(**columns)

    def dumps(self):
        """
        Return a dict with the schema `kind` and the dump of every
        column (keyed by column name).
        """
        columns = {c.name: c.dumps() for c in self.columns.values()}
        return {"kind": self.kind, "columns": columns}

    def __iter__(self):
        # TODO return self.columns.values and add an ls method (to behave like repo.ls)
        return iter(self.columns.keys())

    def __repr__(self):
        cols = [f"{c.name} {c.codec.dt}" for c in self.columns.values()]
        return "<Schema {}>".format(" ".join(cols))

    def __eq__(self, other):
        if not isinstance(other, Schema):
            # Let python try the reflected comparison instead of failing
            # with an AttributeError
            return NotImplemented
        # Compare full lists: pairing columns with zip alone stops at the
        # shortest schema and would declare a schema equal to any other
        # one that merely starts with the same columns
        return list(self.columns.values()) == list(other.columns.values())

    def cast(self, df=None):
        """
        Return a dict of arrays: the columns of `df` that are part of
        the schema, cast by their `SchemaColumn`. Without `df`, every
        column of the schema is returned as an empty array.
        """
        if df is None:
            df = {}
            columns = list(self)
        else:
            columns = [c for c in df if c in self.columns]

        res = {}
        for name in columns:
            col = self[name]
            res[col.name] = col.cast(df.get(col.name, []))
        return res

    def __getitem__(self, name):
        return self.columns[name]

    def row(self, df, pos, full=True):
        """
        Extract a row of the dataframe-like object at
        given position. Only index columns are read when `full` is
        false.
        """
        cols = self.columns if full else self.idx
        return tuple(df[n][pos] for n in cols)

    def __matmul__(self, labels):
        """
        `schema @ labels` returns a `SeriesDefinition`; a single label
        is wrapped in a list.
        """
        if not isinstance(labels, (list, tuple)):
            labels = [labels]
        return SeriesDefinition(self, labels)


@dataclass
class SeriesDefinition:
    """
    Association of a schema and a list of labels, as returned by
    `Schema.__matmul__` (`schema @ labels`).
    """

    # Schema found on the left-hand side of `@`
    schema: Schema
    # Labels found on the right-hand side of `@` (`Schema.__matmul__`
    # wraps a single label in a list)
    labels: list

Classes

class Schema (**columns)
Expand source code
class Schema:
    """
    Ordered set of named columns. Columns flagged as index are
    available through `idx`, the other ones through `non_idx`.
    """

    def __init__(self, **columns):
        """
        Each keyword is a column name, its value is either a
        `SchemaColumn` or a text definition parsed by
        `SchemaColumn.from_ui`. Raise ValueError if no column is an
        index.
        """
        self.kind = None
        self.columns = {}
        for name, definition in columns.items():
            if not isinstance(definition, SchemaColumn):
                definition = SchemaColumn.from_ui(name, definition)
            self.columns[name] = definition

        self.idx = {n: c for n, c in self.columns.items() if c.idx}
        self.non_idx = {n: c for n, c in self.columns.items() if not c.idx}
        if not self.idx:
            raise ValueError("Invalid schema, no index defined")

    def clone(self, *keep):
        """
        Return a new schema limited to the columns named in `keep`
        (all of them when `keep` is empty).
        """
        cols = keep or list(self)
        return Schema(**{c: self[c] for c in cols})

    @classmethod
    def kv(cls, **columns):
        """
        Same as the constructor, with `kind` set to `"kv"`.
        """
        schema = cls(**columns)
        schema.kind = "kv"
        return schema

    @classmethod
    def from_frame(cls, frame, idx_columns=None):
        """
        Instantiate a schema based on the column names and types of the
        given frame (a dict or a dataframe). Every column is an index
        column unless `idx_columns` names the ones to use.
        """
        idx_columns = idx_columns or list(frame)
        col_defs = {}
        for name in frame:
            arr = frame[name]
            col_defs[name] = SchemaColumn(name, arr.dtype, [], name in idx_columns)
        return cls(**col_defs)

    def serialize(self, values):
        """
        Return `values` as a tuple of strings, one per column (extra
        values are ignored). A scalar is handled as a one-item tuple;
        `None` and empty sequences or strings give an empty tuple.
        """
        # Only None and empty containers mean "nothing": a plain
        # truthiness test would also discard legitimate values like 0
        if values is None or (
            isinstance(values, (list, tuple, str, bytes)) and not values
        ):
            return tuple()
        if not isinstance(values, (list, tuple)):
            values = (values,)
        # TODO implement column type based repr
        # zip limits the result to the number of columns
        return tuple(str(val) for _, val in zip(self.columns.values(), values))

    def deserialize(self, values=tuple()):
        """
        Return `values` as a tuple where each item went through
        `cast_scalar` of the column at the same position (extra values
        are ignored). A scalar is handled as a one-item tuple; `None`
        and empty sequences or strings give an empty tuple.
        """
        # Only None and empty containers mean "nothing": a plain
        # truthiness test would also discard legitimate values like 0
        if values is None or (
            isinstance(values, (list, tuple, str, bytes)) and not values
        ):
            return tuple()
        if not isinstance(values, (list, tuple)):
            values = (values,)
        res = tuple(
            col.cast_scalar(val) for col, val in zip(self.columns.values(), values)
        )
        return res

    @classmethod
    def loads(cls, data):
        """
        Rebuild a schema from the dict structure returned by `dumps`.
        """
        columns = {
            name: SchemaColumn(name, **opts) for name, opts in data["columns"].items()
        }
        # A payload without "kind" is treated as a regular schema
        if data.get("kind") == "kv":
            return cls.kv(**columns)
        return cls(**columns)

    def dumps(self):
        """
        Return a dict with the schema `kind` and the dump of every
        column (keyed by column name).
        """
        columns = {c.name: c.dumps() for c in self.columns.values()}
        return {"kind": self.kind, "columns": columns}

    def __iter__(self):
        # TODO return self.columns.values and add an ls method (to behave like repo.ls)
        return iter(self.columns.keys())

    def __repr__(self):
        cols = [f"{c.name} {c.codec.dt}" for c in self.columns.values()]
        return "<Schema {}>".format(" ".join(cols))

    def __eq__(self, other):
        if not isinstance(other, Schema):
            # Let python try the reflected comparison instead of failing
            # with an AttributeError
            return NotImplemented
        # Compare full lists: pairing columns with zip alone stops at the
        # shortest schema and would declare a schema equal to any other
        # one that merely starts with the same columns
        return list(self.columns.values()) == list(other.columns.values())

    def cast(self, df=None):
        """
        Return a dict of arrays: the columns of `df` that are part of
        the schema, cast by their `SchemaColumn`. Without `df`, every
        column of the schema is returned as an empty array.
        """
        if df is None:
            df = {}
            columns = list(self)
        else:
            columns = [c for c in df if c in self.columns]

        res = {}
        for name in columns:
            col = self[name]
            res[col.name] = col.cast(df.get(col.name, []))
        return res

    def __getitem__(self, name):
        return self.columns[name]

    def row(self, df, pos, full=True):
        """
        Extract a row of the dataframe-like object at
        given position. Only index columns are read when `full` is
        false.
        """
        cols = self.columns if full else self.idx
        return tuple(df[n][pos] for n in cols)

    def __matmul__(self, labels):
        """
        `schema @ labels` returns a `SeriesDefinition`; a single label
        is wrapped in a list.
        """
        if not isinstance(labels, (list, tuple)):
            labels = [labels]
        return SeriesDefinition(self, labels)

Static methods

def from_frame(frame, idx_columns=None)

Instantiate a schema based on the column names and types of the given frame (a dict or a dataframe)

Expand source code
@classmethod
def from_frame(cls, frame, idx_columns=None):
    """
    Instantiate a schema based on the column names and types of the
    given frame (a dict or a dataframe). Every column is an index
    column unless `idx_columns` names the ones to use.
    """
    idx_columns = idx_columns or list(frame)
    col_defs = {}
    for name in frame:
        arr = frame[name]
        # No explicit codec: the column picks the default for its dtype
        col_defs[name] = SchemaColumn(name, arr.dtype, [], name in idx_columns)
    # Instantiate through cls so that a subclass gets its own type back
    return cls(**col_defs)
def kv(**columns)
Expand source code
@classmethod
def kv(cls, **columns):
    """
    Same as the constructor, with `kind` set to `"kv"`.
    """
    # Instantiate through cls so that a subclass gets its own type back
    schema = cls(**columns)
    schema.kind = "kv"
    return schema
def loads(data)
Expand source code
@classmethod
def loads(cls, data):
    """
    Rebuild a schema from the dict structure returned by `dumps`:
    a `columns` mapping (name to constructor options) and a `kind`.
    """
    columns = {
        name: SchemaColumn(name, **opts) for name, opts in data["columns"].items()
    }
    # A payload without "kind" is treated as a regular schema
    if data.get("kind") == "kv":
        return cls.kv(**columns)
    return cls(**columns)

Methods

def cast(self, df=None)
Expand source code
def cast(self, df=None):
    """
    Return a dict of columns cast by the schema. With a `df`, only its
    columns known to the schema are kept; without, every column of the
    schema is cast from an empty list.
    """
    if df is None:
        source, names = {}, list(self)
    else:
        source = df
        names = [label for label in df if label in self]

    casted = {}
    for label in names:
        column = self[label]
        values = source.get(column.name, [])
        casted[column.name] = column.cast(values)
    return casted
def clone(self, *keep)
Expand source code
def clone(self, *keep):
    """
    Return a new schema limited to the columns named in `keep`
    (all of them when `keep` is empty).
    """
    names = keep if keep else list(self)
    selected = {}
    for name in names:
        selected[name] = self[name]
    return Schema(**selected)
def deserialize(self, values=())
Expand source code
def deserialize(self, values=tuple()):
    """
    Return `values` as a tuple where each item went through
    `cast_scalar` of the column at the same position (extra values are
    ignored). A scalar is handled as a one-item tuple; `None` and empty
    sequences or strings give an empty tuple.
    """
    # Only None and empty containers mean "nothing": a plain truthiness
    # test would also discard legitimate values like 0
    if values is None or (
        isinstance(values, (list, tuple, str, bytes)) and not values
    ):
        return tuple()
    if not isinstance(values, (list, tuple)):
        values = (values,)
    res = tuple(
        col.cast_scalar(val) for col, val in zip(self.columns.values(), values)
    )
    return res
def dumps(self)
Expand source code
def dumps(self):
    """
    Return a dict with the schema `kind` and, under `columns`, the dump
    of every column keyed by its name.
    """
    dumped = {}
    for column in self.columns.values():
        dumped[column.name] = column.dumps()
    return {"kind": self.kind, "columns": dumped}
def row(self, df, pos, full=True)

Extract a row of the dataframe-like object at given position

Expand source code
def row(self, df, pos, full=True):
    """
    Return, as a tuple, the values found at position `pos` in the
    dataframe-like `df`: one per schema column, or one per index
    column when `full` is false.
    """
    if full:
        names = self.columns
    else:
        names = self.idx
    values = []
    for name in names:
        values.append(df[name][pos])
    return tuple(values)
def serialize(self, values)
Expand source code
def serialize(self, values):
    """
    Return `values` as a tuple of strings, one per column (extra values
    are ignored). A scalar is handled as a one-item tuple; `None` and
    empty sequences or strings give an empty tuple.
    """
    # Only None and empty containers mean "nothing": a plain truthiness
    # test would also discard legitimate values like 0
    if values is None or (
        isinstance(values, (list, tuple, str, bytes)) and not values
    ):
        return tuple()
    if not isinstance(values, (list, tuple)):
        values = (values,)
    # TODO implement column type based repr
    # zip limits the result to the number of columns
    return tuple(str(val) for _, val in zip(self.columns.values(), values))