Module lakota.changelog

Expand source code
from collections import defaultdict, namedtuple
from datetime import datetime
from itertools import chain, takewhile
from random import random
from time import sleep

from .commit import Commit
from .utils import Pool, hexdigest, hexhash_len, hextime

# Building blocks of `phi`: an all-zero timestamp part and an
# all-zero digest part (NOTE(review): 11 presumably matches the length
# of the strings returned by `hextime` -- confirm)
zero_hextime = "0" * 11
zero_hash = "0" * hexhash_len
# `phi` is the parent given to the first revision of an empty changelog
phi = "-".join((zero_hextime, zero_hash))

__all__ = ["Changelog", "Revision"]


class Changelog:

    """
    Build a tree over a pod to provide concurrent revisions

    Every revision is stored in the pod as one entry named
    `<parent>.<child>`. The tree is rebuilt from the pod listing and
    cached until `refresh` is called.
    """

    def __init__(self, pod):
        self.pod = pod
        # Result of the last `_log()` traversal (None means "not
        # computed yet")
        self._log_cache = None

    def commit(self, payload, parents=None, _jitter=False):
        """
        Write `payload` (bytes) as a new revision on top of each item
        of `parents` and return the list of created revisions.

        When `parents` is empty, the last revision of the log is used
        as parent, or `phi` if the changelog contains no revision. A
        parent whose digest equals the digest of `payload` is skipped.
        `_jitter` adds a random pause (less than one second) before
        writing.
        """
        assert isinstance(payload, bytes)

        # Find parent & write revisions
        if not parents:
            last_revision = self.leaf()
            if last_revision is None:
                parents = [phi]
            else:
                parents = [last_revision.child]

        # Debug helper
        if _jitter:
            sleep(random())

        # Compute new key
        key = hexdigest(payload)

        # Create one commit per parent
        revs = []
        for parent in parents:
            # Compare by value: a string equal to `phi` is not
            # guaranteed to be the very same object
            if parent != phi:
                parent_key = parent.split("-", 1)[1]
                if parent_key == key:
                    # Catch double writes
                    continue

            # Construct new filename and save content
            child = hextime() + "-" + key
            revision = Revision(self, parent, child)
            self.pod.write(revision.path, payload)
            revs.append(revision)

        self.refresh()
        return revs

    def refresh(self):
        """
        Drop the cached log, the next call to `log` rebuilds it
        """
        self._log_cache = None

    def __iter__(self):
        # Iterate on the names listed by the pod
        yield from self.pod.ls(missing_ok=True)

    def leaf(self, before=None):
        """
        Return the last revision of `log(before=before)`, or None
        if that log is empty
        """
        revisions = self.log(before=before)
        return revisions[-1] if revisions else None

    def leafs(self):
        """
        Return the revisions of the log flagged as leaf
        """
        return list(filter(lambda rev: rev.is_leaf, self.log()))

    def log(self, before=None):
        """
        Create a list of all the active revisions

        `before` can be a datetime or a hextime string. When given,
        the result stops at the first revision whose epoch is not
        strictly lower than `before`.
        """
        cached = self._log_cache
        if cached is None:
            cached = self._log_cache = list(self._log())
        if before is None:
            return cached

        if isinstance(before, datetime):
            before = hextime(before.timestamp())
        selected = []
        for rev in cached:
            if not rev.epoch < before:
                break
            selected.append(rev)
        return selected

    def _log(self):
        """
        Yield the revisions found in the pod, walking the tree depth
        first and setting `is_leaf` on each of them
        """
        # Extract parent->children relations
        revisions = defaultdict(list)
        all_children = set()
        for name in sorted(self):
            parent, child = name.split(".")
            if parent == child:
                # Entry pointing to itself, ignore it
                continue
            all_children.add(child)
            revisions[parent].append(Revision(self, parent, child))

        # `revisions` is sorted low to high (because filled based on
        # `sorted(self)`), so `queue` is sorted too (high to low). This
        # means the last revision to be yielded is the last child of
        # the first branch (aka oldest parent)
        parent_revs = [r for r in revisions if r not in all_children]
        first_gen = list(chain.from_iterable(revisions[p] for p in parent_revs))
        queue = list(reversed(first_gen))

        # Depth first traversal of the tree(see
        # https://stackoverflow.com/a/5278667)
        yielded = set()
        while queue:
            rev = queue.pop()
            # Append children (only the first time a given child is
            # encountered)
            children = [] if rev.child in yielded else revisions[rev.child]
            yielded.add(rev.child)
            # Set is_leaf attribute, fill queue with next generation
            # and yield
            rev.is_leaf = not children
            queue.extend(reversed(children))
            yield rev

    def pull(self, remote, shallow=False):
        """
        Copy from the `remote` changelog the revisions whose digests
        are not found locally and return the list of copied paths.
        With `shallow`, only the leaf revisions of `remote` are
        considered.
        """
        known = {rev.digests for rev in self.log()}
        candidates = remote.leafs() if shallow else remote.log()
        new_paths = []
        with Pool() as pool:
            for candidate in candidates:
                if candidate.digests in known:
                    continue
                path = candidate.path
                new_paths.append(path)
                payload = remote.pod.read(path)
                pool.submit(self.pod.write, path, payload)
        self.refresh()
        return new_paths


