Module lakota.collection

Read and Writes Series

A collection is instantiated from a Repo object (see lakota.repo):

clct = repo / 'my_collection'

Series instantiation

all_series = clct.ls()

my_series = clct.series('my_series')
# or
my_series = clct.series / 'my_series'

See lakota.series on how to use Series.

The Collection.multi() method returns a contect manager that will provide atomic (and faster) writes across several series

with clct.multi():
    for label, df in ...:
        series = clct / label
        series.write(df)

Concurrent writes and synchronization

Collections can also be pushed/pulled and merged.

clct = local_repo / 'my_collection'
remote_clct = remote_repo / 'my_collection'
clct.pull(remote_clct)
clct.merge()

Trim remove past revisions

clct.trim()

Defrag Recombine small data files into bigger ones

clct.defrag()
Expand source code
"""
## Read and Writes Series

A collection is instantiated from a `lakota.repo.Repo` object (see `lakota.repo`):
```python
clct = repo / 'my_collection'
```

Series instantiation

```python
all_series = clct.ls()

my_series = clct.series('my_series')
# or
my_series = clct.series / 'my_series'
```

See `lakota.series` on how to use `lakota.series.Series`.

The `lakota.collection.Collection.multi` method returns a contect manager that will provide atomic
(and faster) writes across several series
```python
with clct.multi():
    for label, df in ...:
        series = clct / label
        series.write(df)
```

## Concurrent writes and synchronization

Collections can also be pushed/pulled and merged.

```python
clct = local_repo / 'my_collection'
remote_clct = remote_repo / 'my_collection'
clct.pull(remote_clct)
clct.merge()
```

Trim remove past revisions
```python
clct.trim()
```

Defrag Recombine small data files into bigger ones
```python
clct.defrag()
```
"""

from collections import defaultdict
from contextlib import contextmanager
from datetime import datetime, timedelta
from itertools import chain
from threading import Lock
from typing import Dict, Any
from warnings import warn

from numpy import asarray

from .batch import Batch
from .changelog import Changelog, phi
from .commit import Commit
from .series import KVSeries, Series
from .utils import Pool, hashed_path, logger, settings

__all__ = ["Collection"]


