Module lakota.pod
The POD class implements low-level access to different storage backends. The from_uri method instantiates a POD object based on a URI. The supported schemes are file://, s3://, http:// (or https://) and memory://. If no scheme is given, the URI is interpreted as a local path (an empty URI or :memory: selects an in-memory POD).
>>> from lakota import POD
>>> pod = POD.from_uri('.lakota')
>>> pod.ls()
['00', '01', '02', '03', ... 'fb', 'fc', 'fd', 'fe', 'ff']
It is mainly used through the lakota.Repo
class.
Expand source code
"""
The `POD` class implements low-level access to different storage
backends. The `from_uri` method instantiates a POD object based on a
URI. The supported schemes are `file://`, `s3://`, `http://` (or
`https://`) and `memory://`. If no scheme is given, the URI is
interpreted as a local path (an empty URI or `:memory:` selects an
in-memory POD).
``` python-console
>>> from lakota import POD
>>> pod = POD.from_uri('.lakota')
>>> pod.ls()
['00', '01', '02', '03', ... 'fb', 'fc', 'fd', 'fe', 'ff']
```
It is mainly used through the `lakota.Repo` class.
"""
import io
import os
import shutil
from pathlib import Path, PurePosixPath
from threading import Lock
from urllib.parse import parse_qs, urlsplit
try:
import boto3
except ImportError:
boto3 = None
try:
import requests
except ImportError:
requests = None
from .utils import logger
__all__ = ["POD", "FilePOD", "MemPOD", "CachePOD"]
class POD:
    """Base class of all storage backends ("PODs").

    Subclasses provide the primitives (``cd``, ``ls``, ``isdir``, ``rm``, ...);
    this class adds URI-based construction and generic helpers built on top
    of them.
    """

    @classmethod
    def from_token(cls, token):
        """Return the POD registered under ``token`` in ``cls._by_token``.

        NOTE(review): ``_by_token`` is not defined in this module; presumably
        it is set elsewhere before this is called — confirm.
        """
        return cls._by_token[token]

    @classmethod
    def from_uri(cls, uri=None):
        """Instantiate the POD matching ``uri``.

        ``uri`` may be:

        * a list/tuple of URIs: with several, the first one is used as a
          local cache in front of the POD built from the others (``CachePOD``);
        * ``None``, ``""`` or ``":memory:"``: an in-memory POD;
        * a string without scheme: a local path (``FilePOD``);
        * a ``file://``, ``s3://``, ``http(s)://`` or ``memory://`` URI. Query
          parameters configure the backend (``profile``, ``verify``, ``key``,
          ``secret`` and ``token`` for s3, ``header-<name>`` for http(s),
          ``lru_size`` for memory).

        Raises ValueError for unsupported schemes (or when boto3 is missing
        for s3), ImportError when requests is missing for http(s) and
        NotImplementedError for ssh.
        """
        # multi-uri -> CachePOD
        if uri and isinstance(uri, (tuple, list)):
            if len(uri) > 1:
                return CachePOD(
                    local=POD.from_uri(uri[0]),
                    remote=POD.from_uri(uri[1:]),
                )
            else:
                return POD.from_uri(uri[0])

        # Define protocol and path
        parts = urlsplit(uri or "")
        scheme, path = parts.scheme, parts.path
        kwargs = parse_qs(parts.query)
        if not scheme:
            if not path or path == ":memory:":
                scheme = "memory"
                path = "."
            else:
                scheme = "file"

        # Massage path
        if parts.scheme and path.startswith("/"):
            # urlsplit keeps the separator in the path
            path = path[1:]
        if scheme == "file":
            path = Path(path).expanduser()

        # Instantiate pod object
        path = PurePosixPath(path)
        if scheme == "file":
            assert not parts.netloc, "Malformed repo uri, should start with 'file:///'"
            return FilePOD(path)
        elif scheme == "s3":
            if boto3 is None:
                raise ValueError(
                    f'Please install the "boto3" module in order to access {uri}'
                )
            profile = kwargs.get("profile", [None])[0]
            verify = kwargs.get("verify", [""])[0].lower() != "false"
            key = kwargs.get("key", [""])[0]
            secret = kwargs.get("secret", [""])[0]
            token = kwargs.get("token", [""])[0]
            return S3POD(
                path,
                netloc=parts.netloc,
                profile=profile,
                verify=verify,
                key=key,
                secret=secret,
                token=token,
            )
        elif scheme == "ssh":
            raise NotImplementedError("SSH support not implemented yet")
        elif scheme in ("http", "https"):
            if requests is None:
                raise ImportError(
                    f'Please install the "requests" module in order to access "{uri}"'
                )
            # Build base uri
            base_uri = f"{parts.scheme}://{parts.netloc}/{path}"
            # Extract headers: "header-<name>=<value>" query params
            headers = {
                k[7:]: v[0] for k, v in kwargs.items() if k.startswith("header-")
            }
            return HttpPOD(base_uri, headers=headers)
        elif scheme == "memory":
            lru_size = int(kwargs.get("lru_size", [0])[0])
            return MemPOD(path, lru_size=lru_size)
        else:
            raise ValueError(f'Protocol "{scheme}" not supported in "{uri}"')

    def __truediv__(self, relpath):
        """``pod / "sub"`` is a shortcut for ``pod.cd("sub")``."""
        return self.cd(relpath)

    def rm_many(self, pathes, recursive=False):
        """Call ``rm`` on each path of ``pathes``."""
        for path in pathes:
            self.rm(path, recursive=recursive)

    def walk(self, max_depth=None):
        """Yield the relative, ``/``-separated paths of all files, depth first.

        Entries are visited in the order returned by ``ls`` at every level.
        Top-level entries have depth 1; directories whose depth reaches
        ``max_depth`` are not descended into, so ``max_depth=0`` yields
        nothing.
        """
        if max_depth == 0:
            return
        # Stack of (root, name, depth). Children are pushed in reverse so
        # that pop() returns them in ls() order (top level included).
        folders = [("", f, 1) for f in reversed(self.ls(""))]
        while folders:
            root, name, depth = folders.pop()
            full_path = str(PurePosixPath(root) / name)
            if not self.isdir(full_path):
                yield full_path
                continue
            if max_depth is not None and depth >= max_depth:
                continue
            folders.extend(
                (full_path, c, depth + 1) for c in reversed(self.ls(full_path))
            )