# Pair of digests (parent first, child second) identifying a revision
RevDigest = namedtuple("RevDigest", ("parent", "child"))


class Revision:
    """
    Link between a `parent` and a `child` in a changelog. Both are
    strings made of two parts joined by a dash, the part after the
    dash being a digest.
    """

    # This could be implemented as a dataclass (__init__ contains no
    # logic)
    def __init__(self, changelog, parent, child):
        self.changelog = changelog
        self.parent = parent
        self.child = child
        self.is_leaf = False
        # Filled by the first successful `read()`
        self._payload = None

    @classmethod
    def from_path(cls, changelog, path):
        """
        Create a revision from a `<parent>.<child>` path
        """
        parent, child = path.split(".")
        # Use `cls` so that subclasses get instances of their own type
        return cls(changelog, parent, child)

    @property
    def digests(self):
        """
        Digests of parent and child, as a `RevDigest`
        """
        parent_digest = self.parent.split("-")[1]
        child_digest = self.child.split("-")[1]
        return RevDigest(parent_digest, child_digest)

    @property
    def path(self):
        """
        Name of the revision in the pod: `<parent>.<child>`
        """
        return "{}.{}".format(self.parent, self.child)

    @property
    def epoch(self):
        """
        First part of `child` (what comes before the first dash)
        """
        return self.child.partition("-")[0]

    def __repr__(self):
        marker = "*" if self.is_leaf else ""
        return f"<Revision {self.path} {marker}>"

    def read(self):
        """
        Return the payload of the revision (cached after the first
        successful read).

        The content is read from the pod up to four times, with a
        growing pause between attempts, until its digest matches the
        child digest. Raise RuntimeError if it never matches.
        """
        if self._payload is not None:
            return self._payload

        _, child_digest = self.digests
        max_attempts = 4
        for attempt in range(1, max_attempts + 1):
            payload = self.changelog.pod.read(self.path)
            # Incorrect checksum is usually because file is being
            # written concurrently
            if hexdigest(payload) == child_digest:
                self._payload = payload
                return payload
            # No need to wait when there is no attempt left
            if attempt < max_attempts:
                sleep(attempt / 10)
        raise RuntimeError(f"Unable to read {self.path}")

    def commit(self, collection):
        """
        Instantiate commit based on self payload and series schema
        """
        payload = self.read()
        schema = collection.schema
        return Commit.decode(schema, payload)

Classes

class Changelog (pod)

Build a tree over a pod to provide concurrent revisions

