Module lakota.commit

The Commit class is responsible to structure the content of a commit file. A commit is like a sorted dataframe with the following columns: label, start, stop, digest, length, closed.

Each row represent a slice of a series: its label, the indexes at which it starts an stops and the digests of the different columns.

So for a given commit, if we want to know all the data related to a given series, we can simply filter on the label column. Furthermore if only a part of the series is needed (for usually between two dates), we can use start and stop to detect if a row is relevant.

Once one or more rows are identified, the digests allows to know which files contain the data we want. We can then read and uncompress those and instanciate a dataframe.

Let's use the command line interface to illustrate this:

$ lakota create temperature "timestamp timestamp*" "value float"
$ cat input.csv | lakota write temperature/Paris
$ cat input.csv | lakota write temperature/Brussels

We have create two series with the same content.

The rev subcomment with the --extend flag lists revisions and print their content (the commits):

$ lakota rev temperature -e

Revision: 00000000000-0000000000000000000000000000000000000000.17665b9f49e-8e692971744a1222b8a7c706b31e24c8d0a22653
Date: 2020-12-15 10:27:34.302000
label    start                stop                   length  digests
-------  -------------------  -------------------  --------  -----------------------------------------------------------------------------------
Paris    2020-06-22T00:00:00  2020-06-27T00:00:00         6  8dc25a6911f09119919c2b3d177cd4430f43c73a / de46c7d97cc7dde24962e31ee5fdee1acadd114e


Revision: 17665b9f49e-8e692971744a1222b8a7c706b31e24c8d0a22653.17665ba05dd-1b188c517bb45f98669620e4822a67a81dd15b3b*
Date: 2020-12-15 10:27:38.717000
label     start                stop                   length  digests
--------  -------------------  -------------------  --------  -----------------------------------------------------------------------------------
Brussels  2020-06-22T00:00:00  2020-06-27T00:00:00         6  8dc25a6911f09119919c2b3d177cd4430f43c73a / de46c7d97cc7dde24962e31ee5fdee1acadd114e
Paris     2020-06-22T00:00:00  2020-06-27T00:00:00         6  8dc25a6911f09119919c2b3d177cd4430f43c73a / de46c7d97cc7dde24962e31ee5fdee1acadd114e

(revisions are documented in lakota.changelog)

So we can see the two commits made in the collection, they show us different states. The first one with only the series Paris and the second with both. We can use read with verbose flags to illustrate file access (see comments inlined):

$ lakota -vv read temperature/Brussels
 # The first 4 storage access (1 LIST and 3 READ) are used to identify
 #  where our collection is.
LIST .lakota/00/00/000000000000000000000000000000000000 .
READ .lakota/00/00/000000000000000000000000000000000000 00000000000-0000000000000000000000000000000000000000.17665b9e79b-acf6e197ece6782a6da6e8a50bfd7d9aa3543e90
READ .lakota/6d/1d 7aa3d69158cdfdee236f3b18791204e6e308
READ .lakota/2e/0f f7b7988ee10ef47a8973afa3f9da60b6c892
 # This directory contains our collection, listing it gives us the revisions
LIST .lakota/70/33/e90fcdda169d2f5d08da17507b0c5db52029 .
 # The latest commit is read:
READ .lakota/70/33/e90fcdda169d2f5d08da17507b0c5db52029 17665b9f49e-8e692971744a1222b8a7c706b31e24c8d0a22653.17665ba05dd-1b188c517bb45f98669620e4822a67a81dd15b3b
 # Based on the commit info, we know which file to read for each column
READ .lakota/8d/c2 5a6911f09119919c2b3d177cd4430f43c73a # -> payload of timestamp column
READ .lakota/de/46 c7d97cc7dde24962e31ee5fdee1acadd114e # -> payload of value column
 # The array are then combined in a dataframe and dumped as csv
timestamp,value
2020-06-22T00:00:00,25.0
2020-06-23T00:00:00,24.0
2020-06-24T00:00:00,27.0
2020-06-25T00:00:00,31.0
2020-06-26T00:00:00,32.0
2020-06-27T00:00:00,30.0

Each time a write is done on a collection, a new revision file is created, its content is a new commit. This commit is based on the previous one, usually with an extra line or an update line (or both). For example, if we update the Paris series, and print the commits:


$ cat input-corrected.csv
2020-06-23,24.2
2020-06-24,27.9
2020-06-25,31.0
2020-06-26,32.5
2020-06-27,30.1
2020-06-28,29.2
$ cat input-corrected.csv | lakota write temperature/Paris
$ lakota rev temperature -e
[...snipped...]
Revision: 17665ba05dd-1b188c517bb45f98669620e4822a67a81dd15b3b.17665cf207b-b5ef411d8f000b5eafcdd5c58cc4c26a82ac757e*
Date: 2020-12-15 10:50:41.787000
label     start                stop                   length  digests
--------  -------------------  -------------------  --------  -----------------------------------------------------------------------------------
Brussels  2020-06-22T00:00:00  2020-06-27T00:00:00         6  8dc25a6911f09119919c2b3d177cd4430f43c73a / de46c7d97cc7dde24962e31ee5fdee1acadd114e
Paris     2020-06-22T00:00:00  2020-06-23T00:00:00         6  8dc25a6911f09119919c2b3d177cd4430f43c73a / de46c7d97cc7dde24962e31ee5fdee1acadd114e
Paris     2020-06-23T00:00:00  2020-06-28T00:00:00         6  1602a81fb4eafda5226880c7ef9145a8dada8cf0 / fda980aa244f5bef17b9feb5faa3e6532d0f815b

We see two lines for the Paris series. The first one has been updated (stop is now 2020-06-23) and the second one has been appended. The next time this series is read both line will be used except if we filter it:

