Module lakota.series

Expand source code
from numpy import issubdtype, repeat

from .batch import Batch
from .changelog import phi
from .commit import Commit
from .frame import Frame
from .utils import Closed, Interval, Pool, hashed_path, settings

__all__ = ["Series", "KVSeries"]


def intersect(revision, start, stop):
    ok_start = not stop or revision["start"][: len(stop)] <= stop
    ok_stop = not start or revision["stop"][: len(start)] >= start
    if not (ok_start and ok_stop):
        return None
    # return reduced range
    max_start = max(revision["start"], start)
    min_stop = min(revision["stop"], stop) if stop else revision["stop"]
    return (max_start, min_stop)


class Series:
    """
    Combine a pod and a changelog to provide a versioned and
    concurrent management of series.
    """

    def __init__(self, label, collection):
        self.collection = collection
        self.schema = collection.schema
        self.pod = collection.pod
        self.changelog = collection.changelog
        self.label = label

    def segments(
        self,
        start=None,
        stop=None,
        before=None,
        closed="LEFT",
        from_ci=None,
    ):
        """
        Find matching segments
        """

        if not from_ci:
            # Find leaf commit
            leaf_rev = self.changelog.leaf(before=before)
            if not leaf_rev:
                return []
            from_ci = leaf_rev.commit(self.collection)
        return from_ci.segments(self.label, self.pod, start, stop, closed=closed)

    def period(self, rev):
        """
        Return average period (time delta between two tic) of a given revision
        """
        start = self.schema.deserialize(rev["start"])[0]
        stop = self.schema.deserialize(rev["stop"])[0]
        span = stop - start
        # span is a timedelta64
        span = span.item().total_seconds()
        return span / rev["len"]

    def interval(self, size=500_000):
        """
        Find smallest natural partition that will fit `size` items
        """
        schema = self.schema
        head_col = next(iter(schema.idx))
        assert issubdtype(schema[head_col].codec.dt, "datetime64")

        revisions = self.changelog.log()
        if not revisions:
            return None
        min_period = min(self.period(rev) for rev in revisions)
        target = min_period * size
        return Interval.bisect(target)

    def write(self, frame, start=None, stop=None, closed="b", root=False):
        # Each commit is like a frame. A row in this frame represent a
        # write (aka a segment) and contains one digest per series
        # column + 2*N extra columns that encode start-stop values (N
        # being the number of index columns of the series) + a column
        # containing the series name (like that we can write all the
        # series in one commit)

        frame = Frame(self.schema, frame)

        # Make sure frame is sorted
        # XXX forbid repeated values in index ??
        assert frame.is_sorted(), "Frame is not sorted!"

        # Save segments
        all_dig = []
        arr_length = None
        embedded = {}
        with Pool() as pool:
            for name in self.schema:
                # Cast array & check len
                values = frame[name]
                if arr_length is None:
                    arr_length = len(values)
                elif len(values) != arr_length:
                    raise ValueError("Length mismatch")
                digest, embed_data = self._write_col(name, values, pool)
                all_dig.append(digest)
                if embed_data is not None:
                    embedded[digest] = embed_data

        # Build commit info
        start = frame.start() if start is None else start
        stop = frame.stop() if stop is None else stop
        if not isinstance(start, tuple):
            start = (start,)
        if not isinstance(stop, tuple):
            stop = (stop,)

        # Create new digest
        batch = self.collection.batch
        if batch:
            ci_info = (self.label, start, stop, all_dig, len(frame), closed, embedded)
            if isinstance(batch, Batch):
                batch.append(*ci_info)
            else:
                return ci_info
            return
        return self.commit(
            start,
            stop,
            all_dig,
            len(frame),
            root=root,
            closed=closed,
            embedded=embedded,
        )

    def _write_col(self, name, values, pool):
        # Encode content
        arr = self.schema[name].cast(values)
        # Create digest (based on actual array for simple
        # type, based on encoded content for O and U)
        codec = self.schema[name].codec
        data, digest = codec.encode(arr, with_digest=True)

        embedded_data = None
        if len(data) < settings.embed_max_size:  # every small array gets embedded
            # Put small arrays aside
            embedded_data = data
        else:
            folder, filename = hashed_path(digest)
            # XXX move writing in Series.commit and handle situation where the commit gets to large?
            # XXX keep it here for when a batch gets too large ?
            pool.submit(self.pod.cd(folder).write, filename, data)
        return digest, embedded_data

    def update(self, frame):
        frame = Frame(self.schema, frame)
        start, stop = frame.start(), frame.stop()
        idx = tuple(self.schema.idx)
        upd_cols = tuple(c for c in frame if c not in idx)
        read_cols = tuple(c for c in self.schema.columns if c not in idx + upd_cols)
        db_frm = self.frame(start=start, stop=stop, closed="b", select=idx + read_cols)
        db_start, db_stop = db_frm.start(), db_frm.stop()
        overlap_frm = frame.islice(db_start, db_stop, "b")
        head_frm = frame.islice(None, db_start, "l")
        tail_frm = frame.islice(db_stop, None, "r")

        # Make sure index matches on overlapping part
        for col in idx:
            if (
                len(db_frm) != len(overlap_frm)
                or (db_frm[col] != overlap_frm[col]).any()
            ):
                raise ValueError("Update frame is not aligned with existing index")

        # Update columns
        for col in upd_cols:
            db_frm[col] = overlap_frm[col]

        # Add columns filled with zero-like values in non-overlapping
        # frames
        for frm in (head_frm, tail_frm):
            for col in read_cols:
                frm[col] = repeat(self.schema[col].zero(), len(frm))

        full_frm = Frame.concat(head_frm, db_frm, tail_frm)
        return self.write(full_frm, start, stop, closed="b")

    def commit(
        self, start, stop, all_dig, length, root=False, closed="b", embedded=None
    ):
        # root force commit on phi
        leaf_rev = None if root else self.changelog.leaf()

        # Combine with last commit
        if leaf_rev:
            leaf_ci = leaf_rev.commit(self.collection)
            new_ci = leaf_ci.update(
                self.label,
                start,
                stop,
                all_dig,
                length,
                closed=closed,
                embedded=embedded,
            )
            # TODO early return if new_ci == leaf_ci
        else:
            new_ci = Commit.one(
                self.schema,
                self.label,
                start,
                stop,
                all_dig,
                length,
                closed=closed,
                embedded=embedded,
            )

        payload = new_ci.encode()
        parent = leaf_rev.child if leaf_rev else phi
        return self.changelog.commit(payload, parents=[parent])

    def delete(self, start, stop, closed="b", root=False):
        frm = {k: [] for k in self.schema}
        return self.write(frame=frm, start=start, stop=stop, closed=closed, root=root)

    def __len__(self):
        return sum(len(s) for s in self.segments())

    def __bool__(self):
        return self.label in self.collection.ls()

    def paginate(
        self,
        step=settings.page_len,
        start=None,
        stop=None,
        before=None,
        closed="LEFT",
        limit=None,
        offset=None,
        select=None,
    ):

        return Paginate(
            self,
            step=step,
            start=start,
            stop=stop,
            before=before,
            closed=closed,
            limit=limit,
            offset=offset,
            select=select,
        ).iter()

    def tail(
        self,
        length,
        start=None,
        stop=None,
        before=None,
        closed="LEFT",
        limit=None,
        offset=None,
        select=None,
    ):
        '''
        Return the last `length` values of the series. Optionaly
        pre-filtered between `start` and `stop`.
        '''
        if length <= 0:
            raise ValueError("length argument must be > 0")
        segments = self.segments(
            start=self.schema.deserialize(start),
            stop=self.schema.deserialize(stop),
            before=before,
            closed=closed,
        )

        cnt = 0
        res = []
        # Create one frame per segment, starting from the last one.
        for segment in reversed(list(segments)):
            frm = Frame.from_segments(
                self.schema, [segment], select=select
            )
            if cnt + len(frm) >= length:
                # Last frame: keep the correct amount of lines
                cut = length - cnt
                res.append(frm.slice(start=-cut))
                break
            # We consume the full frame, append it and increase counter
            res.append(frm)
            cnt += len(frm)

        if not res:
            return Frame(self.schema)

        # Re-order frames and concat
        frm = Frame.concat(*reversed(res))

        if (limit, offset) != (None, None):
            start = offset or 0
            stop = start + (limit or 0)
            frm = frm.slice(start, stop)

        return frm

    def frame(
        self,
        start=None,
        stop=None,
        before=None,
        closed="LEFT",
        limit=None,
        offset=None,
        select=None,
    ):
        segments = self.segments(
            start=self.schema.deserialize(start),
            stop=self.schema.deserialize(stop),
            before=before,
            closed=closed,
        )


        return Frame.from_segments(
            self.schema,
            segments,
            limit=limit,
            offset=offset,
            select=select,
        )

    def df(
        self,
        start=None,
        stop=None,
        before=None,
        closed="LEFT",
        limit=None,
        offset=None,
        select=None,
    ):
        return self.frame(
            start=start,
            stop=stop,
            before=before,
            closed=closed,
            limit=limit,
            offset=offset,
            select=select,
        ).df()