Expand source code
class Changelog:

    """
    Build a tree over a pod to provide concurrent revisions

    Every revision is stored in the pod as one entry named
    `<parent>.<child>`. The tree is rebuilt from the pod listing and
    cached until `refresh` is called.
    """

    def __init__(self, pod):
        self.pod = pod
        # Result of the last `_log()` traversal (None means "not
        # computed yet")
        self._log_cache = None

    def commit(self, payload, parents=None, _jitter=False):
        """
        Write `payload` (bytes) as a new revision on top of each item
        of `parents` and return the list of created revisions.

        When `parents` is empty, the last revision of the log is used
        as parent, or `phi` if the changelog contains no revision. A
        parent whose digest equals the digest of `payload` is skipped.
        `_jitter` adds a random pause (less than one second) before
        writing.
        """
        assert isinstance(payload, bytes)

        # Find parent & write revisions
        if not parents:
            last_revision = self.leaf()
            if last_revision is None:
                parents = [phi]
            else:
                parents = [last_revision.child]

        # Debug helper
        if _jitter:
            sleep(random())

        # Compute new key
        key = hexdigest(payload)

        # Create one commit per parent
        revs = []
        for parent in parents:
            # Compare by value: a string equal to `phi` is not
            # guaranteed to be the very same object
            if parent != phi:
                parent_key = parent.split("-", 1)[1]
                if parent_key == key:
                    # Catch double writes
                    continue

            # Construct new filename and save content
            child = hextime() + "-" + key
            revision = Revision(self, parent, child)
            self.pod.write(revision.path, payload)
            revs.append(revision)

        self.refresh()
        return revs

    def refresh(self):
        """
        Drop the cached log, the next call to `log` rebuilds it
        """
        self._log_cache = None

    def __iter__(self):
        # Iterate on the names listed by the pod
        yield from self.pod.ls(missing_ok=True)

    def leaf(self, before=None):
        """
        Return the last revision of `log(before=before)`, or None
        if that log is empty
        """
        revisions = self.log(before=before)
        return revisions[-1] if revisions else None

    def leafs(self):
        """
        Return the revisions of the log flagged as leaf
        """
        return list(filter(lambda rev: rev.is_leaf, self.log()))

    def log(self, before=None):
        """
        Create a list of all the active revisions

        `before` can be a datetime or a hextime string. When given,
        the result stops at the first revision whose epoch is not
        strictly lower than `before`.
        """
        cached = self._log_cache
        if cached is None:
            cached = self._log_cache = list(self._log())
        if before is None:
            return cached

        if isinstance(before, datetime):
            before = hextime(before.timestamp())
        selected = []
        for rev in cached:
            if not rev.epoch < before:
                break
            selected.append(rev)
        return selected

    def _log(self):
        """
        Yield the revisions found in the pod, walking the tree depth
        first and setting `is_leaf` on each of them
        """
        # Extract parent->children relations
        revisions = defaultdict(list)
        all_children = set()
        for name in sorted(self):
            parent, child = name.split(".")
            if parent == child:
                # Entry pointing to itself, ignore it
                continue
            all_children.add(child)
            revisions[parent].append(Revision(self, parent, child))

        # `revisions` is sorted low to high (because filled based on
        # `sorted(self)`), so `queue` is sorted too (high to low). This
        # means the last revision to be yielded is the last child of
        # the first branch (aka oldest parent)
        parent_revs = [r for r in revisions if r not in all_children]
        first_gen = list(chain.from_iterable(revisions[p] for p in parent_revs))
        queue = list(reversed(first_gen))

        # Depth first traversal of the tree(see
        # https://stackoverflow.com/a/5278667)
        yielded = set()
        while queue:
            rev = queue.pop()
            # Append children (only the first time a given child is
            # encountered)
            children = [] if rev.child in yielded else revisions[rev.child]
            yielded.add(rev.child)
            # Set is_leaf attribute, fill queue with next generation
            # and yield
            rev.is_leaf = not children
            queue.extend(reversed(children))
            yield rev

    def pull(self, remote, shallow=False):
        """
        Copy from the `remote` changelog the revisions whose digests
        are not found locally and return the list of copied paths.
        With `shallow`, only the leaf revisions of `remote` are
        considered.
        """
        known = {rev.digests for rev in self.log()}
        candidates = remote.leafs() if shallow else remote.log()
        new_paths = []
        with Pool() as pool:
            for candidate in candidates:
                if candidate.digests in known:
                    continue
                path = candidate.path
                new_paths.append(path)
                payload = remote.pod.read(path)
                pool.submit(self.pod.write, path, payload)
        self.refresh()
        return new_paths