$ lakota -vv read temperature/Paris --greater-than 2020-06-23
[...snipped...]
READ .lakota/70/33/e90fcdda169d2f5d08da17507b0c5db52029 17665ba05dd-1b188c517bb45f98669620e4822a67a81dd15b3b.17665cf207b-b5ef411d8f000b5eafcdd5c58cc4c26a82ac757e
READ .lakota/16/02 a81fb4eafda5226880c7ef9145a8dada8cf0
READ .lakota/fd/a9 80aa244f5bef17b9feb5faa3e6532d0f815b
timestamp,value
2020-06-23T00:00:00,24.2
2020-06-24T00:00:00,27.9
2020-06-25T00:00:00,31.0
2020-06-26T00:00:00,32.5
2020-06-27T00:00:00,30.1
2020-06-28T00:00:00,29.2
Expand source code
"""
The `Commit` class is responsible to structure the content of a
commit file. A commit is like a sorted dataframe with the following
columns: `label`, `start`, `stop`, `digest`, `length`, `closed`.

Each row represent a slice of a series: its label, the indexes at
which it starts an stops and the digests of the different columns.

So for a given commit, if we want to know all the data related to a
given series, we can simply filter on the `label` column. Furthermore
if only a part of the series is needed (for usually between two
dates), we can use `start` and `stop` to detect if a row is relevant.

Once one or more rows are identified, the digests allows to know which
files contain the data we want. We can then read and uncompress those
and instanciate a dataframe.

Let's use the command line interface to illustrate this:

```shell
$ lakota create temperature "timestamp timestamp*" "value float"
$ cat input.csv | lakota write temperature/Paris
$ cat input.csv | lakota write temperature/Brussels
```
We have create two series with the same content.


The `rev` subcomment with the `--extend` flag lists revisions and
print their content (the commits):

```shell
$ lakota rev temperature -e

Revision: 00000000000-0000000000000000000000000000000000000000.17665b9f49e-8e692971744a1222b8a7c706b31e24c8d0a22653
Date: 2020-12-15 10:27:34.302000
label    start                stop                   length  digests
-------  -------------------  -------------------  --------  -----------------------------------------------------------------------------------
Paris    2020-06-22T00:00:00  2020-06-27T00:00:00         6  8dc25a6911f09119919c2b3d177cd4430f43c73a / de46c7d97cc7dde24962e31ee5fdee1acadd114e


Revision: 17665b9f49e-8e692971744a1222b8a7c706b31e24c8d0a22653.17665ba05dd-1b188c517bb45f98669620e4822a67a81dd15b3b*
Date: 2020-12-15 10:27:38.717000
label     start                stop                   length  digests
--------  -------------------  -------------------  --------  -----------------------------------------------------------------------------------
Brussels  2020-06-22T00:00:00  2020-06-27T00:00:00         6  8dc25a6911f09119919c2b3d177cd4430f43c73a / de46c7d97cc7dde24962e31ee5fdee1acadd114e
Paris     2020-06-22T00:00:00  2020-06-27T00:00:00         6  8dc25a6911f09119919c2b3d177cd4430f43c73a / de46c7d97cc7dde24962e31ee5fdee1acadd114e
```

(revisions are documented in `lakota.changelog`)

So we can see the two commits made in the collection, they show us
different states. The first one with only the series `Paris` and the
second with both. We can use `read` with verbose flags to illustrate
file access (see comments inlined):

```shell
$ lakota -vv read temperature/Brussels
 # The first 4 storage access (1 LIST and 3 READ) are used to identify
 #  where our collection is.
LIST .lakota/00/00/000000000000000000000000000000000000 .
READ .lakota/00/00/000000000000000000000000000000000000 00000000000-0000000000000000000000000000000000000000.17665b9e79b-acf6e197ece6782a6da6e8a50bfd7d9aa3543e90
READ .lakota/6d/1d 7aa3d69158cdfdee236f3b18791204e6e308
READ .lakota/2e/0f f7b7988ee10ef47a8973afa3f9da60b6c892
 # This directory contains our collection, listing it gives us the revisions
LIST .lakota/70/33/e90fcdda169d2f5d08da17507b0c5db52029 .
 # The latest commit is read:
READ .lakota/70/33/e90fcdda169d2f5d08da17507b0c5db52029 17665b9f49e-8e692971744a1222b8a7c706b31e24c8d0a22653.17665ba05dd-1b188c517bb45f98669620e4822a67a81dd15b3b
 # Based on the commit info, we know which file to read for each column
READ .lakota/8d/c2 5a6911f09119919c2b3d177cd4430f43c73a # -> payload of timestamp column
READ .lakota/de/46 c7d97cc7dde24962e31ee5fdee1acadd114e # -> payload of value column
 # The array are then combined in a dataframe and dumped as csv
timestamp,value
2020-06-22T00:00:00,25.0
2020-06-23T00:00:00,24.0
2020-06-24T00:00:00,27.0
2020-06-25T00:00:00,31.0
2020-06-26T00:00:00,32.0
2020-06-27T00:00:00,30.0
```

Each time a write is done on a collection, a new revision file is
created, its content is a new commit. This commit is based on the
previous one, usually with an extra line or an update line (or
both). For example, if we update the `Paris` series, and print the
commits:

```shell

$ cat input-corrected.csv
2020-06-23,24.2
2020-06-24,27.9
2020-06-25,31.0
2020-06-26,32.5
2020-06-27,30.1
2020-06-28,29.2
$ cat input-corrected.csv | lakota write temperature/Paris
$ lakota rev temperature -e
[...snipped...]
Revision: 17665ba05dd-1b188c517bb45f98669620e4822a67a81dd15b3b.17665cf207b-b5ef411d8f000b5eafcdd5c58cc4c26a82ac757e*
Date: 2020-12-15 10:50:41.787000
label     start                stop                   length  digests
--------  -------------------  -------------------  --------  -----------------------------------------------------------------------------------
Brussels  2020-06-22T00:00:00  2020-06-27T00:00:00         6  8dc25a6911f09119919c2b3d177cd4430f43c73a / de46c7d97cc7dde24962e31ee5fdee1acadd114e
Paris     2020-06-22T00:00:00  2020-06-23T00:00:00         6  8dc25a6911f09119919c2b3d177cd4430f43c73a / de46c7d97cc7dde24962e31ee5fdee1acadd114e
Paris     2020-06-23T00:00:00  2020-06-28T00:00:00         6  1602a81fb4eafda5226880c7ef9145a8dada8cf0 / fda980aa244f5bef17b9feb5faa3e6532d0f815b
```

We see two lines for the `Paris` series. The first one has been
updated (stop is now `2020-06-23`) and the second one has been
appended. The next time this series is read both line will be
used except if we filter it:

```shell
$ lakota -vv read temperature/Paris --greater-than 2020-06-23
[...snipped...]
READ .lakota/70/33/e90fcdda169d2f5d08da17507b0c5db52029 17665ba05dd-1b188c517bb45f98669620e4822a67a81dd15b3b.17665cf207b-b5ef411d8f000b5eafcdd5c58cc4c26a82ac757e
READ .lakota/16/02 a81fb4eafda5226880c7ef9145a8dada8cf0
READ .lakota/fd/a9 80aa244f5bef17b9feb5faa3e6532d0f815b
timestamp,value
2020-06-23T00:00:00,24.2
2020-06-24T00:00:00,27.9
2020-06-25T00:00:00,31.0
2020-06-26T00:00:00,32.5
2020-06-27T00:00:00,30.1
2020-06-28T00:00:00,29.2
```
"""

from itertools import chain
from threading import Lock

from numcodecs import registry
from numpy import asarray, concatenate, isin, repeat, where

from .frame import Frame
from .schema import Codec, Schema
from .utils import Closed, Pool, hashed_path

__all__ = ["Commit", "Segment"]


