Module lakota.batch
Expand source code
from .changelog import phi
from .commit import Commit
# TODO: Batch should first call collection.repo.pod.read('arena') to know
# where to write, or instantiate an arena object that abstracts those
# concepts. It should also raise an exception if the time elapsed since
# the first write grows too large.
# NOTE: the changelog is not affected, because it creates data that is
# not impacted by gc.
__all__ = ["Batch"]


class Batch:
    """Accumulate commit information and write it as one changelog commit.

    Each :meth:`append` call records the arguments describing one write.
    :meth:`flush` folds every recorded entry into a single commit object and
    hands its encoded payload to the collection's changelog.

    Args:
        collection: object whose ``changelog`` attribute receives the commit;
            its ``schema`` attribute is also read when a fresh commit has to
            be built (see :meth:`flush`).
        root: when true, :meth:`flush` does not look up the changelog leaf
            and instead starts a fresh commit parented on ``phi``.
    """

    def __init__(self, collection, root=False):
        self.collection = collection
        # Pending (label, start, stop, all_dig, frame_len, closed, embedded)
        # tuples, kept in insertion order.
        self._ci_info = []
        # Whatever changelog.commit() returned on the last successful flush.
        self.revs = []
        self.root = root

    def append(self, label, start, stop, all_dig, frame_len, closed, embedded):
        """Record one write so that it is included in the next flush()."""
        self._ci_info.append(
            (label, start, stop, all_dig, frame_len, closed, embedded)
        )

    def extend(self, *other_batches):
        """Append the pending entries of each batch in *other_batches*, in order."""
        for other in other_batches:
            self._ci_info.extend(other._ci_info)

    def flush(self):
        """Fold all pending entries into one commit and save it.

        Does nothing when no entry is pending. Otherwise the entries are
        applied in order, via ``Commit.update``, on top of a base commit:

        * the commit of the changelog leaf, when ``root`` is false and the
          leaf lookup returns a truthy revision;
        * otherwise a fresh ``Commit.one`` built from the first entry.

        The encoded result is committed to the changelog with
        ``leaf_rev.child`` (or ``phi`` for a fresh commit) as its single
        parent, and the changelog's return value is stored in ``self.revs``.
        """
        # TODO abort flush if timeout is reached !
        if not self._ci_info:
            return

        changelog = self.collection.changelog
        leaf_rev = None if self.root else changelog.leaf()
        pending = iter(self._ci_info)

        # Combine with last commit
        if leaf_rev:
            last_ci = leaf_rev.commit(self.collection)
        else:
            # No base revision: seed the commit with the first entry. The
            # shared iterator means the loop below skips that entry.
            label, start, stop, all_dig, frame_len, closed, embedded = next(pending)
            last_ci = Commit.one(
                self.collection.schema,
                label,
                start,
                stop,
                all_dig,
                frame_len,
                closed=closed,
                embedded=embedded,
            )

        for label, start, stop, all_dig, frame_len, closed, embedded in pending:
            last_ci = last_ci.update(
                label, start, stop, all_dig, frame_len, closed=closed, embedded=embedded
            )

        # Save it, through the same changelog object used for the leaf lookup.
        payload = last_ci.encode()
        parent = leaf_rev.child if leaf_rev else phi
        self.revs = changelog.commit(payload, parents=[parent])
        # NOTE(review): pending entries are not cleared here, so calling
        # flush() twice re-applies them; confirm callers flush only once.