Methods

def commit(self, payload, parents=None)
Expand source code
def commit(self, payload, parents=None, _jitter=False):
    """
    Write `payload` (bytes) as a new revision on top of each item
    of `parents` and return the list of created revisions.

    When `parents` is empty, the last revision of the log is used
    as parent, or `phi` if the changelog contains no revision. A
    parent whose digest equals the digest of `payload` is skipped.
    `_jitter` adds a random pause (less than one second) before
    writing.
    """
    assert isinstance(payload, bytes)

    # Find parent & write revisions
    if not parents:
        last_revision = self.leaf()
        if last_revision is None:
            parents = [phi]
        else:
            parents = [last_revision.child]

    # Debug helper
    if _jitter:
        sleep(random())

    # Compute new key
    key = hexdigest(payload)

    # Create one commit per parent
    revs = []
    for parent in parents:
        # Compare by value: a string equal to `phi` is not
        # guaranteed to be the very same object
        if parent != phi:
            parent_key = parent.split("-", 1)[1]
            if parent_key == key:
                # Catch double writes
                continue

        # Construct new filename and save content
        child = hextime() + "-" + key
        revision = Revision(self, parent, child)
        self.pod.write(revision.path, payload)
        revs.append(revision)

    self.refresh()
    return revs
def leaf(self, before=None)
Expand source code
def leaf(self, before=None):
    """
    Return the last revision of `log(before=before)`, or None
    if that log is empty
    """
    revisions = self.log(before=before)
    return revisions[-1] if revisions else None
def leafs(self)
Expand source code
def leafs(self):
    """
    Return the revisions of the log flagged as leaf
    """
    return list(filter(lambda rev: rev.is_leaf, self.log()))
def log(self, before=None)

Create a list of all the active revisions

Expand source code
def log(self, before=None):
    """
    Create a list of all the active revisions

    `before` can be a datetime or a hextime string. When given,
    the result stops at the first revision whose epoch is not
    strictly lower than `before`.
    """
    cached = self._log_cache
    if cached is None:
        cached = self._log_cache = list(self._log())
    if before is None:
        return cached

    if isinstance(before, datetime):
        before = hextime(before.timestamp())
    selected = []
    for rev in cached:
        if not rev.epoch < before:
            break
        selected.append(rev)
    return selected
def pull(self, remote, shallow=False)
Expand source code
def pull(self, remote, shallow=False):
    """
    Copy from the `remote` changelog the revisions whose digests
    are not found locally and return the list of copied paths.
    With `shallow`, only the leaf revisions of `remote` are
    considered.
    """
    known = {rev.digests for rev in self.log()}
    candidates = remote.leafs() if shallow else remote.log()
    new_paths = []
    with Pool() as pool:
        for candidate in candidates:
            if candidate.digests in known:
                continue
            path = candidate.path
            new_paths.append(path)
            payload = remote.pod.read(path)
            pool.submit(self.pod.write, path, payload)
    self.refresh()
    return new_paths
def refresh(self)
Expand source code
def refresh(self):
    """
    Drop the cached log, the next call to `log` rebuilds it
    """
    self._log_cache = None
