Module lakota.cli
"""
# Example usage
Create a collection and a first series:
```shell
$ # Create a collection "temperature" with columns "timestamp" and "value"
$ lakota create temperature "timestamp timestamp*" "value float"
$ ls .lakota/ # the default repository is the .lakota directory
00 2e 6d
$ cat input.csv # Input data containing timestamps and values
2020-06-22,25
2020-06-23,24
2020-06-24,27
2020-06-25,31
2020-06-26,32
2020-06-27,30
$ # Write into a series "Brussels"
$ cat input.csv | lakota write temperature/Brussels
```
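The schema syntax also allows more than one index column — every column marked with `*` becomes part of the index (a hypothetical example; the collection and column names are made up):
```shell
$ lakota create rainfall "timestamp timestamp*" "city str*" "value float"
```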
Read the data back:
```shell
$ lakota ls # list collections
collection
temperature
$ lakota ls temperature # list series
series
Brussels
$ lakota read temperature/Brussels # read series
timestamp,value
2020-06-22T00:00:00,25.0
2020-06-23T00:00:00,24.0
2020-06-24T00:00:00,27.0
2020-06-25T00:00:00,31.0
2020-06-26T00:00:00,32.0
2020-06-27T00:00:00,30.0
$ lakota -P read temperature/Brussels # with pretty-print
timestamp value
------------------- -------
2020-06-22T00:00:00 25
2020-06-23T00:00:00 24
2020-06-24T00:00:00 27
2020-06-25T00:00:00 31
2020-06-26T00:00:00 32
2020-06-27T00:00:00 30
```
Update the series:
```shell
$ cat input-corrected.csv # New values
2020-06-23,24.2
2020-06-24,27.9
2020-06-25,31.0
2020-06-26,32.5
2020-06-27,30.1
2020-06-28,29.2
$ cat input-corrected.csv | lakota write temperature/Brussels
$ lakota read temperature/Brussels
timestamp,value
2020-06-22T00:00:00,25.0
2020-06-23T00:00:00,24.2
2020-06-24T00:00:00,27.9
2020-06-25T00:00:00,31.0
2020-06-26T00:00:00,32.5
2020-06-27T00:00:00,30.1
2020-06-28T00:00:00,29.2
```
Show revisions and clean history:
```shell
$ lakota rev temperature # show revisions
Revision: 00000000000-0000000000000000000000000000000000000000.176618fecf1-1b49944eecf9fd02fb13c0f0ac2e92f4e9d62620
Date: 2020-12-14 15:03:10.961000
Revision: 176618fecf1-1b49944eecf9fd02fb13c0f0ac2e92f4e9d62620.17661929034-08d4cd873f7900d89d78e589acdbf54524da45c7*
Date: 2020-12-14 15:06:03.828000
$ lakota defrag temperature # Rewrite the timeseries into larger segments
$ lakota trim temperature # Remove older revs
$ lakota rev temperature
Revision: 00000000000-0000000000000000000000000000000000000000.176619478d1-425a9d4eaa62dad4e560875834ed79a17238c6a6*
Date: 2020-12-14 15:08:08.913000
```
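`trim` can also keep recent history and only drop revisions older than a cut-off date, or be applied to every collection at once (a sketch based on the `trim` options; output omitted and the date is only illustrative):
```shell
$ # Only remove revisions created before the given date
$ lakota trim temperature --before 2020-12-14
$ # Trim all collections and the repo changelog
$ lakota trim --all
```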
Read supports basic group-by operations and filters:
```shell
$ # Compute the max value, grouped by month
$ lakota read temperature/Brussels "(floor self.timestamp 'M')" "(max self.value)"
(floor self.timestamp 'M'),(max self.value)
2020-06-01T00:00:00,32.5
$ lakota read temperature/Brussels --mask "(< self.value 28)"
timestamp,value
2020-06-22T00:00:00,25.0
2020-06-23T00:00:00,24.2
2020-06-24T00:00:00,27.9
```
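`read` also accepts index-range and pagination options (a sketch of the flags defined by the `read` sub-command; output omitted):
```shell
$ # Keep rows whose index falls between two bounds (--closed controls bound inclusion)
$ lakota read temperature/Brussels --greater-than 2020-06-24 --less-than 2020-06-27 --closed b
$ # Limit/offset, pagination and tail
$ lakota read temperature/Brussels --limit 3 --offset 1
$ lakota read temperature/Brussels --paginate 2
$ lakota read temperature/Brussels --tail 2
```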
Built-in help:
```shell
$ lakota --help
usage: lakota [-h] [--repo REPO] [--timing] [--pretty] [--verbose]
{read,len,rev,ls,trim,defrag,push,pull,create,write,delete,truncate,gc,help,version}
...
positional arguments:
{read,len,rev,ls,trim,defrag,push,pull,create,write,delete,truncate,gc,help,version}
optional arguments:
-h, --help show this help message and exit
--repo REPO, -r REPO Lakota repo (default: file://.lakota)
--timing, -t Enable timing
--pretty, -P Tabulate output
--verbose, -v Increase verbosity
$ lakota read --help
usage: lakota read [-h] [--limit LIMIT] [--offset OFFSET]
[--paginate PAGINATE] [--before BEFORE] [--mask MASK]
[--greater-than GREATER_THAN [GREATER_THAN ...]]
[--less-than LESS_THAN [LESS_THAN ...]]
label [columns [columns ...]]
positional arguments:
label
columns
optional arguments:
-h, --help show this help message and exit
--limit LIMIT, -l LIMIT
--offset OFFSET, -o OFFSET
--paginate PAGINATE, -p PAGINATE
--before BEFORE, -B BEFORE
--mask MASK, -m MASK
--greater-than GREATER_THAN [GREATER_THAN ...], --gt GREATER_THAN [GREATER_THAN ...]
Keep rows where index is greater than the given value
--less-than LESS_THAN [LESS_THAN ...], --lt LESS_THAN [LESS_THAN ...]
Keep rows where index is less than given value
```
Most sub-commands come with extra documentation:
```shell
$ lakota help ls
List collections in a repo
...
```
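A few other housekeeping commands follow the same pattern (commands only, output omitted):
```shell
$ lakota len temperature              # total number of rows in the collection
$ lakota len temperature/Brussels     # or in a single series
$ lakota delete temperature/Brussels  # delete a series (or a whole collection)
$ lakota gc                           # garbage-collect unreferenced segments
```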
# Push & pull
Create two repos and write in both:
```shell
$ lakota -r repo_A create temperature "timestamp timestamp*" "value float"
$ lakota -r repo_B create temperature "timestamp timestamp*" "value float"
$ cat input.csv | lakota -r repo_A write temperature/Brussels
$ cat input-corrected.csv | lakota -r repo_B write temperature/Brussels
$ lakota -r repo_A read temperature/Brussels
timestamp,value
2020-06-22T00:00:00,25.0
2020-06-23T00:00:00,24.0
2020-06-24T00:00:00,27.0
2020-06-25T00:00:00,31.0
2020-06-26T00:00:00,32.0
2020-06-27T00:00:00,30.0
$ lakota -r repo_B read temperature/Brussels
timestamp,value
2020-06-23T00:00:00,24.2
2020-06-24T00:00:00,27.9
2020-06-25T00:00:00,31.0
2020-06-26T00:00:00,32.5
2020-06-27T00:00:00,30.1
2020-06-28T00:00:00,29.2
```
Pull and compare revisions:
```shell
$ lakota -r repo_A pull repo_B
$ lakota -r repo_A rev temperature
Revision: 00000000000-0000000000000000000000000000000000000000.17661e795b5-1b49944eecf9fd02fb13c0f0ac2e92f4e9d62620*
Date: 2020-12-14 16:38:55.797000
Revision: 00000000000-0000000000000000000000000000000000000000.17661e7bc2b-8fb3766613e5f5a9e9556178c371e0ea3695930b*
Date: 2020-12-14 16:39:05.643000
$ lakota -r repo_B rev temperature
Revision: 00000000000-0000000000000000000000000000000000000000.17661e7bc2b-8fb3766613e5f5a9e9556178c371e0ea3695930b*
Date: 2020-12-14 16:39:05.643000
```
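`push` works the same way in the opposite direction; both commands accept an optional list of collections and a `--shallow` flag (a sketch, output omitted):
```shell
$ lakota -r repo_B push repo_A temperature   # push a single collection
$ lakota -r repo_B push repo_A --shallow     # only send the last revision
```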
Merge and check merged data:
```shell
$ lakota -r repo_A merge temperature
$ lakota -r repo_A rev temperature
Revision: 00000000000-0000000000000000000000000000000000000000.17661e795b5-1b49944eecf9fd02fb13c0f0ac2e92f4e9d62620
Date: 2020-12-14 16:38:55.797000
Revision: 17661e795b5-1b49944eecf9fd02fb13c0f0ac2e92f4e9d62620.17661ec08ee-08d4cd873f7900d89d78e589acdbf54524da45c7*
Date: 2020-12-14 16:43:47.438000
Revision: 00000000000-0000000000000000000000000000000000000000.17661e7bc2b-8fb3766613e5f5a9e9556178c371e0ea3695930b
Date: 2020-12-14 16:39:05.643000
Revision: 17661e7bc2b-8fb3766613e5f5a9e9556178c371e0ea3695930b.17661ec08ef-08d4cd873f7900d89d78e589acdbf54524da45c7*
Date: 2020-12-14 16:43:47.439000
$ lakota -r repo_A read temperature/Brussels
timestamp,value
2020-06-22T00:00:00,25.0
2020-06-23T00:00:00,24.2
2020-06-24T00:00:00,27.9
2020-06-25T00:00:00,31.0
2020-06-26T00:00:00,32.5
2020-06-27T00:00:00,30.1
2020-06-28T00:00:00,29.2
```
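The remaining sub-commands follow the same pattern, for instance `export`/`import` and the built-in HTTP server (a sketch: the export directory and the served name are placeholders, and `serve` requires flask):
```shell
$ # Export collections as csv (default) or parquet files, then import them elsewhere
$ lakota -r repo_A export backup_dir --file-type parquet
$ lakota -r repo_B import backup_dir --collection temperature
$ # Serve the default repo over HTTP, at the root path
$ lakota serve '/ file://.lakota' --web-uri http://127.0.0.1:8080
```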
"""
import argparse
import csv
import json
import os
import sys
from datetime import datetime
from itertools import chain

from tabulate import tabulate

from . import __version__
from .pod import POD
from .repo import Repo
from .schema import Schema
from .utils import logger, strpt, timeit

# Take default repo from env variable, fallback to .lakota in current dir
default_repo = os.environ.get("LAKOTA_REPO", "file://.lakota")


def get_repo(args):
    return Repo(args.repo or default_repo)


def get_collection(repo, label):
    collection = repo / label
    if collection:
        return collection
    match = [c for c in repo.ls() if c.startswith(label)]
    if len(match) == 1:
        return repo / match[0]
    exit(f'Collection "{label}" not found')


def get_series(repo, label, auto_create=False):
    if "/" not in label:
        exit('Label argument should have the form "collection/series"')
    c_label, s_label = label.split("/", 1)
    collection = get_collection(repo, c_label)
    if auto_create or label in collection:
        return collection / s_label
    match = [s for s in collection.ls() if s.startswith(s_label)]
    if len(match) == 1:
        return collection / match[0]
    elif s_label in match:
        return collection / s_label
    exit(f"Series '{label}' not found")


def read(args):
    """
    Basic usage:
    ```
    $ lakota read my_collection/my_series
    $ lakota read my_collection/my_series --limit 10 --offset 10
    $ lakota read my_collection/my_series --greater-than 2020-01-01
    ```
    Group-by and aggregate:
    ```
    $ lakota read my_collection/my_series '(floor self.timestamp "Y")' "(max self.value)"
    ```
    Explore past revisions:
    ```
    $ lakota read my_collection/my_series --before 2021-01-01
    ```
    Filter results:
    ```
    $ lakota read my_collection/my_series --mask "(< self.some_field 42)"
    ```
    """
    repo = get_repo(args)
    series = get_series(repo, args.label)
    reduce = False
    if not args.columns:
        columns = list(series.schema.columns)
    elif any("(" in c for c in args.columns):
        # An s-expression in the columns triggers a reduce (group-by)
        columns = list(series.schema.columns)
        reduce = True
    else:
        columns = args.columns
    kw = {
        "start": args.greater_than,
        "stop": args.less_than,
        "limit": args.limit,
        "offset": args.offset,
        "before": args.before,
        "select": columns,
        "closed": args.closed,
    }
    if args.paginate:
        frames = series.paginate(args.paginate, **kw)
    elif args.tail:
        frames = [series.tail(args.tail, **kw)]
    else:
        frames = [series.frame(**kw)]
    if args.mask:
        frames = (frm.mask(args.mask) for frm in frames)
    if reduce:
        kw = {c: c for c in args.columns}
        frames = (f.reduce(**kw) for f in frames)
        # Peek at first frame to get the columns
        first = next(frames)
        columns = list(first)
        frames = chain([first], frames)
    if args.pretty:
        for frm in frames:
            rows = zip(*(frm[col] for col in columns))
            if len(frm) == 0:
                print(tabulate([], headers=columns))
            else:
                print(tabulate(rows, headers=columns))
    else:
        writer = csv.writer(sys.stdout)
        writer.writerow(columns)
        for frm in frames:
            rows = zip(*(frm[col] for col in columns))
            writer.writerows(rows)


def export(args):
    repo = get_repo(args)
    repo.export_collections(args.uri, args.collection, args.file_type)


def import_(args):
    repo = get_repo(args)
    repo.import_collections(args.uri, args.collection)


def length(args):
    """
    Show total length of a collection/series
    ```
    $ lakota len my_collection
    $ lakota len my_collection/my_series
    ```
    """
    repo = get_repo(args)
    label = args.label
    if "/" in args.label:
        series = [get_series(repo, label)]
    else:
        repo = get_repo(args)
        clc = get_collection(repo, label)
        if clc is None:
            exit(f'Collection "{label}" not found')
        series = list(clc)
    print(sum(len(s) for s in series))


def rev(args):
    """
    Show revisions
    ```
    $ lakota rev my_collection # -e for extended output
    ```
    """
    repo = get_repo(args)
    if args.label:
        if "/" in args.label:
            series = get_series(repo, args.label)
            collection = series.collection
        else:
            series = None
            collection = get_collection(repo, args.label)
            if collection is None:
                exit(f"Collection '{args.label}' not found")
    else:
        collection = repo.registry
        series = None
    fmt = lambda a: " / ".join(map(str, a))
    for rev in collection.changelog.log():
        timestamp = str(datetime.fromtimestamp(int(rev.epoch, 16) / 1000))
        ci = rev.commit(collection)
        print(
            f"""
Revision: {rev.path}{"*" if rev.is_leaf else ""}
Date: {timestamp}
Total length: {sum(ci.length)}
"""
        )
        if not args.extended:
            continue
        if series is not None:
            ci = ci.mask(ci.label == series.label)
        starts = list(map(fmt, zip(*ci.start.values())))
        stops = list(map(fmt, zip(*ci.stop.values())))
        headers = ["label", "start", "stop", "length", "closed"]
        columns = [ci.label, starts, stops, ci.length, ci.closed]
        if args.extended > 1:
            digests = list(map(fmt, zip(*ci.digest.values())))
            headers.append("digests")
            columns.append(digests)
        rows = zip(*columns)
        print(tabulate(rows, headers=headers))
        print()


def ls(args):
    """
    List collections in a repo
    ```
    $ lakota ls
    ```
    List series in a collection
    ```
    $ lakota ls my_collection
    ```
    """
    repo = get_repo(args)
    if args.label:
        collection = get_collection(repo, args.label)
        header = "series"
    else:
        collection = repo
        header = "collection"
    rows = [[label] for label in collection.ls()]
    if args.pretty:
        print(tabulate(rows, headers=[header]))
    else:
        writer = csv.writer(sys.stdout)
        writer.writerow([header])
        writer.writerows(rows)


def create(args):
    """
    Create a collection; the `*` indicates the index columns
    ```
    $ lakota create my_collection "timestamp timestamp*" "value float"
    $ lakota create another_collection "timestamp timestamp*" "category str*" "value float"
    ```
    """
    repo = get_repo(args)
    collection = args.label
    columns = dict(col.split(maxsplit=1) for col in args.columns)
    schema = Schema(**columns)
    repo.create_collection(schema, collection)


def write(args):
    """
    Write is done through stdin
    ```
    $ cat some_file.csv | lakota write my_collection/my_series
    ```
    """
    repo = get_repo(args)
    series = get_series(repo, args.label, auto_create=True)
    reader = csv.reader(sys.stdin)
    headers = next(reader)
    columns = list(zip(*reader))
    df = dict(zip(headers, columns))
    series.write(df)


def merge(args):
    """
    Merge a collection
    ```
    $ lakota merge my_collection
    ```
    """
    repo = get_repo(args)
    collection = repo / args.label
    if not collection:
        exit(f"Collection {args.label} not found")
    collection.merge()


def defrag(args):
    """
    Defrag the changelog of the given collections. If no collection is
    given, defrag the repo changelog.
    """
    repo = get_repo(args)
    labels = repo.ls() if args.all else args.labels
    if labels:
        for label in labels:
            collection = get_collection(repo, label)
            if not collection:
                exit(f'Collection "{label}" not found')
            collection.defrag()
    if args.all or not args.labels:
        repo.registry.defrag()


def trim(args):
    """
    Trim the changelog of the given collections. If no collection is
    given, trim the repo changelog.
    """
    repo = get_repo(args)
    labels = repo.ls() if args.all else args.labels
    if labels:
        for label in labels:
            collection = get_collection(repo, label)
            if not collection:
                exit(f'Collection "{label}" not found')
            collection.trim(args.before)
    if args.all or not args.labels:
        repo.registry.trim(args.before)


def push(args):
    """
    Push (the local repo in `.lakota`) to a remote repo
    ```
    $ lakota push some_remote_repo
    ```
    Push `some_repo` to `another_repo`
    ```
    $ lakota -r some_repo push another_repo
    ```
    """
    repo = get_repo(args)
    remote_repo = Repo(args.remote)
    repo.push(remote_repo, *args.labels, shallow=args.shallow)


def pull(args):
    """
    Similar to `push`, but with the direction reversed
    """
    repo = get_repo(args)
    remote_reg = Repo(args.remote)
    repo.pull(remote_reg, *args.labels, shallow=args.shallow)


def delete(args):
    """
    Delete a series
    ```
    $ lakota delete my_collection/my_series
    ```
    Delete a collection
    ```
    $ lakota delete my_collection
    ```
    """
    repo = get_repo(args)
    if "/" in args.label:
        srs = get_series(repo, args.label)
        srs.collection.delete(srs.label)
    else:
        clc = get_collection(repo, args.label)
        repo.delete(clc.label)


def gc(args):
    """
    Garbage-collect a repository
    ```
    $ lakota gc
    ```
    """
    repo = get_repo(args)
    hard, _ = repo.gc()
    print(f"{hard} segments deleted")


def serve(args):
    try:
        from lakota import server
    except ImportError:
        exit("Please install flask to run server")
    repo_map = {}
    for item in args.repo_map:
        name, *uris = item.split()
        if not uris:
            raise ValueError("Missing uri in repo-map argument")
        repo_map[name] = uris
    server.run(repo_map, args.web_uri, debug=args.verbose)


def print_help(parser, args):
    cmd = args.help_cmd and globals().get(args.help_cmd)
    if cmd and cmd.__doc__:
        print(cmd.__doc__)
    if cmd:
        parser.parse_args([args.help_cmd, "-h"])


def bool_like(v):
    v = v.lower()
    if v in ("yes", "true", "t", "y", "1"):
        return True
    elif v in ("no", "false", "f", "n", "0"):
        return False
    raise argparse.ArgumentTypeError("Boolean value expected.")


def datetime_like(v):
    try:
        return strpt(v)
    except Exception:
        raise argparse.ArgumentTypeError("Datetime-like value expected.")


def run():
    # top-level parser
    parser = argparse.ArgumentParser(
        prog="lakota",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--repo",
        "-r",
        action="append",
        help=f"Lakota repo (default: {default_repo})",
    )
    parser.add_argument("--timing", "-t", action="store_true", help="Enable timing")
    parser.add_argument("--pretty", "-P", action="store_true", help="Tabulate output")
    parser.add_argument(
        "--verbose", "-v", action="count", help="Increase verbosity", default=0
    )
    subparsers = parser.add_subparsers(dest="command")

    # Add read command
    parser_read = subparsers.add_parser("read")
    parser_read.add_argument("label")
    parser_read.add_argument("columns", nargs="*")
    parser_read.add_argument("--limit", "-l", type=int, default=None)
    parser_read.add_argument("--offset", "-o", type=int, default=None)
    parser_read.add_argument("--paginate", "-p", type=int, default=None)
    parser_read.add_argument("--tail", "-t", type=int, default=None)
    parser_read.add_argument("--before", "-B", default=None, type=datetime_like)
    parser_read.add_argument("--mask", "-m", type=str, default=None)
    parser_read.add_argument(
        "--greater-than",  # TODO rename into --ge (and provide proper --gt), get rid of --closed
        "--gt",
        action="append",
        help="Keep rows where index is greater than the given value",
    )
    parser_read.add_argument(
        "--less-than",
        "--lt",
        action="append",
        help="Keep rows where index is less than given value",
    )
    parser_read.add_argument(
        "--closed",
        "-c",
        type=str,
        default="LEFT",
        help="Include or exclude the bounds of interval defined by --gt"
        ' and --lt (defaults to "LEFT" or "l", other possible values: '
        "RIGHT, r, BOTH, b and NONE, n)",
    )
    parser_read.set_defaults(func=read)

    # Add export command
    parser_export = subparsers.add_parser("export")
    parser_export.add_argument("uri", help="Where to save the export")
    parser_export.add_argument(
        "--collection",
        "-c",
        action="append",
        help="Export only the given collection(s)",
    )
    parser_export.add_argument(
        "--file-type", "-T", default="csv", help="File type: csv (default) or parquet"
    )
    parser_export.set_defaults(func=export)

    # Add import command
    parser_import = subparsers.add_parser("import")
    parser_import.add_argument("uri", help="From where to import collections")
    parser_import.add_argument(
        "--collection",
        "-c",
        action="append",
        help="Import only the given collection(s)",
    )
    parser_import.set_defaults(func=import_)

    # Add len command
    parser_len = subparsers.add_parser("length", aliases=["len"])
    parser_len.add_argument("label")
    parser_len.set_defaults(func=length)

    # Add rev command
    parser_rev = subparsers.add_parser("rev")
    parser_rev.add_argument("label", nargs="?")
    parser_rev.add_argument(
        "-e", "--extended", action="count", default=0, help="Extended output"
    )
    parser_rev.set_defaults(func=rev)

    # Add ls command
    parser_ls = subparsers.add_parser("ls")
    parser_ls.add_argument("label", nargs="?")
    parser_ls.set_defaults(func=ls)

    # Add defrag command
    parser_defrag = subparsers.add_parser("defrag")
    parser_defrag.add_argument("labels", nargs="*")
    parser_defrag.add_argument(
        "-a", "--all", action="store_true", help="Defrag all collections"
    )
    parser_defrag.set_defaults(func=defrag)

    # Add trim command
    parser_trim = subparsers.add_parser("trim")
    parser_trim.add_argument("labels", nargs="*")
    parser_trim.add_argument(
        "-b",
        "--before",
        type=datetime_like,
        help="Delete revisions older than given date",
    )
    parser_trim.add_argument("-a", "--all", action="store_true", help="Trim everything")
    parser_trim.set_defaults(func=trim)

    # Add push command
    parser_push = subparsers.add_parser("push")
    parser_push.add_argument("remote")
    parser_push.add_argument(
        "labels", nargs="*", help="Collection to push (all if not set)"
    )
    parser_push.add_argument(
        "-s",
        "--shallow",
        action="store_true",
        help="Shallow push (send only last revision)",
    )
    parser_push.set_defaults(func=push)

    # Add pull command
    parser_pull = subparsers.add_parser("pull")
    parser_pull.add_argument("remote")
    parser_pull.add_argument(
        "labels", nargs="*", help="Collection to pull (all if not set)"
    )
    parser_pull.add_argument(
        "-s",
        "--shallow",
        action="store_true",
        help="Shallow pull (fetch only last revision)",
    )
    parser_pull.set_defaults(func=pull)

    # Add create command
    parser_create = subparsers.add_parser("create")
    parser_create.add_argument("label")
    parser_create.add_argument("columns", nargs="+")
    parser_create.set_defaults(func=create)

    # Add write command
    parser_write = subparsers.add_parser("write")
    parser_write.add_argument("label")
    # TODO add --update flag to do Series.update instead of Series.write
    parser_write.set_defaults(func=write)

    # Add merge command
    parser_write = subparsers.add_parser("merge")
    parser_write.add_argument("label")
    parser_write.set_defaults(func=merge)

    # Add delete command
    parser_delete = subparsers.add_parser("delete", aliases=["del"])
    parser_delete.add_argument("label", help="collection or series to delete")
    parser_delete.set_defaults(func=delete)

    # Add gc command
    parser_gc = subparsers.add_parser("gc")
    parser_gc.set_defaults(func=gc)

    # Add help command
    parser_help = subparsers.add_parser("help")
    parser_help.add_argument("help_cmd", nargs="?")
    parser_help.set_defaults(func=lambda args: print_help(parser, args))

    # Add version command
    parser_len = subparsers.add_parser("version")
    parser_len.set_defaults(func=lambda *a: print(__version__))

    # Add serve command
    parser_serve = subparsers.add_parser("serve")
    parser_serve.add_argument(
        "repo_map",
        nargs="+",
        metavar="repo-map",
        help="space-separated mapping of name and repo uri. "
        "Example: 'my-local-repo file://.lakota' 'a-remote-one http://host/foobar'. "
        "Use '/' as name to serve repo at root.",
    )
    parser_serve.add_argument(
        "-w",
        "--web-uri",
        nargs="?",
        default="http://127.0.0.1:8080",
        help="Base url at which the repo will be served",
    )
    parser_serve.set_defaults(func=serve)

    # Parse args
    args = parser.parse_args()
    if not args.command:
        parser.print_help()
        return

    # Enable logging
    if args.verbose == 1:
        logger.setLevel("INFO")
    elif args.verbose > 1:
        logger.setLevel("DEBUG")

    # Execute command
    try:
        if args.timing:
            with timeit(f"Timing ({args.command}):"):
                args.func(args)
        else:
            args.func(args)
    except (BrokenPipeError, KeyboardInterrupt):
        pass