class Commit:

    digest_codec = Codec("U")  # FIXME use better encoding
    len_codec = Codec("int")
    label_codec = Codec("str")
    closed_codec = Codec("str")  # Could be i1

    def __init__(self, schema, label, start, stop, digest, length, closed, embedded):
        assert list(digest) == list(schema)
        self.schema = schema
        self.label = label  # Array of str
        self.start = start  # Dict of Arrays
        self.stop = stop  # Dict of arrays
        self.digest = digest  # Dict of arrays
        self.length = length  # Array of int
        self.closed = closed  # Array of ("l", "r", "b", "n")
        self.embedded = embedded or {}

    @classmethod
    def one(
        cls,
        schema,
        label,
        start,
        stop,
        digest,
        length,
        closed=Closed.BOTH,
        embedded=None,
    ):
        closed = Closed.cast(closed)
        label = asarray([label])
        start = dict(zip(schema.idx, (asarray([s]) for s in start)))
        stop = dict(zip(schema.idx, (asarray([s]) for s in stop)))
        digest = dict(zip(schema, (asarray([d], dtype="U") for d in digest)))
        length = [length]
        closed = [closed.short]
        return Commit(schema, label, start, stop, digest, length, closed, embedded)

    @classmethod
    def decode(cls, schema, payload):
        msgpck = registry.codec_registry["msgpack2"]()
        data = msgpck.decode(payload)[0]
        values = {}
        # Decode starts, stops and digests
        for key in ("start", "stop", "digest"):
            key_vals = {}
            columns = schema if key == "digest" else schema.idx
            for name in columns:
                codec = cls.digest_codec if key == "digest" else schema[name].codec
                key_vals[name] = codec.decode(data[key][name])
            values[key] = key_vals

        # Decode len and labels
        values["length"] = cls.len_codec.decode(data["length"])
        values["label"] = cls.label_codec.decode(data["label"])
        values["closed"] = cls.closed_codec.decode(data["closed"])

        # Embedded data will be decoded on demand
        values["embedded"] = data.get("embedded")
        return Commit(schema, **values)

    def encode(self):
        msgpck = registry.codec_registry["msgpack2"]()
        data = {}
        # Encode starts, stops and digests
        for key in ("start", "stop", "digest"):
            columns = self.schema if key == "digest" else self.schema.idx
            key_vals = {}
            for pos, name in enumerate(columns):
                codec = (
                    self.digest_codec if key == "digest" else self.schema[name].codec
                )
                arr = getattr(self, key)[name]
                key_vals[name] = codec.encode(arr)
            data[key] = key_vals

        # Encode digests
        for name in self.schema:
            data["digest"][name] = self.digest_codec.encode(self.digest[name])

        # Encode length, closed and labels
        data["length"] = self.len_codec.encode(self.length)
        data["closed"] = self.closed_codec.encode(self.closed)
        data["label"] = self.label_codec.encode(self.label)
        # Keep only embedded data referenced by self.digest
        keep_digests = (
            set(chain.from_iterable(self.digest.values())) & self.embedded.keys()
        )
        embedded = {d: self.embedded[d] for d in sorted(keep_digests)}
        data["embedded"] = embedded
        return msgpck.encode([data])

    def split(self, label, start, stop):
        start_values = {"_label": self.label}
        start_values.update(self.start)
        stop_values = {"_label": self.label}
        stop_values.update(self.stop)
        frm_start = Frame(Schema.from_frame(start_values), start_values)
        frm_stop = Frame(Schema.from_frame(stop_values), stop_values)
        start_pos = frm_stop.index((label,) + start, right=True)
        stop_pos = frm_start.index((label,) + stop, right=False)
        return start_pos, stop_pos

    def __len__(self):
        return len(self.label)

    def at(self, pos):
        if pos < 0:
            pos = len(self) + pos
        res = {}
        for key in ("start", "stop", "digest"):
            columns = self.schema if key == "digest" else self.schema.idx
            values = getattr(self, key)
            res[key] = tuple(values[n][pos] for n in columns)

        for key in ("label", "length", "closed"):
            res[key] = getattr(self, key)[pos]
        res["embedded"] = self.embedded
        return res

    def update(self, label, start, stop, digest, length, closed="b", embedded=None):
        closed = Closed.cast(closed)
        if not start <= stop:
            raise ValueError(f"Invalid range {start} -> {stop}")
        inner = Commit.one(
            self.schema, label, start, stop, digest, length, closed, embedded
        )
        if len(self) == 0:
            return inner

        if embedded:
            self.embedded.update(embedded)

        first = (self.at(0)["label"], self.at(0)["start"])
        last = (self.at(-1)["label"], self.at(-1)["stop"])
        if (label, start) < first and (label, stop) > last:
            return inner

        start_pos, stop_pos = self.split(label, start, stop)
        # Truncate start_pos row
        head = None
        # start_pos is the result of a bisect_right, so we have to
        # check the slot on the left that may be perfect match
        start_row = None
        if start_pos > 0:
            prev_row = self.at(start_pos - 1)
            if (prev_row["label"], prev_row["stop"]) == (label, start):
                start_pos -= 1
                start_row = prev_row
        if start_row is None:
            start_row = self.at(min(start_pos, len(self) - 1))

        if (
            label == start_row["label"]
            and start_row["start"] <= start <= start_row["stop"]
        ):
            # We hit the right of an existing row
            start_row["stop"] = start
            # If closed is open/close on left, start_row become the
            # opposite on right:
            start_row["closed"] = Closed[start_row["closed"]].set_right(not closed.left)

            if (
                start_row["start"] == start_row["stop"]
                and start_row["closed"] != Closed.BOTH
            ):
                # Ignore star_row
                head = self.head(start_pos)
            else:
                head = Commit.concat(
                    self.head(start_pos),
                    Commit.one(schema=self.schema, **start_row),
                )
            # when start_row["start"] == start_row["stop"],
            # start_row stop and start are both "overshadowed" by
            # new commit

        if head is None:
            head = self.head(start_pos)

        # Truncate stop_pos row
        tail = None
        # stop_pos is the result of a bisect_left, so we have to
        # check the slot on the right that may be perfect match
        stop_row = None
        if stop_pos < len(self):
            next_row = self.at(stop_pos)
            if (next_row["label"], next_row["start"]) == (label, stop):
                stop_row = next_row
                stop_pos += 1
        if stop_row is None:
            stop_row = self.at(max(0, stop_pos - 1))

        if label == stop_row["label"] and stop_row["start"] <= stop <= stop_row["stop"]:
            # We hit the left of an existing row
            stop_row["start"] = stop
            # If closed is open/close on right, stop_row become the
            # opposite on left:
            stop_row["closed"] = Closed[stop_row["closed"]].set_left(not closed.right)

            if (
                stop_row["start"] == stop_row["stop"]
                and start_row["closed"] != Closed.BOTH
            ):
                # Ignore stop_row
                tail = self.tail(stop_pos)
            else:
                tail = Commit.concat(
                    Commit.one(schema=self.schema, **stop_row),
                    self.tail(stop_pos),
                )
            # when stop_row["start"] == stop_row["stop"],
            # stop_row stop and start are both "overshadowed" by
            # new commit
        if tail is None:
            tail = self.tail(stop_pos)
        return Commit.concat(head, inner, tail)

    def slice(self, *pos):
        slc = slice(*pos)
        schema = self.schema
        start = {name: self.start[name][slc] for name in schema.idx}
        stop = {name: self.stop[name][slc] for name in schema.idx}
        digest = {name: self.digest[name][slc] for name in schema}
        label = self.label[slc]
        length = self.length[slc]
        closed = self.closed[slc]
        return Commit(schema, label, start, stop, digest, length, closed, self.embedded)

    def head(self, pos):
        return self.slice(None, pos)

    def tail(self, pos):
        return self.slice(pos, None)

    @classmethod
    def concat(cls, commit, *other_commits):
        schema = commit.schema
        all_ci = (commit,) + other_commits
        all_ci = tuple(ci for ci in all_ci if len(ci) > 0)

        # Make sure there are no overlaps
        for prv, nxt in zip(all_ci[:-1], all_ci[1:]):
            prv_tail = prv.at(-1)
            nxt_head = nxt.at(0)
            assert (prv_tail["label"], prv_tail["stop"]) <= (
                nxt_head["label"],
                nxt_head["start"],
            )

        start = {
            name: concatenate([ci.start[name] for ci in all_ci]) for name in schema.idx
        }
        stop = {
            name: concatenate([ci.stop[name] for ci in all_ci]) for name in schema.idx
        }
        digest = {
            name: concatenate([ci.digest[name] for ci in all_ci]) for name in schema
        }
        label = concatenate([ci.label for ci in all_ci])
        length = concatenate([ci.length for ci in all_ci])
        closed = concatenate([ci.closed for ci in all_ci])
        embedded = {}
        for ci in all_ci:
            embedded.update(ci.embedded)
        return Commit(schema, label, start, stop, digest, length, closed, embedded)

    def __repr__(self):
        fmt = lambda a: "/".join(map(str, a))
        starts = list(map(fmt, zip(*self.start.values())))
        stops = list(map(fmt, zip(*self.stop.values())))
        items = "\n        ".join(
            f"{l}[{a} -> {b} ({c})]"
            for l, a, b, c in zip(self.label, starts, stops, self.closed)
        )
        return f"<Commit {items}>"

    def match(self, label):
        (matches,) = where(self.label == label)
        for pos in matches:
            yield self.at(pos)

    def segments(self, label, pod, start=None, stop=None, closed=Closed.BOTH):
        closed = Closed.cast(closed)

        # If start (or stop) is not set, and the left (right) side is
        # not closed, we ignore the very first (last) item, which
        # makes no sense
        if start is None:
            closed = closed.set_left(True)
        if stop is None:
            closed = closed.set_right(True)

        # XXX allow to pass condition instead of simple start-stop ?
        for row in self.match(label):
            arr_start = row["start"]
            arr_stop = row["stop"]
            arr_closed = Closed[row["closed"]]
            if start:
                if start > arr_stop:
                    # start is on the right of the array
                    continue
                elif not arr_closed.right and start == arr_stop:
                    # Same
                    continue
                elif start > arr_start:
                    # `closed` "win" over arr_closed on the left
                    arr_closed = arr_closed.set_left(closed.left)
                    arr_start = start
                elif start == arr_start and arr_closed.left:
                    # `closed` "win" only if array left is not already
                    # open
                    arr_closed = arr_closed.set_left(closed.left)

            if stop:
                if stop < arr_start:
                    continue
                elif not arr_closed.left and stop == arr_start:
                    continue
                elif stop < arr_stop:
                    # closed "win" over arr_closed on the right
                    arr_closed = arr_closed.set_right(closed.right)
                    arr_stop = stop
                elif stop == arr_stop and arr_closed.right:
                    arr_closed = arr_closed.set_right(closed.right)
            yield Segment(
                self,
                pod,
                row["digest"],
                start=arr_start,
                stop=arr_stop,
                closed=arr_closed,
            )

    def delete_labels(self, rm_labels):
        keep = ~isin(self.label, rm_labels)
        return self.mask(keep)

    def mask(self, keep):
        return Commit(
            schema=self.schema,
            label=self.label[keep],
            start={k: v[keep] for k, v in self.start.items()},
            stop={k: v[keep] for k, v in self.stop.items()},
            digest={k: v[keep] for k, v in self.digest.items()},
            length=self.length[keep],
            closed=self.closed[keep],
            embedded=self.embedded,
        )

    def rename_label(self, from_label, to_label):
        # First we create an extract with `from_label` only
        extract = self.mask(self.label == from_label)
        base_ci = self.delete_labels(from_label)

        # replace label in extract
        extract.label = repeat(to_label, len(extract))

        # Re-inject it
        for pos in range(len(extract)):
            row = extract.at(pos)
            base_ci = base_ci.update(**row)

        return base_ci

    def __contains__(self, row):
        start_pos, _ = self.split(row["label"], row["start"], row["stop"])
        if start_pos >= len(self):
            return False
        match_row = self.at(start_pos)
        for attr in ("start", "stop", "digest"):
            if match_row[attr] != row[attr]:
                return False
        return True