class Revision (changelog, parent, child)
Expand source code
class Revision:
    """
    Link between a `parent` and a `child` in a changelog. Both are
    strings made of two parts joined by a dash, the part after the
    dash being a digest.
    """

    # This could be implemented as a dataclass (__init__ contains no
    # logic)
    def __init__(self, changelog, parent, child):
        self.changelog = changelog
        self.parent = parent
        self.child = child
        self.is_leaf = False
        # Filled by the first successful `read()`
        self._payload = None

    @classmethod
    def from_path(cls, changelog, path):
        """
        Create a revision from a `<parent>.<child>` path
        """
        parent, child = path.split(".")
        # Use `cls` so that subclasses get instances of their own type
        return cls(changelog, parent, child)

    @property
    def digests(self):
        """
        Digests of parent and child, as a `RevDigest`
        """
        parent_digest = self.parent.split("-")[1]
        child_digest = self.child.split("-")[1]
        return RevDigest(parent_digest, child_digest)

    @property
    def path(self):
        """
        Name of the revision in the pod: `<parent>.<child>`
        """
        return "{}.{}".format(self.parent, self.child)

    @property
    def epoch(self):
        """
        First part of `child` (what comes before the first dash)
        """
        return self.child.partition("-")[0]

    def __repr__(self):
        marker = "*" if self.is_leaf else ""
        return f"<Revision {self.path} {marker}>"

    def read(self):
        """
        Return the payload of the revision (cached after the first
        successful read).

        The content is read from the pod up to four times, with a
        growing pause between attempts, until its digest matches the
        child digest. Raise RuntimeError if it never matches.
        """
        if self._payload is not None:
            return self._payload

        _, child_digest = self.digests
        max_attempts = 4
        for attempt in range(1, max_attempts + 1):
            payload = self.changelog.pod.read(self.path)
            # Incorrect checksum is usually because file is being
            # written concurrently
            if hexdigest(payload) == child_digest:
                self._payload = payload
                return payload
            # No need to wait when there is no attempt left
            if attempt < max_attempts:
                sleep(attempt / 10)
        raise RuntimeError(f"Unable to read {self.path}")

    def commit(self, collection):
        """
        Instantiate commit based on self payload and series schema
        """
        payload = self.read()
        schema = collection.schema
        return Commit.decode(schema, payload)

Static methods

def from_path(changelog, path)
Expand source code
@classmethod
def from_path(cls, changelog, path):
    """
    Create a revision from a `<parent>.<child>` path
    """
    parent, child = path.split(".")
    # Use `cls` so that subclasses get instances of their own type
    return cls(changelog, parent, child)

Instance variables

var digests
Expand source code
@property
def digests(self):
    """
    Digests of parent and child, as a `RevDigest`
    """
    parent_digest = self.parent.split("-")[1]
    child_digest = self.child.split("-")[1]
    return RevDigest(parent_digest, child_digest)
var epoch
Expand source code
@property
def epoch(self):
    """
    First part of `child` (what comes before the first dash)
    """
    return self.child.partition("-")[0]
var path
Expand source code
@property
def path(self):
    """
    Name of the revision in the pod: `<parent>.<child>`
    """
    return "{}.{}".format(self.parent, self.child)

Methods

def commit(self, collection)

Instantiate commit based on self payload and series schema

Expand source code
def commit(self, collection):
    """
    Instantiate commit based on self payload and series schema
    """
    raw = self.read()
    schema = collection.schema
    return Commit.decode(schema, raw)
def read(self)
Expand source code
def read(self):
    """
    Return the payload of the revision (cached after the first
    successful read).

    The content is read from the pod up to four times, with a
    growing pause between attempts, until its digest matches the
    child digest. Raise RuntimeError if it never matches.
    """
    if self._payload is not None:
        return self._payload

    _, child_digest = self.digests
    max_attempts = 4
    for attempt in range(1, max_attempts + 1):
        payload = self.changelog.pod.read(self.path)
        # Incorrect checksum is usually because file is being
        # written concurrently
        if hexdigest(payload) == child_digest:
            self._payload = payload
            return payload
        # No need to wait when there is no attempt left
        if attempt < max_attempts:
            sleep(attempt / 10)
    raise RuntimeError(f"Unable to read {self.path}")