class FilePOD(POD):
    """POD storing data as regular files below a local directory."""

    protocol = "file"

    def __init__(self, path):
        self.path = Path(path)
        super().__init__()

    def cd(self, *others):
        """Return a FilePOD rooted at ``self.path`` joined with ``others``."""
        path = self.path.joinpath(*others)
        return FilePOD(path)

    def ls(self, relpath=".", missing_ok=False):
        """Return the names of the entries of directory ``relpath``.

        Raises FileNotFoundError when it does not exist, unless
        ``missing_ok`` is true (an empty list is then returned).
        """
        logger.debug("LIST %s %s", self.path, relpath)
        path = self.path / relpath
        try:
            return [p.name for p in path.iterdir()]
        except FileNotFoundError:
            if missing_ok:
                return []
            raise

    def read(self, relpath, mode="rb"):
        """Return the whole content of file ``relpath`` opened with ``mode``."""
        logger.debug("READ %s %s", self.path, relpath)
        path = self.path / relpath
        # XXX make sure path is subpath of self.path
        # Context manager: close the handle deterministically instead of
        # relying on garbage collection.
        with path.open(mode) as fh:
            return fh.read()

    def write(self, relpath, data, mode="wb", force=False):
        """Write ``data`` to ``relpath``, creating parent directories.

        An existing file is left untouched (and None is returned) unless
        ``force`` is true; otherwise the result of the underlying file
        ``write`` call is returned.
        """
        if not force and self.isfile(relpath):
            logger.debug("SKIP-WRITE %s %s", self.path, relpath)
            return
        logger.debug("WRITE %s %s", self.path, relpath)
        path = self.path / relpath
        path.parent.mkdir(parents=True, exist_ok=True)
        # Closing the file flushes it, so the data is on disk when we return
        # (even on interpreters without refcount-based finalization).
        with path.open(mode) as fh:
            return fh.write(data)

    def isdir(self, relpath):
        """True if ``relpath`` is an existing directory."""
        return self.path.joinpath(relpath).is_dir()

    def isfile(self, relpath):
        """True if ``relpath`` is an existing regular file."""
        return self.path.joinpath(relpath).is_file()

    def rm(self, relpath=".", recursive=False, missing_ok=False):
        """Remove file or directory ``relpath``.

        Without ``recursive`` only files and empty directories can be
        removed. FileNotFoundError is swallowed when ``missing_ok`` is true.
        """
        logger.debug("REMOVE %s %s", self.path, relpath)
        path = self.path / relpath
        try:
            if recursive:
                if path.is_dir():
                    shutil.rmtree(path)
                else:
                    path.unlink()
            elif path.is_dir():
                path.rmdir()
            else:
                path.unlink()
        except FileNotFoundError:
            if missing_ok:
                return
            raise

    def mv(self, from_path, to_path, missing_ok=False):
        """Rename ``from_path`` to ``to_path``, creating the destination's
        parent directories; a missing source is ignored with ``missing_ok``."""
        orig = self.path / from_path
        dest = self.path / to_path
        logger.debug("MOVE %s to %s", orig, dest)
        dest.parent.mkdir(parents=True, exist_ok=True)
        try:
            orig.rename(dest)
        except FileNotFoundError:
            if not missing_ok:
                raise

    @property
    def size(self):
        """Total size in bytes of all the files yielded by ``walk``."""
        return sum(os.path.getsize(self.path / name) for name in self.walk())
class File:
    """File node used by MemPOD.

    Holds the raw ``content`` together with its ``size`` (``len(content)``).
    """

    def __init__(self, content):
        size = len(content)
        self.content = content
        self.size = size
class Folder:
    """Directory node used by MemPOD.

    ``items`` maps each child name to its kind, i.e. the ``File`` or
    ``Folder`` class itself (not an instance).
    """

    def __init__(self):
        self.items = {}

    def add(self, key, kind):
        """Record that child ``key`` is of ``kind``.

        Re-adding an existing child is only allowed with the same kind.
        """
        assert kind in (File, Folder)
        known = self.items.get(key)
        if known is None:
            self.items[key] = kind
            return
        assert known == kind

    def rm(self, entry):
        """Forget child ``entry``; unknown names are ignored."""
        if entry in self.items:
            del self.items[entry]

    def ls(self):
        """Return a live view of the child names."""
        return self.items.keys()