class Segment:
    def __init__(self, commit, pod, digests, start, stop, closed):
        self.commit = commit
        self.pod = pod
        self.start = start
        self.stop = stop
        self.closed = closed
        self.digest = dict(zip(commit.schema, digests))
        self._frm = None
        self.start_pos = None
        self.stop_pos = None
        self.lock = Lock()

    def __len__(self):
        return len(self.frame)

    def read(self, name, start_pos=None, stop_pos=None):
        # Prime cache
        if not name in self.frame:
            self.frame[name] = self._read(name)
        return self.frame[name][start_pos:stop_pos]

    def _read(self, name):
        dig = self.digest[name]
        # check first if content is not already in commit
        data = self.commit.embedded.get(dig)
        if data is None:
            folder, filename = hashed_path(dig)
            sub_pod = self.pod.cd(folder)
            try:
                data = sub_pod.read(filename)
            except FileNotFoundError:
                for f in sub_pod.ls():
                    # File is in soft-delete mode
                    if f.startswith(filename):
                        data = sub_pod.read(f)
                        break
                else:
                    raise RuntimeError(f'File {folder}/{filename} is missing!')

        codec = self.commit.schema[name].codec
        arr = codec.decode(data)
        return arr[self.start_pos : self.stop_pos]

    @property
    def frame(self):
        # Use a Frame instance as container to cache columns
        with self.lock:
            if self._frm is not None:
                return self._frm

            cols = {}
            # with Pool() as pool: # TODO need a smarter pool
            #     for name in self.commit.schema.idx:
            #         pool.submit(lambda: cols.update({name: self._read(name)}))
            for name in self.commit.schema.idx:
                cols[name] = self._read(name)

            frm = Frame(self.commit.schema, cols)
            self.start_pos, self.stop_pos = frm.slice_index(
                self.start, self.stop, closed=self.closed
            )
            self._frm = frm.slice(self.start_pos, self.stop_pos)
            return self._frm