class Collection:
    def __init__(self, label, schema, path, repo):
        self.repo = repo
        self.pod = repo.pod
        self.schema = schema
        self.label = label
        self.changelog = Changelog(self.pod / path)
        self.batch = None
        self._batch_lock = Lock()

    def series(self, label):
        label = label.strip()
        if len(label) == 0:
            raise ValueError(f"Invalid label: '{label}'")
        cls = KVSeries if self.schema.kind == "kv" else Series
        return cls(label, self)

    def __truediv__(self, name):
        return self.series(name)

    def __iter__(self):
        return (self.series(n) for n in self.ls())

    def ls(self):
        rev = self.changelog.leaf()
        if rev is None:
            return []
        payload = rev.read()
        ci = Commit.decode(self.schema, payload)
        return sorted(set(ci.label))

    def delete(self, *labels):
        leaf_rev = self.changelog.leaf()
        if not leaf_rev:
            return

        ci = leaf_rev.commit(self)
        ci = ci.delete_labels(labels)
        parent = leaf_rev.child
        payload = ci.encode()
        return self.changelog.commit(payload, parents=[parent])

    def rename(self, from_label, to_label):
        """
        Rename Series
        """
        leaf_rev = self.changelog.leaf()
        if not leaf_rev:
            return

        ci = leaf_rev.commit(self)
        ci = ci.rename_label(from_label, to_label)
        parent = leaf_rev.child
        payload = ci.encode()
        return self.changelog.commit(payload, parents=[parent])

    def clone(
        self,
        other_collection: "Collection",
        rename: Dict[str, str] = None,
        defaults: Dict[str, Any] = None,
    ) -> Commit:
        if other_collection.changelog.leaf():
            raise ValueError("Clone can only be saved into an empty collection")

        other_schema = other_collection.schema
        # new2old will map the "old" label corresponding to it's new
        # label (in other_schema)
        new2old = {}
        if rename:
            for old, new in rename.items():
                assert new in other_schema
                new2old[new] = old

        leaf_rev = self.changelog.leaf()
        defaults = defaults or {}
        leaf_ci = leaf_rev.commit(self)
        all_dig = {}
        start = {} #n: [] for n in leaf_ci.start
        stop = {} #n: [] for n in leaf_ci.stop
        embedded = {}

        for col in other_schema:
            old_col = new2old.get(col, col)

            # TODO: raise an exception if a column is removed from the
            # index!

            # Determine default value
            default_value = defaults.get(col, other_schema[col].zero())

            # Add zero/default to start & stop
            if col in other_schema.idx:
                if old_col in leaf_ci.start:
                    start[col] = leaf_ci.start[old_col]
                    stop[col] = leaf_ci.stop[old_col]
                else:
                    start_stop = [default_value] * len(leaf_ci.length)
                    start[col] = start_stop
                    stop[col] = start_stop

            if old_col in leaf_ci.digest:
                # XXX raise exception if dtype is different
                all_dig[col] = leaf_ci.digest[old_col]
                continue

            # build digest & embed arrays (based on zeroes arrays)
            values = [[default_value] * l for l in leaf_ci.length]
            sch_col = other_schema[col]
            col_embedded = {}

            # Compute digest and fill all_dig
            all_dig[col] = []
            for v in values:
                arr = sch_col.cast(asarray(v))
                encoded, digest = sch_col.codec.encode(arr, with_digest=True)
                col_embedded[digest] = encoded
                # Add new coll to digest dict
                all_dig[col].append(digest)

            # Add data to embedded
            embedded.update(col_embedded)
        embedded.update(leaf_ci.embedded)

        new_ci = Commit(
            other_schema,
            leaf_ci.label,
            start,
            stop,
            all_dig,
            leaf_ci.length,
            closed=leaf_ci.closed,
            embedded=embedded,
        )
        return other_collection.changelog.commit(new_ci.encode(), parents=[phi])

    def refresh(self):
        self.changelog.refresh()

    def push(self, remote, shallow=False):
        return remote.pull(self, shallow=shallow)

    def pull(self, remote, shallow=False):
        """
        Pull remote series into self
        """
        assert isinstance(remote, Collection), "A Collection instance is required"

        local_digs = set(self.digests())
        if shallow:
            remote_digs = set(remote.digests(remote.changelog.leafs()))
        else:
            remote_digs = set(remote.digests())
        sync = lambda path: self.pod.write(path, remote.pod.read(path))
        with Pool() as pool:
            for dig in remote_digs:
                if dig in local_digs:
                    continue
                folder, filename = hashed_path(dig)
                path = folder / filename
                pool.submit(sync, path)

        self.changelog.pull(remote.changelog, shallow=shallow)

    def merge(self, *heads):
        revisions = self.changelog.log()
        # Corner cases
        if not revisions:
            return []
        if not heads:
            heads = [r for r in revisions if r.is_leaf]

        # We may have multiple revision pointing to the same child
        # (aka a previous commit). No need to merge again.
        if len(set(r.digests.child for r in heads)) < 2:
            return []

        # Reorganise revision as child->parents dict
        ch2pr = defaultdict(list)
        for r in revisions:
            ch2pr[r.child].append(r)

        # Find common root
        root = None
        first_parents, *other_parents = [
            list(self._find_parents(h, ch2pr)) for h in heads
        ]
        for root in first_parents:
            if all(root in op for op in other_parents):
                break

        # Reify commits, changelog.log is a depth first traversal, so
        # the first head is also the oldest branch.
        first_ci, *other_ci = [h.commit(self) for h in heads]
        root_ci = root.commit(self) if root else []
        # Pile all rows for all other commit into the first one
        self.batch = True  # TODO use a real batch instance but adapt
        # multi() to accept the list of heads as
        # parent (and batch will use first parent as
        # last_ci in flush)
        for ci in other_ci:
            for pos in range(len(ci)):
                row = ci.at(pos)
                # Skip existing rows
                if row in first_ci or row in root_ci:
                    continue

                # Re-apply row
                closed = row["closed"]
                if closed == "b":
                    # Closed commit can be applied as-is
                    first_ci = first_ci.update(**row)
                else:
                    # Non-closed: we read and rewrite
                    series = self / row["label"]
                    frm = series.frame(
                        start=row["start"], stop=row["stop"], closed=closed, from_ci=ci
                    )
                    ci_info = series.write(
                        frm
                    )  # since batch is true series simply returns info
                    first_ci = first_ci.update(*ci_info)

        # encode and commit
        payload = first_ci.encode()
        revs = self.changelog.commit(payload, parents=set(h.child for h in heads))
        self.batch = False
        return revs

    @staticmethod
    def _find_parents(rev, ch2pr):
        queue = ch2pr[rev.child][:]
        while queue:
            rev = queue.pop()
            # Append children
            parents = ch2pr[rev.parent]
            queue.extend(parents)
            yield rev

    def squash(self, trim=None, max_chunk=settings.defrag_max_chunk):
        """
        Remove past revisions, collapse each series into one or few large
        frames. Returns newly created revisions.

        If `trim` is None (the default), all revisions older than
        twice the timeout (see `settings.timeout` in `utils.py`) are
        removed.  If set to False, the full history is kept. If set to
        a datetime, all the revisions older than the given value will
        be deleted, keeping the recent history.

        The `max_chunk` parameter defines a limit over which the
        method will rewrite a series. If a given series comprise a
        small number of chunk (aka less than `max_chunk`) it will be
        kept as is and no rewrite will be attempted.

        If `max_chunk` is less or equal to zero, no new revision is
        created.
        """
        warn(
            "Collection.squash will be deprecated, please use"
            "Collection.trim & Collection.defrag"
        )

        revs = self.defrag(max_chunk=max_chunk)
        if trim is not False:
            self.trim(trim)
        return revs

    def trim(self, before=None):
        """
        If `before` is None (the default), all revisions older than twice
        the timeout (see `settings.timeout` in `utils.py`) are
        removed.  If set to a datetime, all the revisions older than
        the given value will be deleted, keeping at least the last
        commit.
        """
        logger.info('Trim collection "%s"', self.label)
        if before is None:
            before = datetime.now() - timedelta(seconds=settings.timeout) * 2

        # Read existing revisions, make sure we don't erase the last one
        revs = self.changelog.log(before=before)
        if not revs:
            return 0
        if revs[-1].is_leaf:
            full_revs = self.changelog.log()
            if len(full_revs) == len(revs):
                revs = revs[:-1]

        self.changelog.pod.rm_many([r.path for r in revs])
        self.changelog.refresh()
        return len(revs)

    def defrag(self, max_chunk=settings.defrag_max_chunk):
        # Rewrite each series
        leaf = self.changelog.leaf()
        commit = leaf and leaf.commit(self)
        with self.multi() as batch:
            with Pool() as pool:
                for series in self:
                    pool.submit(self._defrag_series, series, commit, max_chunk)

        return batch.revs

    def _defrag_series(self, series, commit, max_chunk):
        logger.info('Defrag series "%s/%s"', self.label, series.label)
        # Re-write series. We use _find_defrag_start to fast-forward
        # in the series (we bet on the fact that most series are
        # append-only)
        start, closed = self._find_defrag_start(commit, series, max_chunk)
        prev_stop = None
        for frm in series.paginate(start=start, closed=closed):
            series.write(frm, start=prev_stop, closed="r" if prev_stop else "b")
            prev_stop = frm.stop()

    def _find_defrag_start(self, commit, series, max_chunk):
        """
        Find the first "small" segment , and return its start values.
        """
        assert max_chunk > 0, "Parameter 'max_chunk' must be bigger than 0"
        rows = list(commit.match(series.label))
        if len(rows) <= max_chunk:
            return rows[-1]["stop"], "RIGHT"

        # Define a minimal acceptable len
        total_len = sum(row["length"] for row in rows)
        threshold = min(settings.page_len, total_len / (max_chunk + 1))

        for row in rows[:-1]:
            # Stop at first small row
            if row["length"] < threshold:
                return row["start"], "BOTH"
        return rows[-1]["stop"], "RIGHT"

    def digests(self, revisions=None):
        if revisions is None:
            revisions = self.changelog.log()
        for rev in revisions:
            ci = rev.commit(self)
            digs = set(chain.from_iterable(ci.digest.values()))
            # return only digest not already embedded in the commit
            digs = digs - set(ci.embedded)
            yield from digs

    @contextmanager
    def multi(self, root=None):
        with self._batch_lock:
            b = Batch(self, root)
            self.batch = b
            yield b
            b.flush()
            self.batch = None