Classes
class Batch (collection, root=False)
-
Expand source code
class Batch:
    """Accumulate commit information and write it as one changelog commit.

    Each :meth:`append` call records the arguments describing one write.
    :meth:`flush` folds every recorded entry into a single commit object and
    hands its encoded payload to the collection's changelog.

    Args:
        collection: object whose ``changelog`` attribute receives the commit;
            its ``schema`` attribute is also read when a fresh commit has to
            be built (see :meth:`flush`).
        root: when true, :meth:`flush` does not look up the changelog leaf
            and instead starts a fresh commit parented on ``phi``.
    """

    def __init__(self, collection, root=False):
        self.collection = collection
        # Pending (label, start, stop, all_dig, frame_len, closed, embedded)
        # tuples, kept in insertion order.
        self._ci_info = []
        # Whatever changelog.commit() returned on the last successful flush.
        self.revs = []
        self.root = root

    def append(self, label, start, stop, all_dig, frame_len, closed, embedded):
        """Record one write so that it is included in the next flush()."""
        self._ci_info.append(
            (label, start, stop, all_dig, frame_len, closed, embedded)
        )

    def extend(self, *other_batches):
        """Append the pending entries of each batch in *other_batches*, in order."""
        for other in other_batches:
            self._ci_info.extend(other._ci_info)

    def flush(self):
        """Fold all pending entries into one commit and save it.

        Does nothing when no entry is pending. Otherwise the entries are
        applied in order, via ``Commit.update``, on top of a base commit:
        the changelog leaf's commit when ``root`` is false and the leaf
        lookup returns a truthy revision, else a fresh ``Commit.one`` built
        from the first entry. The encoded result is committed with
        ``leaf_rev.child`` (or ``phi``) as its single parent, and the
        changelog's return value is stored in ``self.revs``.
        """
        # TODO abort flush if timeout is reached !
        if not self._ci_info:
            return

        changelog = self.collection.changelog
        leaf_rev = None if self.root else changelog.leaf()
        pending = iter(self._ci_info)

        # Combine with last commit
        if leaf_rev:
            last_ci = leaf_rev.commit(self.collection)
        else:
            # No base revision: seed the commit with the first entry. The
            # shared iterator means the loop below skips that entry.
            label, start, stop, all_dig, frame_len, closed, embedded = next(pending)
            last_ci = Commit.one(
                self.collection.schema,
                label,
                start,
                stop,
                all_dig,
                frame_len,
                closed=closed,
                embedded=embedded,
            )

        for label, start, stop, all_dig, frame_len, closed, embedded in pending:
            last_ci = last_ci.update(
                label, start, stop, all_dig, frame_len, closed=closed, embedded=embedded
            )

        # Save it, through the same changelog object used for the leaf lookup.
        payload = last_ci.encode()
        parent = leaf_rev.child if leaf_rev else phi
        self.revs = changelog.commit(payload, parents=[parent])
        # NOTE(review): pending entries are not cleared here, so calling
        # flush() twice re-applies them; confirm callers flush only once.
Methods
def append(self, label, start, stop, all_dig, frame_len, closed, embedded)
-
Expand source code
def append(self, label, start, stop, all_dig, frame_len, closed, embedded):
    """Queue one write's commit information for the next flush()."""
    entry = (label, start, stop, all_dig, frame_len, closed, embedded)
    self._ci_info.append(entry)
def extend(self, *other_batches)
-
Expand source code
def extend(self, *other_batches):
    """Append the pending entries of each batch in *other_batches*, in order.

    Only the ``_ci_info`` list of each argument is read; with no arguments
    this is a no-op.
    """
    for other in other_batches:
        self._ci_info.extend(other._ci_info)
def flush(self)
-
Expand source code
def flush(self):
    """Fold all pending entries into one commit and save it.

    Does nothing when no entry is pending. Otherwise the entries are applied
    in order, via ``Commit.update``, on top of a base commit: the changelog
    leaf's commit when ``root`` is false and the leaf lookup returns a truthy
    revision, else a fresh ``Commit.one`` built from the first entry. The
    encoded result is committed with ``leaf_rev.child`` (or ``phi``) as its
    single parent, and the changelog's return value is stored in
    ``self.revs``.
    """
    # TODO abort flush if timeout is reached !
    if not self._ci_info:
        return

    changelog = self.collection.changelog
    leaf_rev = None if self.root else changelog.leaf()
    pending = iter(self._ci_info)

    # Combine with last commit
    if leaf_rev:
        last_ci = leaf_rev.commit(self.collection)
    else:
        # No base revision: seed the commit with the first entry. The shared
        # iterator means the loop below skips that entry.
        label, start, stop, all_dig, frame_len, closed, embedded = next(pending)
        last_ci = Commit.one(
            self.collection.schema,
            label,
            start,
            stop,
            all_dig,
            frame_len,
            closed=closed,
            embedded=embedded,
        )

    for label, start, stop, all_dig, frame_len, closed, embedded in pending:
        last_ci = last_ci.update(
            label, start, stop, all_dig, frame_len, closed=closed, embedded=embedded
        )

    # Save it, through the same changelog object used for the leaf lookup.
    payload = last_ci.encode()
    parent = leaf_rev.child if leaf_rev else phi
    self.revs = changelog.commit(payload, parents=[parent])
    # NOTE(review): pending entries are not cleared here, so calling flush()
    # twice re-applies them; confirm callers flush only once.