class Store:
    """Utility class for MemPOD: key/value storage with an optional LRU.

    Keys are tuples of path parts and values are ``File`` or ``Folder``
    instances. Without a positive ``lru_size`` everything lives in
    ``front_kv``. With one, once the total size of the files held in
    ``front_kv`` exceeds ``lru_size // 2``, ``front_kv`` becomes ``back_kv``
    (discarding the previous ``back_kv``) and a fresh ``front_kv`` is
    started; entries found only in ``back_kv`` are copied back into
    ``front_kv`` when read through ``get``.
    """

    def __init__(self, lru_size=None):
        self.front_kv = {tuple(): Folder()}
        self.back_kv = {}
        self._update_lock = Lock()  # Lock to serialize size updates (and swaps)
        self._size = 0  # Size of the front kv
        self._nb_swap = 0  # testing helper
        # A missing or non-positive lru_size disables the LRU behaviour
        self.lru_size = lru_size if lru_size and lru_size > 0 else None

    def _ok_size(self):
        """Re-compute the size of front_kv and compare it with self._size.

        Only meaningful when the LRU is enabled (``_size`` is not tracked
        otherwise).
        """
        total = 0
        for v in self.front_kv.values():
            if isinstance(v, File):
                total += len(v.content)
        return total == self._size

    def get(self, key):
        """Return the item stored under ``key`` or None.

        With the LRU enabled, an item found only in ``back_kv`` is copied
        into ``front_kv``.
        """
        item = self.front_kv.get(key)
        if self.lru_size is None:
            return item
        if item is None:
            # Percolate from back_kv
            item = self.back_kv.get(key)
            if item is not None:
                self.front_kv[key] = item
                if isinstance(item, File):
                    self._update_size(item.size)
        return item

    def set(self, key, item):
        """Add or replace ``item`` under ``key`` in ``front_kv``.

        The tracked size is adjusted by the size of the new file minus the
        size of any file it replaces in ``front_kv``, so overwriting an
        entry does not count it twice.
        """
        assert isinstance(item, (File, Folder))
        previous = self.front_kv.get(key)
        self.front_kv[key] = item
        delta = item.size if isinstance(item, File) else 0
        if isinstance(previous, File):
            delta -= previous.size
        if delta:
            self._update_size(delta)

    def setdefault(self, key, item):
        """Return the item under ``key`` in ``front_kv``, inserting the given
        Folder if absent (``back_kv`` is not consulted)."""
        assert isinstance(item, Folder)
        return self.front_kv.setdefault(key, item)

    def delete(self, key):
        """Remove ``key`` from both generations (missing keys are ignored)."""
        item = self.front_kv.pop(key, None)
        self.back_kv.pop(key, None)
        if item is None or not isinstance(item, File):
            return
        self._update_size(-item.size)

    def swap(self):
        """Demote front_kv to back_kv and start an empty front_kv."""
        self.back_kv = self.front_kv
        self.front_kv = {tuple(): Folder()}
        self._size = 0
        self._nb_swap += 1

    def _update_size(self, value):
        """Add ``value`` to the tracked size, swapping when it exceeds half
        of ``lru_size``. No-op when the LRU is disabled."""
        if self.lru_size is None:
            return
        with self._update_lock:
            self._size += value
            if self._size > self.lru_size // 2:
                self.swap()