Classes

class Collection (label, schema, path, repo)
Expand source code
class Collection:
    def __init__(self, label, schema, path, repo):
        self.repo = repo
        self.pod = repo.pod
        self.schema = schema
        self.label = label
        self.changelog = Changelog(self.pod / path)
        self.batch = None
        self._batch_lock = Lock()

    def series(self, label):
        label = label.strip()
        if len(label) == 0:
            raise ValueError(f"Invalid label: '{label}'")
        cls = KVSeries if self.schema.kind == "kv" else Series
        return cls(label, self)

    def __truediv__(self, name):
        return self.series(name)

    def __iter__(self):
        return (self.series(n) for n in self.ls())

    def ls(self):
        rev = self.changelog.leaf()
        if rev is None:
            return []
        payload = rev.read()
        ci = Commit.decode(self.schema, payload)
        return sorted(set(ci.label))

    def delete(self, *labels):
        leaf_rev = self.changelog.leaf()
        if not leaf_rev:
            return

        ci = leaf_rev.commit(self)
        ci = ci.delete_labels(labels)
        parent = leaf_rev.child
        payload = ci.encode()
        return self.changelog.commit(payload, parents=[parent])

    def rename(self, from_label, to_label):
        """
        Rename Series
        """
        leaf_rev = self.changelog.leaf()
        if not leaf_rev:
            return

        ci = leaf_rev.commit(self)
        ci = ci.rename_label(from_label, to_label)
        parent = leaf_rev.child
        payload = ci.encode()
        return self.changelog.commit(payload, parents=[parent])

    def clone(
        self,
        other_collection: "Collection",
        rename: Dict[str, str] = None,
        defaults: Dict[str, Any] = None,
    ) -> Commit:
        if other_collection.changelog.leaf():
            raise ValueError("Clone can only be saved into an empty collection")

        other_schema = other_collection.schema
        # new2old will map the "old" label corresponding to it's new
        # label (in other_schema)
        new2old = {}
        if rename:
            for old, new in rename.items():
                assert new in other_schema
                new2old[new] = old

        leaf_rev = self.changelog.leaf()
        defaults = defaults or {}
        leaf_ci = leaf_rev.commit(self)
        all_dig = {}
        start = {} #n: [] for n in leaf_ci.start
        stop = {} #n: [] for n in leaf_ci.stop
        embedded = {}

        for col in other_schema:
            old_col = new2old.get(col, col)

            # TODO: raise an exception if a column is removed from the
            # index!

            # Determine default value
            default_value = defaults.get(col, other_schema[col].zero())

            # Add zero/default to start & stop
            if col in other_schema.idx:
                if old_col in leaf_ci.start:
                    start[col] = leaf_ci.start[old_col]
                    stop[col] = leaf_ci.stop[old_col]
                else:
                    start_stop = [default_value] * len(leaf_ci.length)
                    start[col] = start_stop
                    stop[col] = start_stop

            if old_col in leaf_ci.digest:
                # XXX raise exception if dtype is different
                all_dig[col] = leaf_ci.digest[old_col]
                continue

            # build digest & embed arrays (based on zeroes arrays)
            values = [[default_value] * l for l in leaf_ci.length]
            sch_col = other_schema[col]
            col_embedded = {}

            # Compute digest and fill all_dig
            all_dig[col] = []
            for v in values:
                arr = sch_col.cast(asarray(v))
                encoded, digest = sch_col.codec.encode(arr, with_digest=True)
                col_embedded[digest] = encoded
                # Add new coll to digest dict
                all_dig[col].append(digest)

            # Add data to embedded
            embedded.update(col_embedded)
        embedded.update(leaf_ci.embedded)

        new_ci = Commit(
            other_schema,
            leaf_ci.label,
            start,
            stop,
            all_dig,
            leaf_ci.length,
            closed=leaf_ci.closed,
            embedded=embedded,
        )
        return other_collection.changelog.commit(new_ci.encode(), parents=[phi])

    def refresh(self):
        self.changelog.refresh()

    def push(self, remote, shallow=False):
        return remote.pull(self, shallow=shallow)

    def pull(self, remote, shallow=False):
        """
        Pull remote series into self
        """
        assert isinstance(remote, Collection), "A Collection instance is required"

        local_digs = set(self.digests())
        if shallow:
            remote_digs = set(remote.digests(remote.changelog.leafs()))
        else:
            remote_digs = set(remote.digests())
        sync = lambda path: self.pod.write(path, remote.pod.read(path))
        with Pool() as pool:
            for dig in remote_digs:
                if dig in local_digs:
                    continue
                folder, filename = hashed_path(dig)
                path = folder / filename
                pool.submit(sync, path)

        self.changelog.pull(remote.changelog, shallow=shallow)

    def merge(self, *heads):
        revisions = self.changelog.log()
        # Corner cases
        if not revisions:
            return []
        if not heads:
            heads = [r for r in revisions if r.is_leaf]

        # We may have multiple revision pointing to the same child
        # (aka a previous commit). No need to merge again.
        if len(set(r.digests.child for r in heads)) < 2:
            return []

        # Reorganise revision as child->parents dict
        ch2pr = defaultdict(list)
        for r in revisions:
            ch2pr[r.child].append(r)

        # Find common root
        root = None
        first_parents, *other_parents = [
            list(self._find_parents(h, ch2pr)) for h in heads
        ]
        for root in first_parents:
            if all(root in op for op in other_parents):
                break

        # Reify commits, changelog.log is a depth first traversal, so
        # the first head is also the oldest branch.
        first_ci, *other_ci = [h.commit(self) for h in heads]
        root_ci = root.commit(self) if root else []
        # Pile all rows for all other commit into the first one
        self.batch = True  # TODO use a real batch instance but adapt
        # multi() to accept the list of heads as
        # parent (and batch will use first parent as
        # last_ci in flush)
        for ci in other_ci:
            for pos in range(len(ci)):
                row = ci.at(pos)
                # Skip existing rows
                if row in first_ci or row in root_ci:
                    continue

                # Re-apply row
                closed = row["closed"]
                if closed == "b":
                    # Closed commit can be applied as-is
                    first_ci = first_ci.update(**row)
                else:
                    # Non-closed: we read and rewrite
                    series = self / row["label"]
                    frm = series.frame(
                        start=row["start"], stop=row["stop"], closed=closed, from_ci=ci
                    )
                    ci_info = series.write(
                        frm
                    )  # since batch is true series simply returns info
                    first_ci = first_ci.update(*ci_info)

        # encode and commit
        payload = first_ci.encode()
        revs = self.changelog.commit(payload, parents=set(h.child for h in heads))
        self.batch = False
        return revs

    @staticmethod
    def _find_parents(rev, ch2pr):
        queue = ch2pr[rev.child][:]
        while queue:
            rev = queue.pop()
            # Append children
            parents = ch2pr[rev.parent]
            queue.extend(parents)
            yield rev

    def squash(self, trim=None, max_chunk=settings.defrag_max_chunk):
        """
        Remove past revisions, collapse each series into one or few large
        frames. Returns newly created revisions.

        If `trim` is None (the default), all revisions older than
        twice the timeout (see `settings.timeout` in `utils.py`) are
        removed.  If set to False, the full history is kept. If set to
        a datetime, all the revisions older than the given value will
        be deleted, keeping the recent history.

        The `max_chunk` parameter defines a limit over which the
        method will rewrite a series. If a given series comprise a
        small number of chunk (aka less than `max_chunk`) it will be
        kept as is and no rewrite will be attempted.

        If `max_chunk` is less or equal to zero, no new revision is
        created.
        """
        warn(
            "Collection.squash will be deprecated, please use"
            "Collection.trim & Collection.defrag"
        )

        revs = self.defrag(max_chunk=max_chunk)
        if trim is not False:
            self.trim(trim)
        return revs

    def trim(self, before=None):
        """
        If `before` is None (the default), all revisions older than twice
        the timeout (see `settings.timeout` in `utils.py`) are
        removed.  If set to a datetime, all the revisions older than
        the given value will be deleted, keeping at least the last
        commit.
        """
        logger.info('Trim collection "%s"', self.label)
        if before is None:
            before = datetime.now() - timedelta(seconds=settings.timeout) * 2

        # Read existing revisions, make sure we don't erase the last one
        revs = self.changelog.log(before=before)
        if not revs:
            return 0
        if revs[-1].is_leaf:
            full_revs = self.changelog.log()
            if len(full_revs) == len(revs):
                revs = revs[:-1]

        self.changelog.pod.rm_many([r.path for r in revs])
        self.changelog.refresh()
        return len(revs)

    def defrag(self, max_chunk=settings.defrag_max_chunk):
        # Rewrite each series
        leaf = self.changelog.leaf()
        commit = leaf and leaf.commit(self)
        with self.multi() as batch:
            with Pool() as pool:
                for series in self:
                    pool.submit(self._defrag_series, series, commit, max_chunk)

        return batch.revs

    def _defrag_series(self, series, commit, max_chunk):
        logger.info('Defrag series "%s/%s"', self.label, series.label)
        # Re-write series. We use _find_defrag_start to fast-forward
        # in the series (we bet on the fact that most series are
        # append-only)
        start, closed = self._find_defrag_start(commit, series, max_chunk)
        prev_stop = None
        for frm in series.paginate(start=start, closed=closed):
            series.write(frm, start=prev_stop, closed="r" if prev_stop else "b")
            prev_stop = frm.stop()

    def _find_defrag_start(self, commit, series, max_chunk):
        """
        Find the first "small" segment , and return its start values.
        """
        assert max_chunk > 0, "Parameter 'max_chunk' must be bigger than 0"
        rows = list(commit.match(series.label))
        if len(rows) <= max_chunk:
            return rows[-1]["stop"], "RIGHT"

        # Define a minimal acceptable len
        total_len = sum(row["length"] for row in rows)
        threshold = min(settings.page_len, total_len / (max_chunk + 1))

        for row in rows[:-1]:
            # Stop at first small row
            if row["length"] < threshold:
                return row["start"], "BOTH"
        return rows[-1]["stop"], "RIGHT"

    def digests(self, revisions=None):
        if revisions is None:
            revisions = self.changelog.log()
        for rev in revisions:
            ci = rev.commit(self)
            digs = set(chain.from_iterable(ci.digest.values()))
            # return only digest not already embedded in the commit
            digs = digs - set(ci.embedded)
            yield from digs

    @contextmanager
    def multi(self, root=None):
        with self._batch_lock:
            b = Batch(self, root)
            self.batch = b
            yield b
            b.flush()
            self.batch = None