class Paginate:
    def __init__(
        self,
        series,
        step=settings.page_len,
        start=None,
        stop=None,
        before=None,
        closed="LEFT",
        limit=None,
        offset=None,
        select=None,
    ):
        self.series = series
        self.start = series.schema.deserialize(start)
        self.stop = series.schema.deserialize(stop)
        self.closed = Closed.cast(closed)
        self.limit = limit
        self.offset = offset or 0
        self.select = select
        self.step = step

        if step <= 0:
            raise ValueError("step argument must be > 0")

        leaf_rev = series.changelog.leaf(before=before)
        self.from_ci = leaf_rev and leaf_rev.commit(self.series.collection)

    def iter(self):
        if not self.from_ci:
            return []
        for frm in self.loop():
            if not frm.empty:
                yield frm

    def loop(self):
        if self.select:
            select_with_idx = self.select + [
                c for c in self.series.schema.idx
                if c not in self.select
            ]
        else:
            select_with_idx = self.select

        while True:
            if self.limit == 0:
                break
            if self.limit is None:
                lmt = self.step
            else:
                lmt = min(self.step, self.limit)

            # Load some segments
            read_len = 0
            segments = []
            for sgm in self.series.segments(self.start, self.stop, closed=self.closed):
                segments.append(sgm)
                read_len += len(sgm)
                if read_len >= lmt:
                    break

            # Create frame & yield it
            frm = Frame.from_segments(
                self.series.schema, segments, limit=lmt, offset=self.offset,
                select=select_with_idx,
            )
            yield frm.select(self.select)

            # Update limit & offset
            self.offset = max(self.offset - read_len, 0)
            if self.offset == 0:
                if self.limit is not None:
                    self.limit = max(self.limit - len(frm), 0)

            # Update start & closed
            new_start = None
            if not frm.empty:
                new_start = frm.stop()
            elif segments:
                new_start = segments[-1].stop

            if new_start is None or new_start == self.start:
                # Start did not move, we must stop
                break
            self.start = new_start
            self.closed = self.closed.set_left(False)