Classes

class Commit (schema, label, start, stop, digest, length, closed, embedded)
Expand source code
class Commit:

    digest_codec = Codec("U")  # FIXME use better encoding
    len_codec = Codec("int")
    label_codec = Codec("str")
    closed_codec = Codec("str")  # Could be i1

    def __init__(self, schema, label, start, stop, digest, length, closed, embedded):
        assert list(digest) == list(schema)
        self.schema = schema
        self.label = label  # Array of str
        self.start = start  # Dict of Arrays
        self.stop = stop  # Dict of arrays
        self.digest = digest  # Dict of arrays
        self.length = length  # Array of int
        self.closed = closed  # Array of ("l", "r", "b", "n")
        self.embedded = embedded or {}

    @classmethod
    def one(
        cls,
        schema,
        label,
        start,
        stop,
        digest,
        length,
        closed=Closed.BOTH,
        embedded=None,
    ):
        closed = Closed.cast(closed)
        label = asarray([label])
        start = dict(zip(schema.idx, (asarray([s]) for s in start)))
        stop = dict(zip(schema.idx, (asarray([s]) for s in stop)))
        digest = dict(zip(schema, (asarray([d], dtype="U") for d in digest)))
        length = [length]
        closed = [closed.short]
        return Commit(schema, label, start, stop, digest, length, closed, embedded)

    @classmethod
    def decode(cls, schema, payload):
        msgpck = registry.codec_registry["msgpack2"]()
        data = msgpck.decode(payload)[0]
        values = {}
        # Decode starts, stops and digests
        for key in ("start", "stop", "digest"):
            key_vals = {}
            columns = schema if key == "digest" else schema.idx
            for name in columns:
                codec = cls.digest_codec if key == "digest" else schema[name].codec
                key_vals[name] = codec.decode(data[key][name])
            values[key] = key_vals

        # Decode len and labels
        values["length"] = cls.len_codec.decode(data["length"])
        values["label"] = cls.label_codec.decode(data["label"])
        values["closed"] = cls.closed_codec.decode(data["closed"])

        # Embedded data will be decoded on demand
        values["embedded"] = data.get("embedded")
        return Commit(schema, **values)

    def encode(self):
        msgpck = registry.codec_registry["msgpack2"]()
        data = {}
        # Encode starts, stops and digests
        for key in ("start", "stop", "digest"):
            columns = self.schema if key == "digest" else self.schema.idx
            key_vals = {}
            for pos, name in enumerate(columns):
                codec = (
                    self.digest_codec if key == "digest" else self.schema[name].codec
                )
                arr = getattr(self, key)[name]
                key_vals[name] = codec.encode(arr)
            data[key] = key_vals

        # Encode digests
        for name in self.schema:
            data["digest"][name] = self.digest_codec.encode(self.digest[name])

        # Encode length, closed and labels
        data["length"] = self.len_codec.encode(self.length)
        data["closed"] = self.closed_codec.encode(self.closed)
        data["label"] = self.label_codec.encode(self.label)
        # Keep only embedded data referenced by self.digest
        keep_digests = (
            set(chain.from_iterable(self.digest.values())) & self.embedded.keys()
        )
        embedded = {d: self.embedded[d] for d in sorted(keep_digests)}
        data["embedded"] = embedded
        return msgpck.encode([data])

    def split(self, label, start, stop):
        start_values = {"_label": self.label}
        start_values.update(self.start)
        stop_values = {"_label": self.label}
        stop_values.update(self.stop)
        frm_start = Frame(Schema.from_frame(start_values), start_values)
        frm_stop = Frame(Schema.from_frame(stop_values), stop_values)
        start_pos = frm_stop.index((label,) + start, right=True)
        stop_pos = frm_start.index((label,) + stop, right=False)
        return start_pos, stop_pos

    def __len__(self):
        return len(self.label)

    def at(self, pos):
        if pos < 0:
            pos = len(self) + pos
        res = {}
        for key in ("start", "stop", "digest"):
            columns = self.schema if key == "digest" else self.schema.idx
            values = getattr(self, key)
            res[key] = tuple(values[n][pos] for n in columns)

        for key in ("label", "length", "closed"):
            res[key] = getattr(self, key)[pos]
        res["embedded"] = self.embedded
        return res

    def update(self, label, start, stop, digest, length, closed="b", embedded=None):
        closed = Closed.cast(closed)
        if not start <= stop:
            raise ValueError(f"Invalid range {start} -> {stop}")
        inner = Commit.one(
            self.schema, label, start, stop, digest, length, closed, embedded
        )
        if len(self) == 0:
            return inner

        if embedded:
            self.embedded.update(embedded)

        first = (self.at(0)["label"], self.at(0)["start"])
        last = (self.at(-1)["label"], self.at(-1)["stop"])
        if (label, start) < first and (label, stop) > last:
            return inner

        start_pos, stop_pos = self.split(label, start, stop)
        # Truncate start_pos row
        head = None
        # start_pos is the result of a bisect_right, so we have to
        # check the slot on the left that may be perfect match
        start_row = None
        if start_pos > 0:
            prev_row = self.at(start_pos - 1)
            if (prev_row["label"], prev_row["stop"]) == (label, start):
                start_pos -= 1
                start_row = prev_row
        if start_row is None:
            start_row = self.at(min(start_pos, len(self) - 1))

        if (
            label == start_row["label"]
            and start_row["start"] <= start <= start_row["stop"]
        ):
            # We hit the right of an existing row
            start_row["stop"] = start
            # If closed is open/close on left, start_row become the
            # opposite on right:
            start_row["closed"] = Closed[start_row["closed"]].set_right(not closed.left)

            if (
                start_row["start"] == start_row["stop"]
                and start_row["closed"] != Closed.BOTH
            ):
                # Ignore star_row
                head = self.head(start_pos)
            else:
                head = Commit.concat(
                    self.head(start_pos),
                    Commit.one(schema=self.schema, **start_row),
                )
            # when start_row["start"] == start_row["stop"],
            # start_row stop and start are both "overshadowed" by
            # new commit

        if head is None:
            head = self.head(start_pos)

        # Truncate stop_pos row
        tail = None
        # stop_pos is the result of a bisect_left, so we have to
        # check the slot on the right that may be perfect match
        stop_row = None
        if stop_pos < len(self):
            next_row = self.at(stop_pos)
            if (next_row["label"], next_row["start"]) == (label, stop):
                stop_row = next_row
                stop_pos += 1
        if stop_row is None:
            stop_row = self.at(max(0, stop_pos - 1))

        if label == stop_row["label"] and stop_row["start"] <= stop <= stop_row["stop"]:
            # We hit the left of an existing row
            stop_row["start"] = stop
            # If closed is open/close on right, stop_row become the
            # opposite on left:
            stop_row["closed"] = Closed[stop_row["closed"]].set_left(not closed.right)

            if (
                stop_row["start"] == stop_row["stop"]
                and start_row["closed"] != Closed.BOTH
            ):
                # Ignore stop_row
                tail = self.tail(stop_pos)
            else:
                tail = Commit.concat(
                    Commit.one(schema=self.schema, **stop_row),
                    self.tail(stop_pos),
                )
            # when stop_row["start"] == stop_row["stop"],
            # stop_row stop and start are both "overshadowed" by
            # new commit
        if tail is None:
            tail = self.tail(stop_pos)
        return Commit.concat(head, inner, tail)

    def slice(self, *pos):
        slc = slice(*pos)
        schema = self.schema
        start = {name: self.start[name][slc] for name in schema.idx}
        stop = {name: self.stop[name][slc] for name in schema.idx}
        digest = {name: self.digest[name][slc] for name in schema}
        label = self.label[slc]
        length = self.length[slc]
        closed = self.closed[slc]
        return Commit(schema, label, start, stop, digest, length, closed, self.embedded)

    def head(self, pos):
        return self.slice(None, pos)

    def tail(self, pos):
        return self.slice(pos, None)

    @classmethod
    def concat(cls, commit, *other_commits):
        schema = commit.schema
        all_ci = (commit,) + other_commits
        all_ci = tuple(ci for ci in all_ci if len(ci) > 0)

        # Make sure there are no overlaps
        for prv, nxt in zip(all_ci[:-1], all_ci[1:]):
            prv_tail = prv.at(-1)
            nxt_head = nxt.at(0)
            assert (prv_tail["label"], prv_tail["stop"]) <= (
                nxt_head["label"],
                nxt_head["start"],
            )

        start = {
            name: concatenate([ci.start[name] for ci in all_ci]) for name in schema.idx
        }
        stop = {
            name: concatenate([ci.stop[name] for ci in all_ci]) for name in schema.idx
        }
        digest = {
            name: concatenate([ci.digest[name] for ci in all_ci]) for name in schema
        }
        label = concatenate([ci.label for ci in all_ci])
        length = concatenate([ci.length for ci in all_ci])
        closed = concatenate([ci.closed for ci in all_ci])
        embedded = {}
        for ci in all_ci:
            embedded.update(ci.embedded)
        return Commit(schema, label, start, stop, digest, length, closed, embedded)

    def __repr__(self):
        fmt = lambda a: "/".join(map(str, a))
        starts = list(map(fmt, zip(*self.start.values())))
        stops = list(map(fmt, zip(*self.stop.values())))
        items = "\n        ".join(
            f"{l}[{a} -> {b} ({c})]"
            for l, a, b, c in zip(self.label, starts, stops, self.closed)
        )
        return f"<Commit {items}>"

    def match(self, label):
        (matches,) = where(self.label == label)
        for pos in matches:
            yield self.at(pos)

    def segments(self, label, pod, start=None, stop=None, closed=Closed.BOTH):
        closed = Closed.cast(closed)

        # If start (or stop) is not set, and the left (right) side is
        # not closed, we ignore the very first (last) item, which
        # makes no sense
        if start is None:
            closed = closed.set_left(True)
        if stop is None:
            closed = closed.set_right(True)

        # XXX allow to pass condition instead of simple start-stop ?
        for row in self.match(label):
            arr_start = row["start"]
            arr_stop = row["stop"]
            arr_closed = Closed[row["closed"]]
            if start:
                if start > arr_stop:
                    # start is on the right of the array
                    continue
                elif not arr_closed.right and start == arr_stop:
                    # Same
                    continue
                elif start > arr_start:
                    # `closed` "win" over arr_closed on the left
                    arr_closed = arr_closed.set_left(closed.left)
                    arr_start = start
                elif start == arr_start and arr_closed.left:
                    # `closed` "win" only if array left is not already
                    # open
                    arr_closed = arr_closed.set_left(closed.left)

            if stop:
                if stop < arr_start:
                    continue
                elif not arr_closed.left and stop == arr_start:
                    continue
                elif stop < arr_stop:
                    # closed "win" over arr_closed on the right
                    arr_closed = arr_closed.set_right(closed.right)
                    arr_stop = stop
                elif stop == arr_stop and arr_closed.right:
                    arr_closed = arr_closed.set_right(closed.right)
            yield Segment(
                self,
                pod,
                row["digest"],
                start=arr_start,
                stop=arr_stop,
                closed=arr_closed,
            )

    def delete_labels(self, rm_labels):
        keep = ~isin(self.label, rm_labels)
        return self.mask(keep)

    def mask(self, keep):
        return Commit(
            schema=self.schema,
            label=self.label[keep],
            start={k: v[keep] for k, v in self.start.items()},
            stop={k: v[keep] for k, v in self.stop.items()},
            digest={k: v[keep] for k, v in self.digest.items()},
            length=self.length[keep],
            closed=self.closed[keep],
            embedded=self.embedded,
        )

    def rename_label(self, from_label, to_label):
        # First we create an extract with `from_label` only
        extract = self.mask(self.label == from_label)
        base_ci = self.delete_labels(from_label)

        # replace label in extract
        extract.label = repeat(to_label, len(extract))

        # Re-inject it
        for pos in range(len(extract)):
            row = extract.at(pos)
            base_ci = base_ci.update(**row)

        return base_ci

    def __contains__(self, row):
        start_pos, _ = self.split(row["label"], row["start"], row["stop"])
        if start_pos >= len(self):
            return False
        match_row = self.at(start_pos)
        for attr in ("start", "stop", "digest"):
            if match_row[attr] != row[attr]:
                return False
        return True