Methods

def clone(self, other_collection: Collection, rename: Dict[str, str] = None, defaults: Dict[str, Any] = None) ‑> Commit
Expand source code
def clone(
    self,
    other_collection: "Collection",
    rename: Dict[str, str] = None,
    defaults: Dict[str, Any] = None,
) -> Commit:
    if other_collection.changelog.leaf():
        raise ValueError("Clone can only be saved into an empty collection")

    other_schema = other_collection.schema
    # new2old will map the "old" label corresponding to it's new
    # label (in other_schema)
    new2old = {}
    if rename:
        for old, new in rename.items():
            assert new in other_schema
            new2old[new] = old

    leaf_rev = self.changelog.leaf()
    defaults = defaults or {}
    leaf_ci = leaf_rev.commit(self)
    all_dig = {}
    start = {} #n: [] for n in leaf_ci.start
    stop = {} #n: [] for n in leaf_ci.stop
    embedded = {}

    for col in other_schema:
        old_col = new2old.get(col, col)

        # TODO: raise an exception if a column is removed from the
        # index!

        # Determine default value
        default_value = defaults.get(col, other_schema[col].zero())

        # Add zero/default to start & stop
        if col in other_schema.idx:
            if old_col in leaf_ci.start:
                start[col] = leaf_ci.start[old_col]
                stop[col] = leaf_ci.stop[old_col]
            else:
                start_stop = [default_value] * len(leaf_ci.length)
                start[col] = start_stop
                stop[col] = start_stop

        if old_col in leaf_ci.digest:
            # XXX raise exception if dtype is different
            all_dig[col] = leaf_ci.digest[old_col]
            continue

        # build digest & embed arrays (based on zeroes arrays)
        values = [[default_value] * l for l in leaf_ci.length]
        sch_col = other_schema[col]
        col_embedded = {}

        # Compute digest and fill all_dig
        all_dig[col] = []
        for v in values:
            arr = sch_col.cast(asarray(v))
            encoded, digest = sch_col.codec.encode(arr, with_digest=True)
            col_embedded[digest] = encoded
            # Add new coll to digest dict
            all_dig[col].append(digest)

        # Add data to embedded
        embedded.update(col_embedded)
    embedded.update(leaf_ci.embedded)

    new_ci = Commit(
        other_schema,
        leaf_ci.label,
        start,
        stop,
        all_dig,
        leaf_ci.length,
        closed=leaf_ci.closed,
        embedded=embedded,
    )
    return other_collection.changelog.commit(new_ci.encode(), parents=[phi])