class MemPOD(POD):
    """In-memory POD whose content lives in a ``Store``.

    Store keys are tuples of path parts (``self.parts`` followed by the
    relative path). ``cd`` returns a new MemPOD sharing the same store.
    """

    protocol = "memory"

    def __init__(self, path, store=None, lru_size=None):
        self.path = PurePosixPath(path)
        self.parts = self.path.parts
        self.store = store or Store(lru_size=lru_size)
        super().__init__()

    def cd(self, *others):
        """Return a MemPOD for a sub-path, sharing this pod's store."""
        path = PurePosixPath(*(self.parts + others))
        return MemPOD(path, store=self.store)

    def isdir(self, relpath):
        """True if ``relpath`` is a known folder."""
        relpath = self.split(relpath)
        key = self.parts + relpath
        item = self.store.get(key)
        return isinstance(item, Folder)

    def isfile(self, relpath):
        """True if ``relpath`` is a known file."""
        relpath = self.split(relpath)
        key = self.parts + relpath
        item = self.store.get(key)
        return isinstance(item, File)

    def write(self, relpath, data, mode="wb", force=False):
        """Store ``data`` at ``relpath``, registering parent folders.

        ``mode`` is accepted for interface compatibility and ignored. An
        existing file is kept (and None returned) unless ``force`` is true;
        otherwise ``len(data)`` is returned.

        Raises IsADirectoryError when the target is the pod root.
        """
        rel_parts = self.split(relpath)
        full_path = self.parts + rel_parts
        if not full_path:
            raise IsADirectoryError("Cannot write to the root of a memory pod")
        current_path = tuple()
        # Walk the tree, creating folders and registering each intermediate
        # part as a sub-folder of its parent
        for part in full_path:
            parent = current_path
            current_path = current_path + (part,)
            folder = self.store.setdefault(parent, Folder())
            assert isinstance(folder, Folder)
            if current_path != full_path:
                folder.add(part, Folder)
        # `folder` and `part` now are the file's parent folder and name
        folder.add(part, File)
        current_file = self.store.get(current_path)
        if not force and current_file is not None:
            assert isinstance(current_file, File)
            logger.debug("SKIP-WRITE memory://%s %s", self.path, "/".join(rel_parts))
            return
        logger.debug("WRITE memory://%s %s", "/".join(self.parts), "/".join(rel_parts))
        self.store.set(current_path, File(data))
        return len(data)

    def ls(self, relpath="", missing_ok=False):
        """List a folder's children; for a file, return its full key joined
        with "/". Raises FileNotFoundError unless ``missing_ok``."""
        path = self.parts + self.split(relpath)
        item = self.store.get(path)
        if item is None:
            if missing_ok:
                return []
            raise FileNotFoundError(f'Path "{path}" not found')
        elif isinstance(item, Folder):
            return list(item.ls())
        else:
            return ["/".join(path)]

    def rm(self, relpath=".", recursive=False, missing_ok=False):
        """Remove a file or folder.

        A non-empty folder requires ``recursive``. A missing path raises
        FileNotFoundError unless ``missing_ok`` is true.
        """
        rel_parts = self.split(relpath)
        path = self.parts + rel_parts
        item = self.store.get(path)
        if isinstance(item, File):
            self.store.delete(path)
        elif isinstance(item, Folder):
            if not recursive and item.items:
                raise OSError(f"{relpath} is not empty")
            # Delete and recurse. Children are addressed as tuples so that
            # relpath may be a str, tuple or PurePosixPath; a child listed in
            # the folder but already gone from the store is not an error.
            self.store.delete(path)
            for child in list(item.items):
                self.rm(rel_parts + (child,), recursive=True, missing_ok=True)
        elif not missing_ok:
            raise FileNotFoundError(f"{relpath} not found")
        # Update folder info (the root has no parent)
        if not path:
            return
        parent_folder = self.store.get(path[:-1])
        if parent_folder is not None:
            # Note: Parent may not exist in the middle of a
            # recursive deletion
            parent_folder.rm(path[-1])

    def read(self, relpath, mode="rb"):
        """Return the content of file ``relpath`` (``mode`` is ignored).

        Raises FileNotFoundError if it is not a known file.
        """
        logger.debug("READ memory://%s %s", "/".join(self.parts), relpath)
        path = self.parts + self.split(relpath)
        item = self.store.get(path)
        if not isinstance(item, File):
            raise FileNotFoundError(f'Path "{path}" not found')
        return item.content

    def mv(self, from_path, to_path, missing_ok=False):
        """Move a file, overwriting any existing destination (like a
        filesystem rename). A missing source is ignored with ``missing_ok``."""
        logger.debug("MOVE %s to %s", from_path, to_path)
        try:
            data = self.read(from_path)
        except FileNotFoundError:
            if missing_ok:
                return
            raise
        if self.split(from_path) == self.split(to_path):
            # Moving onto itself: removing the source would lose the data
            return
        self.write(to_path, data, force=True)
        self.rm(from_path)

    @classmethod
    def split(cls, path):
        """Normalize ``path`` (str, tuple, PurePosixPath or falsy) to a tuple
        of parts; for strings, "." and empty components are dropped."""
        if not path:
            return tuple()
        if isinstance(path, tuple):
            return path
        if isinstance(path, PurePosixPath):
            return path.parts
        return tuple(p for p in path.split("/") if p not in (".", ""))
class CachePOD(POD):
    """POD combining a ``local`` cache with a ``remote`` POD.

    Reads are served from ``local`` when possible and otherwise fetched from
    ``remote`` (and copied into ``local``). Writes, removals and moves are
    applied to both, while listings and existence checks only consult
    ``remote``.
    """

    def __init__(self, local, remote):
        self.local = local
        self.remote = remote
        self.protocol = f"{local.protocol}+{remote.protocol}"
        super().__init__()

    @property
    def path(self):
        """Path of the local (cache) POD."""
        return self.local.path

    def cd(self, *others):
        """Return a CachePOD for the same sub-path on both sides."""
        local = self.local.cd(*others)
        remote = self.remote.cd(*others)
        return CachePOD(local, remote)

    def ls(self, relpath=".", missing_ok=False):
        # Always list the remote: the local side may not hold every entry
        # (listing the local side first was considered too aggressive).
        return self.remote.ls(relpath, missing_ok=missing_ok)

    def read(self, relpath, mode="rb"):
        """Read from the local cache, falling back to (and caching) remote."""
        try:
            return self.local.read(relpath, mode=mode)
        except FileNotFoundError:
            pass
        data = self.remote.read(relpath, mode=mode)
        # Cache with the write mode matching the read mode ("rb" -> "wb",
        # "r" -> "w"), so text-mode reads can be cached as well.
        self.local.write(relpath, data, mode=mode.replace("r", "w"))
        return data

    def write(self, relpath, data, mode="wb", force=False):
        """Write to both sides; returns the remote write's result."""
        self.local.write(relpath, data, mode=mode, force=force)
        return self.remote.write(relpath, data, mode=mode, force=force)

    def isdir(self, relpath):
        return self.remote.isdir(relpath)

    def isfile(self, relpath):
        return self.remote.isfile(relpath)

    def rm(self, relpath=".", recursive=False, missing_ok=False):
        """Remove from remote (errors propagate), then best-effort from local."""
        self.remote.rm(relpath, recursive=recursive, missing_ok=missing_ok)
        try:
            self.local.rm(relpath, recursive=recursive, missing_ok=missing_ok)
        except FileNotFoundError:
            pass

    def mv(self, from_path, to_path, missing_ok=False):
        """Move on remote (errors propagate), then best-effort on local."""
        self.remote.mv(from_path, to_path, missing_ok=missing_ok)
        try:
            self.local.mv(from_path, to_path)
        except FileNotFoundError:
            pass