class KVSeries(Series):
    def write(self, frame, start=None, stop=None, closed="b", root=False):
        if root or not (start is None is stop):
            return super().write(frame, start=start, stop=stop, root=root)

        if not isinstance(frame, Frame):
            frame = Frame(self.schema, frame).sorted()

        segments = self.segments(frame.start(), frame.stop(), closed="BOTH")
        db_frm = Frame.from_segments(
            self.schema, segments
        )  # Maybe paginate on large results

        if db_frm.empty:
            return super().write(frame, closed=closed)

        if db_frm == frame:
            # Nothing to do
            return

        # Concat both frames and reduce the result
        new_frm = Frame.concat(frame, db_frm).sorted()
        reduce_kw = {c: c for c in self.schema.idx}
        non_idx = [c for c in self.schema if c not in self.schema.idx]
        reduce_kw.update({c: f"(first self.{c})" for c in non_idx})
        new_frm = new_frm.reduce(**reduce_kw)
        return super().write(new_frm)  # XXX pass closed ?

    def delete(self, *keys):
        # XXX we have 4 delete method (on series, kvseries, collection
        # and repo), we should get rid of some

        # Create a frame with all the existing keys contained
        # between max and min of keys
        if not keys:
            return

        # XXX use changelog pack ?
        start, stop = min(keys), max(keys)
        frm = self.frame(start, stop, closed="BOTH")
        # Keep only keys not given as argument
        # FIXME use frame.mask to filter it
        items = [(k, s) for k, s in zip(frm["label"], frm["meta"]) if k not in keys]
        if len(items) == 0:
            new_frm = self.schema.cast()
        else:
            keep_keys, keep_meta = zip(*items)
            new_frm = {
                "label": keep_keys,
                "meta": keep_meta,
            }

        # Write result to db
        revs = self.write(new_frm, start=start, stop=stop)
        return revs

    # def delete(self, *keys):
    #     if not keys:
    #         return
    #     frm = self.frame()
    #     mask = '(logical_not (isin self.label {}))'.format(
    #         ' '.join(f'"{k}"' for k in keys)
    #     )
    #     new_frm = frm.mask(mask)
    #     self.write(new_frm, start=frm.start(), stop=frm.stop())