def defrag(self, max_chunk=4)
Expand source code
def defrag(self, max_chunk=settings.defrag_max_chunk):
    # Rewrite each series
    leaf = self.changelog.leaf()
    commit = leaf and leaf.commit(self)
    with self.multi() as batch:
        with Pool() as pool:
            for series in self:
                pool.submit(self._defrag_series, series, commit, max_chunk)

    return batch.revs
def delete(self, *labels)
Expand source code
def delete(self, *labels):
    leaf_rev = self.changelog.leaf()
    if not leaf_rev:
        return

    ci = leaf_rev.commit(self)
    ci = ci.delete_labels(labels)
    parent = leaf_rev.child
    payload = ci.encode()
    return self.changelog.commit(payload, parents=[parent])
def digests(self, revisions=None)
Expand source code
def digests(self, revisions=None):
    if revisions is None:
        revisions = self.changelog.log()
    for rev in revisions:
        ci = rev.commit(self)
        digs = set(chain.from_iterable(ci.digest.values()))
        # return only digest not already embedded in the commit
        digs = digs - set(ci.embedded)
        yield from digs
def ls(self)
Expand source code
def ls(self):
    rev = self.changelog.leaf()
    if rev is None:
        return []
    payload = rev.read()
    ci = Commit.decode(self.schema, payload)
    return sorted(set(ci.label))