Class variables

var closed_codec
var digest_codec
var label_codec
var len_codec

Static methods

def concat(commit, *other_commits)
Expand source code
@classmethod
def concat(cls, commit, *other_commits):
    schema = commit.schema
    all_ci = (commit,) + other_commits
    all_ci = tuple(ci for ci in all_ci if len(ci) > 0)

    # Make sure there are no overlaps
    for prv, nxt in zip(all_ci[:-1], all_ci[1:]):
        prv_tail = prv.at(-1)
        nxt_head = nxt.at(0)
        assert (prv_tail["label"], prv_tail["stop"]) <= (
            nxt_head["label"],
            nxt_head["start"],
        )

    start = {
        name: concatenate([ci.start[name] for ci in all_ci]) for name in schema.idx
    }
    stop = {
        name: concatenate([ci.stop[name] for ci in all_ci]) for name in schema.idx
    }
    digest = {
        name: concatenate([ci.digest[name] for ci in all_ci]) for name in schema
    }
    label = concatenate([ci.label for ci in all_ci])
    length = concatenate([ci.length for ci in all_ci])
    closed = concatenate([ci.closed for ci in all_ci])
    embedded = {}
    for ci in all_ci:
        embedded.update(ci.embedded)
    return Commit(schema, label, start, stop, digest, length, closed, embedded)