Classes

class KVSeries (label, collection)

Combine a pod and a changelog to provide a versioned and concurrent management of series.

Expand source code
class KVSeries(Series):
    def write(self, frame, start=None, stop=None, closed="b", root=False):
        if root or not (start is None is stop):
            return super().write(frame, start=start, stop=stop, root=root)

        if not isinstance(frame, Frame):
            frame = Frame(self.schema, frame).sorted()

        segments = self.segments(frame.start(), frame.stop(), closed="BOTH")
        db_frm = Frame.from_segments(
            self.schema, segments
        )  # Maybe paginate on large results

        if db_frm.empty:
            return super().write(frame, closed=closed)

        if db_frm == frame:
            # Nothing to do
            return

        # Concat both frames and reduce the result
        new_frm = Frame.concat(frame, db_frm).sorted()
        reduce_kw = {c: c for c in self.schema.idx}
        non_idx = [c for c in self.schema if c not in self.schema.idx]
        reduce_kw.update({c: f"(first self.{c})" for c in non_idx})
        new_frm = new_frm.reduce(**reduce_kw)
        return super().write(new_frm)  # XXX pass closed ?

    def delete(self, *keys):
        # XXX we have 4 delete method (on series, kvseries, collection
        # and repo), we should get rid of some

        # Create a frame with all the existing keys contained
        # between max and min of keys
        if not keys:
            return

        # XXX use changelog pack ?
        start, stop = min(keys), max(keys)
        frm = self.frame(start, stop, closed="BOTH")
        # Keep only keys not given as argument
        # FIXME use frame.mask to filter it
        items = [(k, s) for k, s in zip(frm["label"], frm["meta"]) if k not in keys]
        if len(items) == 0:
            new_frm = self.schema.cast()
        else:
            keep_keys, keep_meta = zip(*items)
            new_frm = {
                "label": keep_keys,
                "meta": keep_meta,
            }

        # Write result to db
        revs = self.write(new_frm, start=start, stop=stop)
        return revs

    # def delete(self, *keys):
    #     if not keys:
    #         return
    #     frm = self.frame()
    #     mask = '(logical_not (isin self.label {}))'.format(
    #         ' '.join(f'"{k}"' for k in keys)
    #     )
    #     new_frm = frm.mask(mask)
    #     self.write(new_frm, start=frm.start(), stop=frm.stop())

Ancestors

Methods

def delete(self, *keys)
Expand source code
def delete(self, *keys):
    # XXX we have 4 delete method (on series, kvseries, collection
    # and repo), we should get rid of some

    # Create a frame with all the existing keys contained
    # between max and min of keys
    if not keys:
        return

    # XXX use changelog pack ?
    start, stop = min(keys), max(keys)
    frm = self.frame(start, stop, closed="BOTH")
    # Keep only keys not given as argument
    # FIXME use frame.mask to filter it
    items = [(k, s) for k, s in zip(frm["label"], frm["meta"]) if k not in keys]
    if len(items) == 0:
        new_frm = self.schema.cast()
    else:
        keep_keys, keep_meta = zip(*items)
        new_frm = {
            "label": keep_keys,
            "meta": keep_meta,
        }

    # Write result to db
    revs = self.write(new_frm, start=start, stop=stop)
    return revs
def write(self, frame, start=None, stop=None, closed='b', root=False)
Expand source code
def write(self, frame, start=None, stop=None, closed="b", root=False):
    if root or not (start is None is stop):
        return super().write(frame, start=start, stop=stop, root=root)

    if not isinstance(frame, Frame):
        frame = Frame(self.schema, frame).sorted()

    segments = self.segments(frame.start(), frame.stop(), closed="BOTH")
    db_frm = Frame.from_segments(
        self.schema, segments
    )  # Maybe paginate on large results

    if db_frm.empty:
        return super().write(frame, closed=closed)

    if db_frm == frame:
        # Nothing to do
        return

    # Concat both frames and reduce the result
    new_frm = Frame.concat(frame, db_frm).sorted()
    reduce_kw = {c: c for c in self.schema.idx}
    non_idx = [c for c in self.schema if c not in self.schema.idx]
    reduce_kw.update({c: f"(first self.{c})" for c in non_idx})
    new_frm = new_frm.reduce(**reduce_kw)
    return super().write(new_frm)  # XXX pass closed ?