def merge(self, *heads)
Expand source code
def merge(self, *heads):
    revisions = self.changelog.log()
    # Corner cases
    if not revisions:
        return []
    if not heads:
        heads = [r for r in revisions if r.is_leaf]

    # We may have multiple revision pointing to the same child
    # (aka a previous commit). No need to merge again.
    if len(set(r.digests.child for r in heads)) < 2:
        return []

    # Reorganise revision as child->parents dict
    ch2pr = defaultdict(list)
    for r in revisions:
        ch2pr[r.child].append(r)

    # Find common root
    root = None
    first_parents, *other_parents = [
        list(self._find_parents(h, ch2pr)) for h in heads
    ]
    for root in first_parents:
        if all(root in op for op in other_parents):
            break

    # Reify commits, changelog.log is a depth first traversal, so
    # the first head is also the oldest branch.
    first_ci, *other_ci = [h.commit(self) for h in heads]
    root_ci = root.commit(self) if root else []
    # Pile all rows for all other commit into the first one
    self.batch = True  # TODO use a real batch instance but adapt
    # multi() to accept the list of heads as
    # parent (and batch will use first parent as
    # last_ci in flush)
    for ci in other_ci:
        for pos in range(len(ci)):
            row = ci.at(pos)
            # Skip existing rows
            if row in first_ci or row in root_ci:
                continue

            # Re-apply row
            closed = row["closed"]
            if closed == "b":
                # Closed commit can be applied as-is
                first_ci = first_ci.update(**row)
            else:
                # Non-closed: we read and rewrite
                series = self / row["label"]
                frm = series.frame(
                    start=row["start"], stop=row["stop"], closed=closed, from_ci=ci
                )
                ci_info = series.write(
                    frm
                )  # since batch is true series simply returns info
                first_ci = first_ci.update(*ci_info)

    # encode and commit
    payload = first_ci.encode()
    revs = self.changelog.commit(payload, parents=set(h.child for h in heads))
    self.batch = False
    return revs