class SSHPOD(POD):
    """Read-only POD accessing files over SFTP.

    NOTE: ``POD.from_uri`` does not dispatch ``ssh://`` URIs here yet (it
    raises NotImplementedError); use ``SSHPOD.from_uri`` directly.
    """

    protocol = "ssh"

    def __init__(self, client, path):
        # ``client`` is either a paramiko SSHClient (an SFTP session is then
        # opened on first use) or an SFTP-like object exposing ``listdir``
        # and ``open`` directly.
        self.client = client
        self.path = PurePosixPath(path)
        self._sftp_client = None
        super().__init__()

    @classmethod
    def from_uri(cls, uri):
        """Connect to ``user@host/path``.

        The RSA private key is read from the ``SSH_KEY`` environment variable.
        Raises ImportError when paramiko is not installed.
        """
        try:
            import paramiko
        except ImportError:
            raise ImportError(
                f'Please install the "paramiko" module in order to access "{uri}"'
            ) from None
        user, tail = uri.split("@")
        host, path = tail.split("/", 1)
        key = os.environ["SSH_KEY"]
        file_obj = io.StringIO(key)
        k = paramiko.RSAKey(file_obj=file_obj)
        client = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy trusts unknown host keys (no protection
        # against man-in-the-middle); consider loading known_hosts instead.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(hostname=host, username=user, pkey=k)
        return SSHPOD(client, path)

    def _get_sftp(self):
        """Return an object exposing ``listdir``/``open`` (SFTP session)."""
        if hasattr(self.client, "listdir"):
            return self.client
        if self._sftp_client is None:
            self._sftp_client = self.client.open_sftp()
        return self._sftp_client

    def cd(self, *others):
        """Return an SSHPOD for a sub-path, sharing the SFTP session."""
        path = self.path.joinpath(*others)
        return SSHPOD(self._get_sftp(), path)

    def ls(self, relpath=".", missing_ok=False):
        """List directory ``relpath``; a missing directory yields [] when
        ``missing_ok`` is true."""
        logger.debug("LIST %s %s", self.path, relpath)
        path = self.path / relpath
        try:
            return self._get_sftp().listdir(str(path))
        except FileNotFoundError:
            if missing_ok:
                return []
            raise

    def read(self, relpath, mode="rb"):
        """Return the whole content of remote file ``relpath``."""
        logger.debug("READ %s %s", self.path, relpath)
        path = self.path / relpath
        with self._get_sftp().open(str(path), mode) as fh:
            return fh.read()
# Trigger imports if related dependencies are present
if requests is not None:
from .http_pod import HttpPOD
if boto3 is not None:
from .s3_pod import S3POD
Classes
class CachePOD (local, remote)
-
Expand source code
class CachePOD(POD): def __init__(self, local, remote): self.local = local self.remote = remote self.protocol = f"{local.protocol}+{remote.protocol}" super().__init__() @property def path(self): return self.local.path def cd(self, *others): local = self.local.cd(*others) remote = self.remote.cd(*others) return CachePOD(local, remote) def ls(self, relpath=".", missing_ok=False): return self.remote.ls(relpath, missing_ok=missing_ok) # More (too) aggressive implementation: # res = self.local.ls(relpath, missing_ok=missing_ok) # if not res: # print('REMOTE!') # res = self.remote.ls(relpath, missing_ok=missing_ok) # return res def read(self, relpath, mode="rb"): try: return self.local.read(relpath, mode=mode) except FileNotFoundError: pass data = self.remote.read(relpath, mode=mode) self.local.write(relpath, data) return data def write(self, relpath, data, mode="wb", force=False): self.local.write(relpath, data, mode=mode, force=force) return self.remote.write(relpath, data, mode=mode, force=force) def isdir(self, relpath): return self.remote.isdir(relpath) def isfile(self, relpath): return self.remote.isfile(relpath) def rm(self, relpath, recursive=False, missing_ok=False): self.remote.rm(relpath, recursive=recursive, missing_ok=missing_ok) try: self.local.rm(relpath, recursive=recursive, missing_ok=missing_ok) except FileNotFoundError: pass def mv(self, from_path, to_path, missing_ok=False): self.remote.mv(from_path, to_path, missing_ok=missing_ok) try: self.local.mv(from_path, to_path) except FileNotFoundError: pass
Ancestors
Instance variables
var path
-
Expand source code
@property def path(self): return self.local.path
Methods
def cd(self, *others)
-
Expand source code
def cd(self, *others): local = self.local.cd(*others) remote = self.remote.cd(*others) return CachePOD(local, remote)
def isdir(self, relpath)
-
Expand source code
def isdir(self, relpath): return self.remote.isdir(relpath)
def isfile(self, relpath)
-
Expand source code
def isfile(self, relpath): return self.remote.isfile(relpath)
def ls(self, relpath='.', missing_ok=False)
-
Expand source code
def ls(self, relpath=".", missing_ok=False): return self.remote.ls(relpath, missing_ok=missing_ok) # More (too) aggressive implementation: # res = self.local.ls(relpath, missing_ok=missing_ok) # if not res: # print('REMOTE!') # res = self.remote.ls(relpath, missing_ok=missing_ok) # return res
def mv(self, from_path, to_path, missing_ok=False)
-
Expand source code
def mv(self, from_path, to_path, missing_ok=False): self.remote.mv(from_path, to_path, missing_ok=missing_ok) try: self.local.mv(from_path, to_path) except FileNotFoundError: pass
def read(self, relpath, mode='rb')
-
Expand source code
def read(self, relpath, mode="rb"): try: return self.local.read(relpath, mode=mode) except FileNotFoundError: pass data = self.remote.read(relpath, mode=mode) self.local.write(relpath, data) return data
def rm(self, relpath, recursive=False, missing_ok=False)
-
Expand source code
def rm(self, relpath, recursive=False, missing_ok=False): self.remote.rm(relpath, recursive=recursive, missing_ok=missing_ok) try: self.local.rm(relpath, recursive=recursive, missing_ok=missing_ok) except FileNotFoundError: pass
def write(self, relpath, data, mode='wb', force=False)
-
Expand source code
def write(self, relpath, data, mode="wb", force=False): self.local.write(relpath, data, mode=mode, force=force) return self.remote.write(relpath, data, mode=mode, force=force)
class FilePOD (path)
-
Expand source code
class FilePOD(POD): protocol = "file" def __init__(self, path): self.path = Path(path) super().__init__() def cd(self, *others): path = self.path.joinpath(*others) return FilePOD(path) def ls(self, relpath=".", missing_ok=False): logger.debug("LIST %s %s", self.path, relpath) path = self.path / relpath try: return list(p.name for p in path.iterdir()) except FileNotFoundError: if missing_ok: return [] raise def read(self, relpath, mode="rb"): logger.debug("READ %s %s", self.path, relpath) path = self.path / relpath # XXX make sure path is subpath of self.path return path.open(mode).read() def write(self, relpath, data, mode="wb", force=False): if not force and self.isfile(relpath): logger.debug("SKIP-WRITE %s %s", self.path, relpath) return logger.debug("WRITE %s %s", self.path, relpath) path = self.path / relpath path.parent.mkdir(parents=True, exist_ok=True) return path.open(mode).write(data) def isdir(self, relpath): return self.path.joinpath(relpath).is_dir() def isfile(self, relpath): return self.path.joinpath(relpath).is_file() def rm(self, relpath=".", recursive=False, missing_ok=False): logger.debug("REMOVE %s %s", self.path, relpath) path = self.path / relpath try: if recursive: if path.is_dir(): shutil.rmtree(path) else: path.unlink() elif path.is_dir(): path.rmdir() else: path.unlink() except FileNotFoundError: if missing_ok: return raise def mv(self, from_path, to_path, missing_ok=False): orig = self.path / from_path dest = self.path / to_path logger.debug("MOVE %s to %s", orig, dest) dest.parent.mkdir(parents=True, exist_ok=True) try: orig.rename(dest) except FileNotFoundError: if not missing_ok: raise @property def size(self): return sum(os.path.getsize(self.path / name) for name in self.walk())
Ancestors
Class variables
var protocol
Instance variables
var size
-
Expand source code
@property def size(self): return sum(os.path.getsize(self.path / name) for name in self.walk())
Methods
def cd(self, *others)
-
Expand source code
def cd(self, *others): path = self.path.joinpath(*others) return FilePOD(path)
def isdir(self, relpath)
-
Expand source code
def isdir(self, relpath): return self.path.joinpath(relpath).is_dir()
def isfile(self, relpath)
-
Expand source code
def isfile(self, relpath): return self.path.joinpath(relpath).is_file()
def ls(self, relpath='.', missing_ok=False)
-
Expand source code
def ls(self, relpath=".", missing_ok=False): logger.debug("LIST %s %s", self.path, relpath) path = self.path / relpath try: return list(p.name for p in path.iterdir()) except FileNotFoundError: if missing_ok: return [] raise
def mv(self, from_path, to_path, missing_ok=False)
-
Expand source code
def mv(self, from_path, to_path, missing_ok=False): orig = self.path / from_path dest = self.path / to_path logger.debug("MOVE %s to %s", orig, dest) dest.parent.mkdir(parents=True, exist_ok=True) try: orig.rename(dest) except FileNotFoundError: if not missing_ok: raise
def read(self, relpath, mode='rb')
-
Expand source code
def read(self, relpath, mode="rb"): logger.debug("READ %s %s", self.path, relpath) path = self.path / relpath # XXX make sure path is subpath of self.path return path.open(mode).read()
def rm(self, relpath='.', recursive=False, missing_ok=False)
-
Expand source code
def rm(self, relpath=".", recursive=False, missing_ok=False): logger.debug("REMOVE %s %s", self.path, relpath) path = self.path / relpath try: if recursive: if path.is_dir(): shutil.rmtree(path) else: path.unlink() elif path.is_dir(): path.rmdir() else: path.unlink() except FileNotFoundError: if missing_ok: return raise
def write(self, relpath, data, mode='wb', force=False)
-
Expand source code
def write(self, relpath, data, mode="wb", force=False): if not force and self.isfile(relpath): logger.debug("SKIP-WRITE %s %s", self.path, relpath) return logger.debug("WRITE %s %s", self.path, relpath) path = self.path / relpath path.parent.mkdir(parents=True, exist_ok=True) return path.open(mode).write(data)
class MemPOD (path, store=None, lru_size=None)
-
Expand source code
class MemPOD(POD): protocol = "memory" def __init__(self, path, store=None, lru_size=None): self.path = PurePosixPath(path) self.parts = self.path.parts self.store = store or Store(lru_size=lru_size) super().__init__() def cd(self, *others): path = PurePosixPath(*(self.parts + others)) return MemPOD(path, store=self.store) def isdir(self, relpath): relpath = self.split(relpath) key = self.parts + relpath item = self.store.get(key) return isinstance(item, Folder) def isfile(self, relpath): relpath = self.split(relpath) key = self.parts + relpath item = self.store.get(key) return isinstance(item, File) def write(self, relpath, data, mode="rb", force=False): current_path = tuple() relpath = self.split(relpath) full_path = self.parts + relpath # Walk the tree for part in full_path: parent = current_path current_path = current_path + (part,) folder = self.store.setdefault(parent, Folder()) assert isinstance(folder, Folder) if current_path != full_path: folder.add(part, Folder) folder.add(part, File) current_file = self.store.get(current_path) if not force and current_file is not None: assert isinstance(current_file, File) logger.debug("SKIP-WRITE memory://%s %s", self.path, "/".join(relpath)) return logger.debug("WRITE memory://%s %s", "/".join(self.parts), "/".join(relpath)) self.store.set(current_path, File(data)) return len(data) def ls(self, relpath="", missing_ok=False): path = self.parts + self.split(relpath) item = self.store.get(path) if item is None: if missing_ok: return [] raise FileNotFoundError(f'Path "{path}" not found') elif isinstance(item, Folder): return list(item.ls()) else: return ["/".join(path)] def rm(self, relpath=".", recursive=False, missing_ok=False): path = self.parts + self.split(relpath) item = self.store.get(path) if isinstance(item, File): item = self.store.delete(path) elif isinstance(item, Folder): if not recursive: raise OSError(f"{relpath} is not empty") # Delete and recurse self.store.delete(path) for child in list(item.items): 
self.rm(relpath + "/" + child, recursive=True) elif not missing_ok: raise FileNotFoundError(f"{relpath} not found") # Update folder info parent_path = path[:-1] parent_folder = self.store.get(parent_path) if parent_folder: # Note: Parent my not exists in the middle of a # recursive deletion parent_folder.rm(path[-1]) def read(self, relpath, mode="rb"): logger.debug("READ memory://%s %s", "/".join(self.parts), relpath) path = self.parts + self.split(relpath) item = self.store.get(path) if not isinstance(item, File): raise FileNotFoundError(f'Path "{path}" not found') return item.content def mv(self, from_path, to_path, missing_ok=False): logger.debug("MOVE %s to %s", from_path, to_path) try: data = self.read(from_path) except FileNotFoundError: if missing_ok: return raise self.write(to_path, data) self.rm(from_path) @classmethod def split(cls, path): if not path: return tuple() if isinstance(path, tuple): return path if isinstance(path, PurePosixPath): return path.parts return tuple(p for p in path.split("/") if p != ".")
Ancestors
Class variables
var protocol
Static methods
def split(path)
-
Expand source code
@classmethod def split(cls, path): if not path: return tuple() if isinstance(path, tuple): return path if isinstance(path, PurePosixPath): return path.parts return tuple(p for p in path.split("/") if p != ".")
Methods
def cd(self, *others)
-
Expand source code
def cd(self, *others): path = PurePosixPath(*(self.parts + others)) return MemPOD(path, store=self.store)
def isdir(self, relpath)
-
Expand source code
def isdir(self, relpath): relpath = self.split(relpath) key = self.parts + relpath item = self.store.get(key) return isinstance(item, Folder)
def isfile(self, relpath)
-
Expand source code
def isfile(self, relpath): relpath = self.split(relpath) key = self.parts + relpath item = self.store.get(key) return isinstance(item, File)
def ls(self, relpath='', missing_ok=False)
-
Expand source code
def ls(self, relpath="", missing_ok=False): path = self.parts + self.split(relpath) item = self.store.get(path) if item is None: if missing_ok: return [] raise FileNotFoundError(f'Path "{path}" not found') elif isinstance(item, Folder): return list(item.ls()) else: return ["/".join(path)]
def mv(self, from_path, to_path, missing_ok=False)
-
Expand source code
def mv(self, from_path, to_path, missing_ok=False): logger.debug("MOVE %s to %s", from_path, to_path) try: data = self.read(from_path) except FileNotFoundError: if missing_ok: return raise self.write(to_path, data) self.rm(from_path)
def read(self, relpath, mode='rb')
-
Expand source code
def read(self, relpath, mode="rb"): logger.debug("READ memory://%s %s", "/".join(self.parts), relpath) path = self.parts + self.split(relpath) item = self.store.get(path) if not isinstance(item, File): raise FileNotFoundError(f'Path "{path}" not found') return item.content
def rm(self, relpath='.', recursive=False, missing_ok=False)
-
Expand source code
def rm(self, relpath=".", recursive=False, missing_ok=False): path = self.parts + self.split(relpath) item = self.store.get(path) if isinstance(item, File): item = self.store.delete(path) elif isinstance(item, Folder): if not recursive: raise OSError(f"{relpath} is not empty") # Delete and recurse self.store.delete(path) for child in list(item.items): self.rm(relpath + "/" + child, recursive=True) elif not missing_ok: raise FileNotFoundError(f"{relpath} not found") # Update folder info parent_path = path[:-1] parent_folder = self.store.get(parent_path) if parent_folder: # Note: Parent my not exists in the middle of a # recursive deletion parent_folder.rm(path[-1])
def write(self, relpath, data, mode='rb', force=False)
-
Expand source code
def write(self, relpath, data, mode="rb", force=False): current_path = tuple() relpath = self.split(relpath) full_path = self.parts + relpath # Walk the tree for part in full_path: parent = current_path current_path = current_path + (part,) folder = self.store.setdefault(parent, Folder()) assert isinstance(folder, Folder) if current_path != full_path: folder.add(part, Folder) folder.add(part, File) current_file = self.store.get(current_path) if not force and current_file is not None: assert isinstance(current_file, File) logger.debug("SKIP-WRITE memory://%s %s", self.path, "/".join(relpath)) return logger.debug("WRITE memory://%s %s", "/".join(self.parts), "/".join(relpath)) self.store.set(current_path, File(data)) return len(data)
class POD
-
Expand source code
class POD: @classmethod def from_token(cls, token): return cls._by_token[token] @classmethod def from_uri(cls, uri=None): # multi-uri -> CachePOD if uri and isinstance(uri, (tuple, list)): if len(uri) > 1: return CachePOD( local=POD.from_uri(uri[0]), remote=POD.from_uri(uri[1:]), ) else: return POD.from_uri(uri[0]) # Define protocal and path parts = urlsplit(uri or "") scheme, path = parts.scheme, parts.path kwargs = parse_qs(parts.query) if not scheme: if not path or path == ":memory:": scheme = "memory" path = "." else: scheme = "file" # Massage path if parts.scheme and path.startswith("/"): # urlsplit keep the separator in the path path = path[1:] if scheme == "file": path = Path(path).expanduser() # Instatiate pod object path = PurePosixPath(path) if scheme == "file": assert not parts.netloc, "Malformed repo uri, should start with 'file:///'" return FilePOD(path) elif scheme == "s3": if boto3 is None: raise ValueError( f'Please install the "boto3" module in order to access {uri}' ) profile = kwargs.get("profile", [None])[0] verify = kwargs.get("verify", [""])[0].lower() != "false" key = kwargs.get("key", [""])[0] secret = kwargs.get("secret", [""])[0] token = kwargs.get("token", [""])[0] return S3POD( path, netloc=parts.netloc, profile=profile, verify=verify, key=key, secret=secret, token=token, ) elif scheme == "ssh": raise NotImplementedError("SSH support not implemented yet") elif scheme in ("http", "https"): if requests is None: raise ImportError( f'Please install the "requests" module in order to access "{uri}"' ) # Build base uri base_uri = f"{parts.scheme}://{parts.netloc}/{path}" # Extract headers headers = { k[7:]: v[0] for k, v in kwargs.items() if k.startswith("header-") } return HttpPOD(base_uri, headers=headers) elif scheme == "memory": lru_size = int(kwargs.get("lru_size", [0])[0]) return MemPOD(path, lru_size=lru_size) else: raise ValueError(f'Protocol "{scheme}" not supported in "{uri}"') def __truediv__(self, relpath): return self.cd(relpath) 
def rm_many(self, pathes, recursive=False): for path in pathes: self.rm(path, recursive=recursive) def walk(self, max_depth=None): if max_depth == 0: return [] folders = [("", f, 1) for f in self.ls("")] while folders: folder = folders.pop() root, name, depth = folder full_path = str(PurePosixPath(root) / name) if self.isdir(full_path): if max_depth is not None and depth >= max_depth: continue subfolders = [ (full_path, c, depth + 1) for c in reversed(self.ls(full_path)) ] folders.extend(subfolders) else: yield full_path
Subclasses
Static methods
def from_token(token)
-
Expand source code
@classmethod def from_token(cls, token): return cls._by_token[token]
def from_uri(uri=None)
-
Expand source code
@classmethod def from_uri(cls, uri=None): # multi-uri -> CachePOD if uri and isinstance(uri, (tuple, list)): if len(uri) > 1: return CachePOD( local=POD.from_uri(uri[0]), remote=POD.from_uri(uri[1:]), ) else: return POD.from_uri(uri[0]) # Define protocal and path parts = urlsplit(uri or "") scheme, path = parts.scheme, parts.path kwargs = parse_qs(parts.query) if not scheme: if not path or path == ":memory:": scheme = "memory" path = "." else: scheme = "file" # Massage path if parts.scheme and path.startswith("/"): # urlsplit keep the separator in the path path = path[1:] if scheme == "file": path = Path(path).expanduser() # Instatiate pod object path = PurePosixPath(path) if scheme == "file": assert not parts.netloc, "Malformed repo uri, should start with 'file:///'" return FilePOD(path) elif scheme == "s3": if boto3 is None: raise ValueError( f'Please install the "boto3" module in order to access {uri}' ) profile = kwargs.get("profile", [None])[0] verify = kwargs.get("verify", [""])[0].lower() != "false" key = kwargs.get("key", [""])[0] secret = kwargs.get("secret", [""])[0] token = kwargs.get("token", [""])[0] return S3POD( path, netloc=parts.netloc, profile=profile, verify=verify, key=key, secret=secret, token=token, ) elif scheme == "ssh": raise NotImplementedError("SSH support not implemented yet") elif scheme in ("http", "https"): if requests is None: raise ImportError( f'Please install the "requests" module in order to access "{uri}"' ) # Build base uri base_uri = f"{parts.scheme}://{parts.netloc}/{path}" # Extract headers headers = { k[7:]: v[0] for k, v in kwargs.items() if k.startswith("header-") } return HttpPOD(base_uri, headers=headers) elif scheme == "memory": lru_size = int(kwargs.get("lru_size", [0])[0]) return MemPOD(path, lru_size=lru_size) else: raise ValueError(f'Protocol "{scheme}" not supported in "{uri}"')
Methods
def rm_many(self, pathes, recursive=False)
-
Expand source code
def rm_many(self, pathes, recursive=False): for path in pathes: self.rm(path, recursive=recursive)
def walk(self, max_depth=None)
-
Expand source code
def walk(self, max_depth=None): if max_depth == 0: return [] folders = [("", f, 1) for f in self.ls("")] while folders: folder = folders.pop() root, name, depth = folder full_path = str(PurePosixPath(root) / name) if self.isdir(full_path): if max_depth is not None and depth >= max_depth: continue subfolders = [ (full_path, c, depth + 1) for c in reversed(self.ls(full_path)) ] folders.extend(subfolders) else: yield full_path