Inherited members

class Series (label, collection)

Combine a pod and a changelog to provide a versioned and concurrent management of series.

Expand source code
class Series:
    """
    Combine a pod and a changelog to provide a versioned and
    concurrent management of series.
    """

    def __init__(self, label, collection):
        self.collection = collection
        self.schema = collection.schema
        self.pod = collection.pod
        self.changelog = collection.changelog
        self.label = label

    def segments(
        self,
        start=None,
        stop=None,
        before=None,
        closed="LEFT",
        from_ci=None,
    ):
        """
        Find matching segments
        """

        if not from_ci:
            # Find leaf commit
            leaf_rev = self.changelog.leaf(before=before)
            if not leaf_rev:
                return []
            from_ci = leaf_rev.commit(self.collection)
        return from_ci.segments(self.label, self.pod, start, stop, closed=closed)

    def period(self, rev):
        """
        Return average period (time delta between two tic) of a given revision
        """
        start = self.schema.deserialize(rev["start"])[0]
        stop = self.schema.deserialize(rev["stop"])[0]
        span = stop - start
        # span is a timedelta64
        span = span.item().total_seconds()
        return span / rev["len"]

    def interval(self, size=500_000):
        """
        Find smallest natural partition that will fit `size` items
        """
        schema = self.schema
        head_col = next(iter(schema.idx))
        assert issubdtype(schema[head_col].codec.dt, "datetime64")

        revisions = self.changelog.log()
        if not revisions:
            return None
        min_period = min(self.period(rev) for rev in revisions)
        target = min_period * size
        return Interval.bisect(target)

    def write(self, frame, start=None, stop=None, closed="b", root=False):
        # Each commit is like a frame. A row in this frame represent a
        # write (aka a segment) and contains one digest per series
        # column + 2*N extra columns that encode start-stop values (N
        # being the number of index columns of the series) + a column
        # containing the series name (like that we can write all the
        # series in one commit)

        frame = Frame(self.schema, frame)

        # Make sure frame is sorted
        # XXX forbid repeated values in index ??
        assert frame.is_sorted(), "Frame is not sorted!"

        # Save segments
        all_dig = []
        arr_length = None
        embedded = {}
        with Pool() as pool:
            for name in self.schema:
                # Cast array & check len
                values = frame[name]
                if arr_length is None:
                    arr_length = len(values)
                elif len(values) != arr_length:
                    raise ValueError("Length mismatch")
                digest, embed_data = self._write_col(name, values, pool)
                all_dig.append(digest)
                if embed_data is not None:
                    embedded[digest] = embed_data

        # Build commit info
        start = frame.start() if start is None else start
        stop = frame.stop() if stop is None else stop
        if not isinstance(start, tuple):
            start = (start,)
        if not isinstance(stop, tuple):
            stop = (stop,)

        # Create new digest
        batch = self.collection.batch
        if batch:
            ci_info = (self.label, start, stop, all_dig, len(frame), closed, embedded)
            if isinstance(batch, Batch):
                batch.append(*ci_info)
            else:
                return ci_info
            return
        return self.commit(
            start,
            stop,
            all_dig,
            len(frame),
            root=root,
            closed=closed,
            embedded=embedded,
        )

    def _write_col(self, name, values, pool):
        # Encode content
        arr = self.schema[name].cast(values)
        # Create digest (based on actual array for simple
        # type, based on encoded content for O and U)
        codec = self.schema[name].codec
        data, digest = codec.encode(arr, with_digest=True)

        embedded_data = None
        if len(data) < settings.embed_max_size:  # every small array gets embedded
            # Put small arrays aside
            embedded_data = data
        else:
            folder, filename = hashed_path(digest)
            # XXX move writing in Series.commit and handle situation where the commit gets to large?
            # XXX keep it here for when a batch gets too large ?
            pool.submit(self.pod.cd(folder).write, filename, data)
        return digest, embedded_data

    def update(self, frame):
        frame = Frame(self.schema, frame)
        start, stop = frame.start(), frame.stop()
        idx = tuple(self.schema.idx)
        upd_cols = tuple(c for c in frame if c not in idx)
        read_cols = tuple(c for c in self.schema.columns if c not in idx + upd_cols)
        db_frm = self.frame(start=start, stop=stop, closed="b", select=idx + read_cols)
        db_start, db_stop = db_frm.start(), db_frm.stop()
        overlap_frm = frame.islice(db_start, db_stop, "b")
        head_frm = frame.islice(None, db_start, "l")
        tail_frm = frame.islice(db_stop, None, "r")

        # Make sure index matches on overlapping part
        for col in idx:
            if (
                len(db_frm) != len(overlap_frm)
                or (db_frm[col] != overlap_frm[col]).any()
            ):
                raise ValueError("Update frame is not aligned with existing index")

        # Update columns
        for col in upd_cols:
            db_frm[col] = overlap_frm[col]

        # Add columns filled with zero-like values in non-overlapping
        # frames
        for frm in (head_frm, tail_frm):
            for col in read_cols:
                frm[col] = repeat(self.schema[col].zero(), len(frm))

        full_frm = Frame.concat(head_frm, db_frm, tail_frm)
        return self.write(full_frm, start, stop, closed="b")

    def commit(
        self, start, stop, all_dig, length, root=False, closed="b", embedded=None
    ):
        # root force commit on phi
        leaf_rev = None if root else self.changelog.leaf()

        # Combine with last commit
        if leaf_rev:
            leaf_ci = leaf_rev.commit(self.collection)
            new_ci = leaf_ci.update(
                self.label,
                start,
                stop,
                all_dig,
                length,
                closed=closed,
                embedded=embedded,
            )
            # TODO early return if new_ci == leaf_ci
        else:
            new_ci = Commit.one(
                self.schema,
                self.label,
                start,
                stop,
                all_dig,
                length,
                closed=closed,
                embedded=embedded,
            )

        payload = new_ci.encode()
        parent = leaf_rev.child if leaf_rev else phi
        return self.changelog.commit(payload, parents=[parent])

    def delete(self, start, stop, closed="b", root=False):
        frm = {k: [] for k in self.schema}
        return self.write(frame=frm, start=start, stop=stop, closed=closed, root=root)

    def __len__(self):
        return sum(len(s) for s in self.segments())

    def __bool__(self):
        return self.label in self.collection.ls()

    def paginate(
        self,
        step=settings.page_len,
        start=None,
        stop=None,
        before=None,
        closed="LEFT",
        limit=None,
        offset=None,
        select=None,
    ):

        return Paginate(
            self,
            step=step,
            start=start,
            stop=stop,
            before=before,
            closed=closed,
            limit=limit,
            offset=offset,
            select=select,
        ).iter()

    def tail(
        self,
        length,
        start=None,
        stop=None,
        before=None,
        closed="LEFT",
        limit=None,
        offset=None,
        select=None,
    ):
        '''
        Return the last `length` values of the series. Optionaly
        pre-filtered between `start` and `stop`.
        '''
        if length <= 0:
            raise ValueError("length argument must be > 0")
        segments = self.segments(
            start=self.schema.deserialize(start),
            stop=self.schema.deserialize(stop),
            before=before,
            closed=closed,
        )

        cnt = 0
        res = []
        # Create one frame per segment, starting from the last one.
        for segment in reversed(list(segments)):
            frm = Frame.from_segments(
                self.schema, [segment], select=select
            )
            if cnt + len(frm) >= length:
                # Last frame: keep the correct amount of lines
                cut = length - cnt
                res.append(frm.slice(start=-cut))
                break
            # We consume the full frame, append it and increase counter
            res.append(frm)
            cnt += len(frm)

        if not res:
            return Frame(self.schema)

        # Re-order frames and concat
        frm = Frame.concat(*reversed(res))

        if (limit, offset) != (None, None):
            start = offset or 0
            stop = start + (limit or 0)
            frm = frm.slice(start, stop)

        return frm

    def frame(
        self,
        start=None,
        stop=None,
        before=None,
        closed="LEFT",
        limit=None,
        offset=None,
        select=None,
    ):
        segments = self.segments(
            start=self.schema.deserialize(start),
            stop=self.schema.deserialize(stop),
            before=before,
            closed=closed,
        )


        return Frame.from_segments(
            self.schema,
            segments,
            limit=limit,
            offset=offset,
            select=select,
        )

    def df(
        self,
        start=None,
        stop=None,
        before=None,
        closed="LEFT",
        limit=None,
        offset=None,
        select=None,
    ):
        return self.frame(
            start=start,
            stop=stop,
            before=before,
            closed=closed,
            limit=limit,
            offset=offset,
            select=select,
        ).df()