def multi(self, root=None)
Expand source code
@contextmanager
def multi(self, root=None):
    with self._batch_lock:
        b = Batch(self, root)
        self.batch = b
        yield b
        b.flush()
        self.batch = None
def pull(self, remote, shallow=False)

Pull remote series into self

Expand source code
def pull(self, remote, shallow=False):
    """
    Pull remote series into self
    """
    assert isinstance(remote, Collection), "A Collection instance is required"

    local_digs = set(self.digests())
    if shallow:
        remote_digs = set(remote.digests(remote.changelog.leafs()))
    else:
        remote_digs = set(remote.digests())
    sync = lambda path: self.pod.write(path, remote.pod.read(path))
    with Pool() as pool:
        for dig in remote_digs:
            if dig in local_digs:
                continue
            folder, filename = hashed_path(dig)
            path = folder / filename
            pool.submit(sync, path)

    self.changelog.pull(remote.changelog, shallow=shallow)
def push(self, remote, shallow=False)
Expand source code
def push(self, remote, shallow=False):
    return remote.pull(self, shallow=shallow)
def refresh(self)
Expand source code
def refresh(self):
    self.changelog.refresh()
def rename(self, from_label, to_label)

Rename Series

Expand source code
def rename(self, from_label, to_label):
    """
    Rename Series
    """
    leaf_rev = self.changelog.leaf()
    if not leaf_rev:
        return

    ci = leaf_rev.commit(self)
    ci = ci.rename_label(from_label, to_label)
    parent = leaf_rev.child
    payload = ci.encode()
    return self.changelog.commit(payload, parents=[parent])
