Module lakota.repo
The Repo class manages the organisation of a storage location. It provides creation and deletion of collections, synchronization with remote repositories and garbage collection.
Create repositories
Create a Repo
instance:
# in-memory
repo = Repo()
repo = Repo("memory://")
# From a local directory
repo = Repo('some/local/path')
repo = Repo('file://some/local/path')
# From an S3 location
repo = Repo('s3:///my_bucket')
# Use a list of uri to enable caching
repo = Repo(['memory://', 's3:///my_bucket'])
repo = Repo(['file:///tmp/local_cache', 's3:///my_bucket'])
S3 authentication is handled by boto3, so you can either put your credentials in a configuration file or in environment variables. Alternatively, you can pass them as url arguments:
# Special characters in KEY and SECRET must be escaped (for example
# with urllib.parse.quote)
pod = POD.from_uri('s3:///bucket_name?key=KEY&secret=SECRET')
repo = Repo(pod=pod)
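boto3 also honors the standard AWS environment variables, so credentials can be provided without touching the URI. A minimal sketch; the bucket name and values are placeholders:

```python
import os

# Standard boto3 credential environment variables
os.environ["AWS_ACCESS_KEY_ID"] = "KEY"
os.environ["AWS_SECRET_ACCESS_KEY"] = "SECRET"

repo = Repo('s3:///my_bucket')
```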
Similarly, you can use a compatible service through the endpoint_url
parameter:
pod = POD.from_uri('s3:///bucket_name', endpoint_url='http://127.0.0.1:5300')
repo = Repo(pod=pod)
Access collections
Create one or several collections:
# Define schema
schema = Schema(timestamp='int*', value='float')
# Create one collection
repo.create_collection(schema, 'my_collection')
# Create a few more
labels = ['one', 'or_more', 'labels']
repo.create_collection(schema, *labels)
List and instantiate collections
print(list(repo.ls())) # Print collection names
# Instantiate a collection
clct = repo.collection('my_collection')
# like pathlib, the `/` operator can be used
clct = repo / 'my_collection'
See lakota.collection for how to manipulate collections.
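As a quick preview, a collection gives access to series that can be written and read. A minimal sketch based on the series calls used later in this module (the `/` operator, `write`, `frame`); the series label and values are made up:

```python
clct = repo / 'my_collection'
srs = clct / 'my_series'   # a series, addressed like a path

# Write a frame: a mapping of column name to values, matching the schema
srs.write({
    'timestamp': [1, 2, 3],
    'value': [1.1, 2.2, 3.3],
})

# Read it back
frm = srs.frame()
print(frm['value'])
```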
Garbage Collection
After some time, series may be overwritten, deleted, defragmented or
merged. Sooner or later some pieces of data become dereferenced; those
can be deleted to recover storage space. This is done with the gc
method, which returns the number of hard and soft deletions.
nb_hard_delete, nb_soft_delete = repo.gc()
Garbage collection is done in two phases. On the first invocation,
dangling files are renamed; this is the soft deletion. It is done by
adding the current time (returned by hextime()) as a suffix, so
d3/9e/df960b6150e84bd82d5afaf2791a9b210030
will become
d3/9e/df960b6150e84bd82d5afaf2791a9b210030.17b7d6ef91c
.
Any subsequent read of that file will fail, but it automatically triggers a search for a similarly named file carrying a suffix.
On the next invocation, the garbage collection will reconsider those
suffixed files, and if the time represented by the suffix is older
than a given deadline (defined by timeout in Settings), the file is
definitively deleted.
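The deadline check itself boils down to a string comparison of hexadecimal timestamps. The sketch below is only an illustration of that idea, assuming (as the `.17b7d6ef91c` example suggests) that `hextime` encodes an epoch timestamp in hexadecimal; it is not the library's code:

```python
import time

def hex_ts(ts):
    # Hypothetical stand-in for lakota.utils.hextime: epoch milliseconds in hex
    return format(int(ts * 1000), 'x')

timeout = 300  # seconds, playing the role of settings.timeout
deadline = hex_ts(time.time() - timeout)

suffix = "17b7d6ef91c"  # taken from the example above
# Same-length hex strings compare correctly as plain strings
if suffix > deadline:
    print("buried too recently, keep it for now")
else:
    print("older than the deadline, safe to delete permanently")
```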
Expand source code
"""
The `Repo` class manages the organisation of a storage location. It
provides creation and deletion of collections, synchronization with
remote repositories and garbage collection.
## Create repositories
Create a `Repo` instance:
```python
# in-memory
repo = Repo()
repo = Repo("memory://")
# From a local directory
repo = Repo('some/local/path')
repo = Repo('file://some/local/path')
# From an S3 location
repo = Repo('s3:///my_bucket')
# Use a list of uri to enable caching
repo = Repo(['memory://', 's3:///my_bucket'])
repo = Repo(['file:///tmp/local_cache', 's3:///my_bucket'])
```
S3 authentication is handled by
[boto3](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html
"s3 credentials"), so you can either put your credentials in a
configuration file or in environment variables. Alternatively, you can
pass them as url arguments:
```python
# Special characters in KEY and SECRET must be escaped (for example
# with urllib.parse.quote)
pod = POD.from_uri('s3:///bucket_name?key=KEY&secret=SECRET')
repo = Repo(pod=pod)
```
Similarly, you can use a compatible service through the `endpoint_url`
parameter:
```python
pod = POD.from_uri('s3:///bucket_name', endpoint_url='http://127.0.0.1:5300')
repo = Repo(pod=pod)
```
## Access collections
Create one or several collections:
```python
# Define schema
schema = Schema(timestamp='int*', value='float')
# Create one collection
repo.create_collection(schema, 'my_collection')
# Create a few more
labels = ['one', 'or_more', 'labels']
repo.create_collection(schema, *labels)
```
List and instantiate collections
```python
print(list(repo.ls())) # Print collection names
# Instantiate a collection
clct = repo.collection('my_collection')
# like pathlib, the `/` operator can be used
clct = repo / 'my_collection'
```
See `lakota.collection` on how to manipulate collections
## Garbage Collection
After some time, series may be overwritten, deleted, defragmented or
merged. Sooner or later some pieces of data become dereferenced; those
can be deleted to recover storage space. This is done with the `gc`
method, which returns the number of hard and soft deletions.
```python
nb_hard_delete, nb_soft_delete = repo.gc()
```
Garbage collection is done in two phases. On the first invocation,
dangling files are renamed; this is the soft deletion. It is done by
adding the current time (returned by `lakota.utils.hextime`) as a
suffix, so `d3/9e/df960b6150e84bd82d5afaf2791a9b210030` will become
`d3/9e/df960b6150e84bd82d5afaf2791a9b210030.17b7d6ef91c`.
Any subsequent read of that file will fail, but it automatically
triggers a search for a similarly named file carrying a suffix.
On the next invocation, the garbage collection will reconsider those
suffixed files, and if the time represented by the suffix is
older than a given deadline (defined by `timeout` in
`lakota.utils.Settings`), the file is definitively deleted.
"""
import csv
import json
from io import BytesIO, StringIO
from itertools import chain
from time import time
from uuid import uuid4
from numpy import where
from .changelog import zero_hash
from .collection import Collection
from .pod import POD
from .schema import Schema
from .utils import Pool, hashed_path, hexdigest, hextime, logger, settings
__all__ = ["Repo"]
class Repo:
schema = Schema.kv(label="str*", meta="O")
def __init__(self, uri=None, pod=None):
"""
`uri` : a string or a list of strings representing a storage
location
`pod`
: a `lakota.pod.POD` instance
"""
pod = pod or POD.from_uri(uri)
folder, filename = hashed_path(zero_hash)
self.pod = pod
path = folder / filename
self.registry = Collection("registry", self.schema, path, self)
def ls(self):
return [item.label for item in self.search()]
def __iter__(self):
return self.search()
def search(self, label=None, namespace="collection"):
if label:
start = stop = (label,)
else:
start = stop = None
series = self.registry.series(namespace)
frm = series.frame(start=start, stop=stop, closed="BOTH")
for label in frm["label"]:
yield self.collection(label, frm)
def __truediv__(self, name):
return self.collection(name)
def collection(self, label, from_frm=None, namespace="collection"):
series = self.registry.series(namespace)
if from_frm:
frm = from_frm.islice([label], [label], closed="BOTH")
else:
frm = series.frame(start=label, stop=label, closed="BOTH")
if frm.empty:
return None
meta = frm["meta"][-1]
return self.reify(label, meta)
def create_collection(
self, schema, *labels, raise_if_exists=True, namespace="collection"
):
"""
`schema`
: A `lakota.schema.Schema` instance
`labels`
: One or more collection names
`raise_if_exists`
: Raise an exception if the label is already present
"""
assert isinstance(
schema, Schema
), "The schema parameter must be an instance of lakota.Schema"
meta = []
schema_dump = schema.dumps()
series = self.registry.series(namespace)
current_labels = series.frame(
start=min(labels), stop=max(labels), closed="BOTH", select="label"
)["label"]
new_labels = []
for label in labels:
label = label.strip()
if len(label) == 0:
raise ValueError(f"Invalid label: {label}")
if label in current_labels:
if raise_if_exists:
msg = f"Collection with label '{label}' already exists"
raise ValueError(msg)
else:
continue
# Generate random digest
digest = hexdigest(uuid4().bytes)
folder, filename = hashed_path(digest)
new_labels.append(label)
meta.append({
"path": str(folder / filename),
"schema": schema_dump,
})
if new_labels:
series.write({"label": new_labels, "meta": meta})
# Return collections
mask = "(isin self.label (list %s))" % " ".join(
f'"{l}"' for l in labels)
frm = series.frame(
start=min(labels),
stop=max(labels),
closed="BOTH",
).mask(mask)
res = [self.collection(lbl, from_frm=frm) for lbl in labels]
if len(labels) == 1:
return res[0]
return res
def reify(self, name, meta):
schema = Schema.loads(meta["schema"])
return Collection(name, schema, meta["path"], self)
def archive(self, collection):
label = collection.label
archive = self.collection(label, mode="archive")
if archive:
return archive
return self.create_collection(collection.schema, label, mode="archive")
def delete(self, *labels, namespace="collection"):
"""
Delete one or more collections
`*labels`
: Strings, names of the collections to delete
"""
to_remove = []
for lbl in labels:
clct = self.collection(lbl)
if not clct:
continue
to_remove.append(clct.changelog.pod)
series = self.registry.series(namespace)
series.delete(*labels)
for pod in to_remove:
try:
pod.rm(".", recursive=True)
except FileNotFoundError:
continue
def refresh(self):
self.registry.refresh()
def push(self, remote, *labels, shallow=False):
"""
Push local revisions (and related segments) to `remote` Repo.
`remote`
: A `lakota.repo.Repo` instance
`labels`
: The collections to push. If not given, all collections are pushed
"""
return remote.pull(self, *labels, shallow=shallow)
def pull(self, remote, *labels, shallow=False):
"""
Pull revisions from `remote` Repo (and related segments).
`remote`
: A `lakota.repo.Repo` instance
`labels`
: The collections to pull. If not given, all collections are pulled
"""
assert isinstance(remote, Repo), "A Repo instance is required"
# Pull registry
self.registry.pull(remote.registry, shallow=shallow)
# Extract frames
local_cache = {loc.label: loc for loc in self.search()}
remote_cache = {rem.label: rem for rem in remote.search()}
if not labels:
labels = remote_cache.keys()
for label in labels:
logger.info("Sync collection: %s", label)
r_clct = remote_cache[label]
if label not in local_cache:
l_clct = self.create_collection(r_clct.schema, label)
else:
l_clct = local_cache[label]
if l_clct.schema != r_clct.schema:
msg = (
f'Unable to sync collection "{label}",'
"incompatible meta-info."
)
raise ValueError(msg)
l_clct.pull(r_clct, shallow=shallow)
def merge(self):
"""
Merge repository registry. Needed when collections have been created
or deleted concurrently.
"""
return self.registry.merge()
def rename(self, from_label, to_label, namespace="collection"):
"""
Change the label of a collection
"""
series = self.registry.series(namespace)
frm = series.frame()
idx = where(frm["label"] == from_label)[0]
if len(idx) == 0:
raise ValueError(f'Collection "{from_label}" does not exists')
if to_label in frm["label"]:
raise ValueError(f'Collection "{to_label}" already exists')
# Extract bounds
start, stop = frm.start(), frm.stop()
# unpack idx
idx = idx[0]
# rebuild label list
labels = list(frm["label"])
frm["label"] = labels[:idx] + [to_label] + labels[idx + 1:]
# Re-order frame
frm = frm.sorted()
series.write(
frm,
# Make sure we over-write the previous content:
start=min(frm.start(), start),
stop=max(frm.stop(), stop),
)
def gc(self):
"""
Loop on all series, collect all used digests, and delete obsolete
ones.
"""
logger.info("Start GC")
# Collect digests across folders (_walk_folder ignore files
# containing changelogs, it will only return segments of data)
base_folders = self.pod.ls()
with Pool() as pool:
for folder in base_folders:
pool.submit(self._walk_folder, folder)
all_dig = set(chain(*pool.results))
# Collect digest from changelogs. Because commits are written
# after the segments, we minimize chance to bury data created
# concurrently.
self.refresh()
active_digests = set(self.registry.digests())
for namespace in self.registry.ls():
for clct in self.search(namespace=namespace):
active_digests.update(clct.digests())
nb_hard_del = 0
nb_soft_del = 0
current_ts_ext = f".{hextime()}"
deadline = hextime(time() - settings.timeout)
# Soft-Delete ("bury") files on fs not in changelogs
inactive = all_dig - active_digests
for dig in inactive:
if "." not in dig:
# Disable digest
folder, filename = hashed_path(dig)
path = str(folder / filename)
self.pod.mv(path, path + current_ts_ext, missing_ok=True)
nb_soft_del += 1
continue
# Inactive file, check timestamp & delete or re-enable it
dig, ext = dig.split(".")
if ext > deadline:
# ext contains a ts created recently, we can not act on it yet
continue
folder, filename = hashed_path(dig)
path = str(folder / filename)
if dig in active_digests:
# Re-enable by removing extension
self.pod.mv(path + f".{ext}", path, missing_ok=True)
else:
# Permanent deletion
self.pod.rm(path + f".{ext}", missing_ok=True)
nb_hard_del += 1
logger.info(
"End of GC (hard deletions: %s, soft deletions: %s)",
nb_hard_del,
nb_soft_del,
)
return nb_hard_del, nb_soft_del
def _walk_folder(self, folder):
"""
Return list of digests contained in the top folder (ignoring
content deeper than 2 subfolders and so ignores changelog
files)
"""
digs = []
pod = self.pod.cd(folder)
for filename in pod.walk(max_depth=2):
digs.append(folder + filename.replace("/", ""))
return digs
def import_collections(self, src, collections=None):
"""
Import collections from the given `src`. It can be a url accepted by
`POD.from_uri` or a `POD` instance. `collections` is the list of
collections to load (all collections are loaded if not set).
"""
if not isinstance(src, POD):
src = POD.from_uri(src)
names = collections or src.ls()
for clc_name in names:
clc = self / clc_name
pod = src.cd(clc_name)
if clc is None:
json_schema = pod.read("_schema.json").decode()
schema = Schema.loads(json.loads(json_schema))
clc = self.create_collection(schema, clc_name)
logger.info('Import collection "%s"', clc_name)
with clc.multi():
for file_name in pod.ls():
if file_name.startswith("_"):
continue
self.import_series(pod, clc, file_name)
def import_series(self, from_pod, collection, filename):
stem, ext = filename.rsplit(".", 1)
column_names = sorted(collection.schema)
if ext == "csv":
buff = StringIO(from_pod.read(filename).decode())
reader = csv.reader(buff)
headers = next(reader)
assert sorted(headers) == column_names
columns = zip(*reader)
frm = dict(zip(headers, columns))
srs = collection / stem
srs.write(frm)
elif ext == "parquet":
from pandas import read_parquet
buff = BytesIO(from_pod.read(filename))
df = read_parquet(buff)
assert sorted(df.columns) == column_names
srs = collection / stem
srs.write(df)
else:
msg = f"Unable to load {filename}, extension not supported"
raise ValueError(msg)
def export_collections(self, dest, collections=None, file_type="csv"):
if not isinstance(dest, POD):
dest = POD.from_uri(dest)
names = collections or self.ls()
for clc_name in names:
clc = self / clc_name
if clc is None:
# Skip missing collections instead of failing on clc.schema below
logger.warning('Collection "%s" not found', clc_name)
continue
pod = dest.cd(clc_name)
logger.info('Export collection "%s"', clc_name)
schema = clc.schema.dumps()
pod.write("_schema.json", json.dumps(schema).encode())
for srs in clc:
# Read series
self.export_series(pod, srs, file_type)
def export_series(self, pod, series, file_type):
if file_type == "csv":
frm = series.frame()
columns = list(frm)
# Save series as csv in buff
buff = StringIO()
writer = csv.writer(buff)
writer.writerow(columns)
rows = zip(*(frm[c] for c in columns))
writer.writerows(rows)
# Write generated content in pod
buff.seek(0)
pod.write(f"{series.label}.csv", buff.read().encode())
elif file_type == "parquet":
df = series.df()
data = df.to_parquet()
pod.write(f"{series.label}.parquet", data)
else:
exit(f'Unsupported file type "{file_type}"')
Classes
class Repo (uri=None, pod=None)
-
uri
: a string or a list of strings representing a storage location
pod
: a lakota.pod.POD instance
Class variables
var schema
Methods
def archive(self, collection)
-
def collection(self, label, from_frm=None, namespace='collection')
-
def create_collection(self, schema, *labels, raise_if_exists=True, namespace='collection')
-
schema
: A Schema instance
labels
: One or more collection names
raise_if_exists
: Raise an exception if the label is already present
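Judging from the module source above, create_collection returns the new collection directly when a single label is given and a list otherwise. A small sketch with example labels:

```python
schema = Schema(timestamp='int*', value='float')

# One label: returns the Collection directly
clct = repo.create_collection(schema, 'prices')

# Several labels: returns a list of Collections
clcts = repo.create_collection(schema, 'a', 'b', raise_if_exists=False)
```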
def delete(self, *labels, namespace='collection')
-
Delete one or more collections
*labels
: Strings, names of the collections to delete
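A minimal usage sketch (the label is an example):

```python
repo.delete('my_collection')
assert 'my_collection' not in repo.ls()
```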
def export_collections(self, dest, collections=None, file_type='csv')
-
def export_series(self, pod, series, file_type)
-
def gc(self)
-
Loop on all series, collect all used digests, and delete obsolete ones.
def import_collections(self, src, collections=None)
-
Import collections from the given src. It can be a url accepted by POD.from_uri or a POD instance. collections is the list of collections to load (all collections are loaded if not set).
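A possible round-trip sketch with export_collections, using a local directory as intermediate storage (paths are examples):

```python
# Export every collection as CSV files under a local directory
repo.export_collections('file:///tmp/lakota_export', file_type='csv')

# Load them into another repository
other = Repo('memory://')
other.import_collections('file:///tmp/lakota_export')
```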
def import_series(self, from_pod, collection, filename)
-
def ls(self)
-
def merge(self)
-
Merge repository registry. Needed when collections have been created or deleted concurrently.
def pull(self, remote, *labels, shallow=False)
-
Pull revisions from remote Repo (and related segments).
remote
: A Repo instance
labels
: The collections to pull. If not given, all collections are pulled
def push(self, remote, *labels, shallow=False)
-
Push local revisions (and related segments) to remote Repo.
remote
: A Repo instance
labels
: The collections to push. If not given, all collections are pushed
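A minimal synchronization sketch between a local and a remote repository (URIs are examples); as the source above shows, push simply delegates to pull on the remote:

```python
local = Repo('file:///tmp/local_repo')
remote = Repo('s3:///my_bucket')

# Fetch every collection from the remote
local.pull(remote)

# Send two specific collections back
local.push(remote, 'one', 'or_more')
```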
def refresh(self)
-
def reify(self, name, meta)
-
def rename(self, from_label, to_label, namespace='collection')
-
Change the label of a collection
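For example (labels are placeholders):

```python
repo.rename('my_collection', 'better_name')
```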
def search(self, label=None, namespace='collection')
-