Subclasses

Methods

def commit(self, start, stop, all_dig, length, root=False, closed='b', embedded=None)
Expand source code
def commit(
    self, start, stop, all_dig, length, root=False, closed="b", embedded=None
):
    # root force commit on phi
    leaf_rev = None if root else self.changelog.leaf()

    # Combine with last commit
    if leaf_rev:
        leaf_ci = leaf_rev.commit(self.collection)
        new_ci = leaf_ci.update(
            self.label,
            start,
            stop,
            all_dig,
            length,
            closed=closed,
            embedded=embedded,
        )
        # TODO early return if new_ci == leaf_ci
    else:
        new_ci = Commit.one(
            self.schema,
            self.label,
            start,
            stop,
            all_dig,
            length,
            closed=closed,
            embedded=embedded,
        )

    payload = new_ci.encode()
    parent = leaf_rev.child if leaf_rev else phi
    return self.changelog.commit(payload, parents=[parent])
def delete(self, start, stop, closed='b', root=False)
Expand source code
def delete(self, start, stop, closed="b", root=False):
    frm = {k: [] for k in self.schema}
    return self.write(frame=frm, start=start, stop=stop, closed=closed, root=root)
def df(self, start=None, stop=None, before=None, closed='LEFT', limit=None, offset=None, select=None)
Expand source code
def df(
    self,
    start=None,
    stop=None,
    before=None,
    closed="LEFT",
    limit=None,
    offset=None,
    select=None,
):
    return self.frame(
        start=start,
        stop=stop,
        before=before,
        closed=closed,
        limit=limit,
        offset=offset,
        select=select,
    ).df()