def decode(schema, payload)
Expand source code
@classmethod
def decode(cls, schema, payload):
    msgpck = registry.codec_registry["msgpack2"]()
    data = msgpck.decode(payload)[0]
    values = {}
    # Decode starts, stops and digests
    for key in ("start", "stop", "digest"):
        key_vals = {}
        columns = schema if key == "digest" else schema.idx
        for name in columns:
            codec = cls.digest_codec if key == "digest" else schema[name].codec
            key_vals[name] = codec.decode(data[key][name])
        values[key] = key_vals

    # Decode len and labels
    values["length"] = cls.len_codec.decode(data["length"])
    values["label"] = cls.label_codec.decode(data["label"])
    values["closed"] = cls.closed_codec.decode(data["closed"])

    # Embedded data will be decoded on demand
    values["embedded"] = data.get("embedded")
    return Commit(schema, **values)
def one(schema, label, start, stop, digest, length, closed=Closed.BOTH, embedded=None)
Expand source code
@classmethod
def one(
    cls,
    schema,
    label,
    start,
    stop,
    digest,
    length,
    closed=Closed.BOTH,
    embedded=None,
):
    closed = Closed.cast(closed)
    label = asarray([label])
    start = dict(zip(schema.idx, (asarray([s]) for s in start)))
    stop = dict(zip(schema.idx, (asarray([s]) for s in stop)))
    digest = dict(zip(schema, (asarray([d], dtype="U") for d in digest)))
    length = [length]
    closed = [closed.short]
    return Commit(schema, label, start, stop, digest, length, closed, embedded)

Methods

def at(self, pos)
Expand source code
def at(self, pos):
    if pos < 0:
        pos = len(self) + pos
    res = {}
    for key in ("start", "stop", "digest"):
        columns = self.schema if key == "digest" else self.schema.idx
        values = getattr(self, key)
        res[key] = tuple(values[n][pos] for n in columns)

    for key in ("label", "length", "closed"):
        res[key] = getattr(self, key)[pos]
    res["embedded"] = self.embedded
    return res
def delete_labels(self, rm_labels)
Expand source code
def delete_labels(self, rm_labels):
    keep = ~isin(self.label, rm_labels)
    return self.mask(keep)
def encode(self)
Expand source code
def encode(self):
    msgpck = registry.codec_registry["msgpack2"]()
    data = {}
    # Encode starts, stops and digests
    for key in ("start", "stop", "digest"):
        columns = self.schema if key == "digest" else self.schema.idx
        key_vals = {}
        for pos, name in enumerate(columns):
            codec = (
                self.digest_codec if key == "digest" else self.schema[name].codec
            )
            arr = getattr(self, key)[name]
            key_vals[name] = codec.encode(arr)
        data[key] = key_vals

    # Encode digests
    for name in self.schema:
        data["digest"][name] = self.digest_codec.encode(self.digest[name])

    # Encode length, closed and labels
    data["length"] = self.len_codec.encode(self.length)
    data["closed"] = self.closed_codec.encode(self.closed)
    data["label"] = self.label_codec.encode(self.label)
    # Keep only embedded data referenced by self.digest
    keep_digests = (
        set(chain.from_iterable(self.digest.values())) & self.embedded.keys()
    )
    embedded = {d: self.embedded[d] for d in sorted(keep_digests)}
    data["embedded"] = embedded
    return msgpck.encode([data])
def head(self, pos)
Expand source code
def head(self, pos):
    return self.slice(None, pos)
def mask(self, keep)
Expand source code
def mask(self, keep):
    return Commit(
        schema=self.schema,
        label=self.label[keep],
        start={k: v[keep] for k, v in self.start.items()},
        stop={k: v[keep] for k, v in self.stop.items()},
        digest={k: v[keep] for k, v in self.digest.items()},
        length=self.length[keep],
        closed=self.closed[keep],
        embedded=self.embedded,
    )
def match(self, label)
Expand source code
def match(self, label):
    (matches,) = where(self.label == label)
    for pos in matches:
        yield self.at(pos)
def rename_label(self, from_label, to_label)
Expand source code
def rename_label(self, from_label, to_label):
    # First we create an extract with `from_label` only
    extract = self.mask(self.label == from_label)
    base_ci = self.delete_labels(from_label)

    # replace label in extract
    extract.label = repeat(to_label, len(extract))

    # Re-inject it
    for pos in range(len(extract)):
        row = extract.at(pos)
        base_ci = base_ci.update(**row)

    return base_ci
def segments(self, label, pod, start=None, stop=None, closed=Closed.BOTH)
Expand source code
def segments(self, label, pod, start=None, stop=None, closed=Closed.BOTH):
    closed = Closed.cast(closed)

    # If start (or stop) is not set, and the left (right) side is
    # not closed, we ignore the very first (last) item, which
    # makes no sense
    if start is None:
        closed = closed.set_left(True)
    if stop is None:
        closed = closed.set_right(True)

    # XXX allow to pass condition instead of simple start-stop ?
    for row in self.match(label):
        arr_start = row["start"]
        arr_stop = row["stop"]
        arr_closed = Closed[row["closed"]]
        if start:
            if start > arr_stop:
                # start is on the right of the array
                continue
            elif not arr_closed.right and start == arr_stop:
                # Same
                continue
            elif start > arr_start:
                # `closed` "win" over arr_closed on the left
                arr_closed = arr_closed.set_left(closed.left)
                arr_start = start
            elif start == arr_start and arr_closed.left:
                # `closed` "win" only if array left is not already
                # open
                arr_closed = arr_closed.set_left(closed.left)

        if stop:
            if stop < arr_start:
                continue
            elif not arr_closed.left and stop == arr_start:
                continue
            elif stop < arr_stop:
                # closed "win" over arr_closed on the right
                arr_closed = arr_closed.set_right(closed.right)
                arr_stop = stop
            elif stop == arr_stop and arr_closed.right:
                arr_closed = arr_closed.set_right(closed.right)
        yield Segment(
            self,
            pod,
            row["digest"],
            start=arr_start,
            stop=arr_stop,
            closed=arr_closed,
        )
def slice(self, *pos)
Expand source code
def slice(self, *pos):
    slc = slice(*pos)
    schema = self.schema
    start = {name: self.start[name][slc] for name in schema.idx}
    stop = {name: self.stop[name][slc] for name in schema.idx}
    digest = {name: self.digest[name][slc] for name in schema}
    label = self.label[slc]
    length = self.length[slc]
    closed = self.closed[slc]
    return Commit(schema, label, start, stop, digest, length, closed, self.embedded)