def series(self, label)
Expand source code
def series(self, label):
    label = label.strip()
    if len(label) == 0:
        raise ValueError(f"Invalid label: '{label}'")
    cls = KVSeries if self.schema.kind == "kv" else Series
    return cls(label, self)
def squash(self, trim=None, max_chunk=4)

Remove past revisions, collapse each series into one or few large frames. Returns newly created revisions.

If trim is None (the default), all revisions older than twice the timeout (see settings.timeout in utils.py) are removed. If set to False, the full history is kept. If set to a datetime, all the revisions older than the given value will be deleted, keeping the recent history.

The max_chunk parameter defines a limit over which the method will rewrite a series. If a given series comprise a small number of chunk (aka less than max_chunk) it will be kept as is and no rewrite will be attempted.

If max_chunk is less or equal to zero, no new revision is created.

Expand source code
def squash(self, trim=None, max_chunk=settings.defrag_max_chunk):
    """
    Remove past revisions, collapse each series into one or few large
    frames. Returns newly created revisions.

    If `trim` is None (the default), all revisions older than
    twice the timeout (see `settings.timeout` in `utils.py`) are
    removed.  If set to False, the full history is kept. If set to
    a datetime, all the revisions older than the given value will
    be deleted, keeping the recent history.

    The `max_chunk` parameter defines a limit over which the
    method will rewrite a series. If a given series comprise a
    small number of chunk (aka less than `max_chunk`) it will be
    kept as is and no rewrite will be attempted.

    If `max_chunk` is less or equal to zero, no new revision is
    created.
    """
    warn(
        "Collection.squash will be deprecated, please use"
        "Collection.trim & Collection.defrag"
    )

    revs = self.defrag(max_chunk=max_chunk)
    if trim is not False:
        self.trim(trim)
    return revs
def trim(self, before=None)

If before is None (the default), all revisions older than twice the timeout (see settings.timeout in utils.py) are removed. If set to a datetime, all the revisions older than the given value will be deleted, keeping at least the last commit.

Expand source code
def trim(self, before=None):
    """
    If `before` is None (the default), all revisions older than twice
    the timeout (see `settings.timeout` in `utils.py`) are
    removed.  If set to a datetime, all the revisions older than
    the given value will be deleted, keeping at least the last
    commit.
    """
    logger.info('Trim collection "%s"', self.label)
    if before is None:
        before = datetime.now() - timedelta(seconds=settings.timeout) * 2

    # Read existing revisions, make sure we don't erase the last one
    revs = self.changelog.log(before=before)
    if not revs:
        return 0
    if revs[-1].is_leaf:
        full_revs = self.changelog.log()
        if len(full_revs) == len(revs):
            revs = revs[:-1]

    self.changelog.pod.rm_many([r.path for r in revs])
    self.changelog.refresh()
    return len(revs)