def frame(self, start=None, stop=None, before=None, closed='LEFT', limit=None, offset=None, select=None)
Expand source code
def frame(
    self,
    start=None,
    stop=None,
    before=None,
    closed="LEFT",
    limit=None,
    offset=None,
    select=None,
):
    segments = self.segments(
        start=self.schema.deserialize(start),
        stop=self.schema.deserialize(stop),
        before=before,
        closed=closed,
    )


    return Frame.from_segments(
        self.schema,
        segments,
        limit=limit,
        offset=offset,
        select=select,
    )
def interval(self, size=500000)

Find smallest natural partition that will fit size items

Expand source code
def interval(self, size=500_000):
    """
    Find smallest natural partition that will fit `size` items
    """
    schema = self.schema
    head_col = next(iter(schema.idx))
    assert issubdtype(schema[head_col].codec.dt, "datetime64")

    revisions = self.changelog.log()
    if not revisions:
        return None
    min_period = min(self.period(rev) for rev in revisions)
    target = min_period * size
    return Interval.bisect(target)
def paginate(self, step=500000, start=None, stop=None, before=None, closed='LEFT', limit=None, offset=None, select=None)
Expand source code
def paginate(
    self,
    step=settings.page_len,
    start=None,
    stop=None,
    before=None,
    closed="LEFT",
    limit=None,
    offset=None,
    select=None,
):

    return Paginate(
        self,
        step=step,
        start=start,
        stop=stop,
        before=before,
        closed=closed,
        limit=limit,
        offset=offset,
        select=select,
    ).iter()
def period(self, rev)

Return average period (time delta between two tic) of a given revision

Expand source code
def period(self, rev):
    """
    Return average period (time delta between two tic) of a given revision
    """
    start = self.schema.deserialize(rev["start"])[0]
    stop = self.schema.deserialize(rev["stop"])[0]
    span = stop - start
    # span is a timedelta64
    span = span.item().total_seconds()
    return span / rev["len"]
def segments(self, start=None, stop=None, before=None, closed='LEFT', from_ci=None)

Find matching segments

Expand source code
def segments(
    self,
    start=None,
    stop=None,
    before=None,
    closed="LEFT",
    from_ci=None,
):
    """
    Find matching segments
    """

    if not from_ci:
        # Find leaf commit
        leaf_rev = self.changelog.leaf(before=before)
        if not leaf_rev:
            return []
        from_ci = leaf_rev.commit(self.collection)
    return from_ci.segments(self.label, self.pod, start, stop, closed=closed)
def tail(self, length, start=None, stop=None, before=None, closed='LEFT', limit=None, offset=None, select=None)