def split(self, label, start, stop)
Expand source code
def split(self, label, start, stop):
    start_values = {"_label": self.label}
    start_values.update(self.start)
    stop_values = {"_label": self.label}
    stop_values.update(self.stop)
    frm_start = Frame(Schema.from_frame(start_values), start_values)
    frm_stop = Frame(Schema.from_frame(stop_values), stop_values)
    start_pos = frm_stop.index((label,) + start, right=True)
    stop_pos = frm_start.index((label,) + stop, right=False)
    return start_pos, stop_pos
def tail(self, pos)
Expand source code
def tail(self, pos):
    return self.slice(pos, None)
def update(self, label, start, stop, digest, length, closed='b', embedded=None)
Expand source code
def update(self, label, start, stop, digest, length, closed="b", embedded=None):
    closed = Closed.cast(closed)
    if not start <= stop:
        raise ValueError(f"Invalid range {start} -> {stop}")
    inner = Commit.one(
        self.schema, label, start, stop, digest, length, closed, embedded
    )
    if len(self) == 0:
        return inner

    if embedded:
        self.embedded.update(embedded)

    first = (self.at(0)["label"], self.at(0)["start"])
    last = (self.at(-1)["label"], self.at(-1)["stop"])
    if (label, start) < first and (label, stop) > last:
        return inner

    start_pos, stop_pos = self.split(label, start, stop)
    # Truncate start_pos row
    head = None
    # start_pos is the result of a bisect_right, so we have to
    # check the slot on the left that may be perfect match
    start_row = None
    if start_pos > 0:
        prev_row = self.at(start_pos - 1)
        if (prev_row["label"], prev_row["stop"]) == (label, start):
            start_pos -= 1
            start_row = prev_row
    if start_row is None:
        start_row = self.at(min(start_pos, len(self) - 1))

    if (
        label == start_row["label"]
        and start_row["start"] <= start <= start_row["stop"]
    ):
        # We hit the right of an existing row
        start_row["stop"] = start
        # If closed is open/close on left, start_row become the
        # opposite on right:
        start_row["closed"] = Closed[start_row["closed"]].set_right(not closed.left)

        if (
            start_row["start"] == start_row["stop"]
            and start_row["closed"] != Closed.BOTH
        ):
            # Ignore star_row
            head = self.head(start_pos)
        else:
            head = Commit.concat(
                self.head(start_pos),
                Commit.one(schema=self.schema, **start_row),
            )
        # when start_row["start"] == start_row["stop"],
        # start_row stop and start are both "overshadowed" by
        # new commit

    if head is None:
        head = self.head(start_pos)

    # Truncate stop_pos row
    tail = None
    # stop_pos is the result of a bisect_left, so we have to
    # check the slot on the right that may be perfect match
    stop_row = None
    if stop_pos < len(self):
        next_row = self.at(stop_pos)
        if (next_row["label"], next_row["start"]) == (label, stop):
            stop_row = next_row
            stop_pos += 1
    if stop_row is None:
        stop_row = self.at(max(0, stop_pos - 1))

    if label == stop_row["label"] and stop_row["start"] <= stop <= stop_row["stop"]:
        # We hit the left of an existing row
        stop_row["start"] = stop
        # If closed is open/close on right, stop_row become the
        # opposite on left:
        stop_row["closed"] = Closed[stop_row["closed"]].set_left(not closed.right)

        if (
            stop_row["start"] == stop_row["stop"]
            and start_row["closed"] != Closed.BOTH
        ):
            # Ignore stop_row
            tail = self.tail(stop_pos)
        else:
            tail = Commit.concat(
                Commit.one(schema=self.schema, **stop_row),
                self.tail(stop_pos),
            )
        # when stop_row["start"] == stop_row["stop"],
        # stop_row stop and start are both "overshadowed" by
        # new commit
    if tail is None:
        tail = self.tail(stop_pos)
    return Commit.concat(head, inner, tail)
class Segment (commit, pod, digests, start, stop, closed)
Expand source code
class Segment:
    def __init__(self, commit, pod, digests, start, stop, closed):
        self.commit = commit
        self.pod = pod
        self.start = start
        self.stop = stop
        self.closed = closed
        self.digest = dict(zip(commit.schema, digests))
        self._frm = None
        self.start_pos = None
        self.stop_pos = None
        self.lock = Lock()

    def __len__(self):
        return len(self.frame)

    def read(self, name, start_pos=None, stop_pos=None):
        # Prime cache
        if not name in self.frame:
            self.frame[name] = self._read(name)
        return self.frame[name][start_pos:stop_pos]

    def _read(self, name):
        dig = self.digest[name]
        # check first if content is not already in commit
        data = self.commit.embedded.get(dig)
        if data is None:
            folder, filename = hashed_path(dig)
            sub_pod = self.pod.cd(folder)
            try:
                data = sub_pod.read(filename)
            except FileNotFoundError:
                for f in sub_pod.ls():
                    # File is in soft-delete mode
                    if f.startswith(filename):
                        data = sub_pod.read(f)
                        break
                else:
                    raise RuntimeError(f'File {folder}/{filename} is missing!')

        codec = self.commit.schema[name].codec
        arr = codec.decode(data)
        return arr[self.start_pos : self.stop_pos]

    @property
    def frame(self):
        # Use a Frame instance as container to cache columns
        with self.lock:
            if self._frm is not None:
                return self._frm

            cols = {}
            # with Pool() as pool: # TODO need a smarter pool
            #     for name in self.commit.schema.idx:
            #         pool.submit(lambda: cols.update({name: self._read(name)}))
            for name in self.commit.schema.idx:
                cols[name] = self._read(name)

            frm = Frame(self.commit.schema, cols)
            self.start_pos, self.stop_pos = frm.slice_index(
                self.start, self.stop, closed=self.closed
            )
            self._frm = frm.slice(self.start_pos, self.stop_pos)
            return self._frm

Instance variables

var frame
Expand source code
@property
def frame(self):
    # Use a Frame instance as container to cache columns
    with self.lock:
        if self._frm is not None:
            return self._frm

        cols = {}
        # with Pool() as pool: # TODO need a smarter pool
        #     for name in self.commit.schema.idx:
        #         pool.submit(lambda: cols.update({name: self._read(name)}))
        for name in self.commit.schema.idx:
            cols[name] = self._read(name)

        frm = Frame(self.commit.schema, cols)
        self.start_pos, self.stop_pos = frm.slice_index(
            self.start, self.stop, closed=self.closed
        )
        self._frm = frm.slice(self.start_pos, self.stop_pos)
        return self._frm

Methods

def read(self, name, start_pos=None, stop_pos=None)
Expand source code
def read(self, name, start_pos=None, stop_pos=None):
    # Prime cache
    if not name in self.frame:
        self.frame[name] = self._read(name)
    return self.frame[name][start_pos:stop_pos]