Return the last length values of the series. Optionaly pre-filtered between start and stop.

Expand source code
def tail(
    self,
    length,
    start=None,
    stop=None,
    before=None,
    closed="LEFT",
    limit=None,
    offset=None,
    select=None,
):
    '''
    Return the last `length` values of the series. Optionaly
    pre-filtered between `start` and `stop`.
    '''
    if length <= 0:
        raise ValueError("length argument must be > 0")
    segments = self.segments(
        start=self.schema.deserialize(start),
        stop=self.schema.deserialize(stop),
        before=before,
        closed=closed,
    )

    cnt = 0
    res = []
    # Create one frame per segment, starting from the last one.
    for segment in reversed(list(segments)):
        frm = Frame.from_segments(
            self.schema, [segment], select=select
        )
        if cnt + len(frm) >= length:
            # Last frame: keep the correct amount of lines
            cut = length - cnt
            res.append(frm.slice(start=-cut))
            break
        # We consume the full frame, append it and increase counter
        res.append(frm)
        cnt += len(frm)

    if not res:
        return Frame(self.schema)

    # Re-order frames and concat
    frm = Frame.concat(*reversed(res))

    if (limit, offset) != (None, None):
        start = offset or 0
        stop = start + (limit or 0)
        frm = frm.slice(start, stop)

    return frm
def update(self, frame)
Expand source code
def update(self, frame):
    frame = Frame(self.schema, frame)
    start, stop = frame.start(), frame.stop()
    idx = tuple(self.schema.idx)
    upd_cols = tuple(c for c in frame if c not in idx)
    read_cols = tuple(c for c in self.schema.columns if c not in idx + upd_cols)
    db_frm = self.frame(start=start, stop=stop, closed="b", select=idx + read_cols)
    db_start, db_stop = db_frm.start(), db_frm.stop()
    overlap_frm = frame.islice(db_start, db_stop, "b")
    head_frm = frame.islice(None, db_start, "l")
    tail_frm = frame.islice(db_stop, None, "r")

    # Make sure index matches on overlapping part
    for col in idx:
        if (
            len(db_frm) != len(overlap_frm)
            or (db_frm[col] != overlap_frm[col]).any()
        ):
            raise ValueError("Update frame is not aligned with existing index")

    # Update columns
    for col in upd_cols:
        db_frm[col] = overlap_frm[col]

    # Add columns filled with zero-like values in non-overlapping
    # frames
    for frm in (head_frm, tail_frm):
        for col in read_cols:
            frm[col] = repeat(self.schema[col].zero(), len(frm))

    full_frm = Frame.concat(head_frm, db_frm, tail_frm)
    return self.write(full_frm, start, stop, closed="b")
def write(self, frame, start=None, stop=None, closed='b', root=False)
Expand source code
def write(self, frame, start=None, stop=None, closed="b", root=False):
    # Each commit is like a frame. A row in this frame represent a
    # write (aka a segment) and contains one digest per series
    # column + 2*N extra columns that encode start-stop values (N
    # being the number of index columns of the series) + a column
    # containing the series name (like that we can write all the
    # series in one commit)

    frame = Frame(self.schema, frame)

    # Make sure frame is sorted
    # XXX forbid repeated values in index ??
    assert frame.is_sorted(), "Frame is not sorted!"

    # Save segments
    all_dig = []
    arr_length = None
    embedded = {}
    with Pool() as pool:
        for name in self.schema:
            # Cast array & check len
            values = frame[name]
            if arr_length is None:
                arr_length = len(values)
            elif len(values) != arr_length:
                raise ValueError("Length mismatch")
            digest, embed_data = self._write_col(name, values, pool)
            all_dig.append(digest)
            if embed_data is not None:
                embedded[digest] = embed_data

    # Build commit info
    start = frame.start() if start is None else start
    stop = frame.stop() if stop is None else stop
    if not isinstance(start, tuple):
        start = (start,)
    if not isinstance(stop, tuple):
        stop = (stop,)

    # Create new digest
    batch = self.collection.batch
    if batch:
        ci_info = (self.label, start, stop, all_dig, len(frame), closed, embedded)
        if isinstance(batch, Batch):
            batch.append(*ci_info)
        else:
            return ci_info
        return
    return self.commit(
        start,
        stop,
        all_dig,
        len(frame),
        root=root,
        closed=closed,
        embedded=embedded,
    )