Module lakota.s3_pod

Expand source code
from itertools import chain
from pathlib import PurePosixPath

import boto3
import urllib3
from botocore.exceptions import ClientError

from .pod import POD
from .utils import logger


def silence_insecure_warning():
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


class S3POD(POD):

    protocol = "s3"

    def __init__(
        self,
        path,
        netloc=None,
        profile=None,
        verify=True,
        client=None,
        key=None,
        secret=None,
        token=None,
    ):
        """
        `path` must contains bucket name and sub-path in the bucket
        (separated by a `/`). `verify` set to False will disable ssl
        verifications. Please note that boto3 will also use
        "REQUESTS_CA_BUNDLE" env variable.
        """
        bucket, *parts = path.parts
        self.path = PurePosixPath(*parts)
        self.bucket = PurePosixPath(bucket)
        if client:
            self.client = client
        else:
            # TODO support for https on custom endpoints
            # TODO document use of param: endpoint_url='http://127.0.0.1:5300'
            if not verify:
                silence_insecure_warning()
            endpoint_url = f"http://{netloc}" if netloc else None
            session = boto3.session.Session(
                aws_access_key_id=key,
                aws_secret_access_key=secret,
                aws_session_token=token,
                profile_name=profile,
            )
            self.client = session.client(
                "s3", verify=verify, aws_session_token=token, endpoint_url=endpoint_url
            )
        super().__init__()

    def cd(self, *others):
        path = self.path.joinpath(*others)
        return S3POD(self.bucket / path, client=self.client)

    def ls_iter(self, prefix, limit=None, delimiter="/"):
        '''
        Iterable that yield lists of object keys
        '''
        logger.debug("LIST s3:///%s/%s", self.bucket, prefix)
        paginator = self.client.get_paginator("list_objects")
        options = {
            "Bucket": str(self.bucket),
            "Prefix": prefix,
        }
        if delimiter:
            options["Delimiter"] = delimiter
        if limit is not None:
            options["PaginationConfig"] = {"MaxItems": limit}

        page_iterator = paginator.paginate(**options)
        for page in page_iterator:
            # Extract pseudo-folder names
            common_prefixes = page.get("CommonPrefixes", [])
            yield [item["Prefix"].rstrip("/") for item in common_prefixes]
            # Extract keys (filenames)
            contents = page.get("Contents", [])
            yield [item["Key"] for item in contents]

    def ls(self, relpath=".", missing_ok=False, limit=None, delimiter="/"):
        '''
        The parameter `missing_ok` is not supported, it is present for
        compatibility reason with the other pods.
        '''
        prefix = str(self.path / relpath)
        prefix = "" if prefix in (".", "") else prefix + delimiter
        cut = len(prefix)
        it = self.ls_iter(prefix, limit=limit, delimiter=delimiter)
        return [item[cut:] for item in chain.from_iterable(it)]

    def read(self, relpath, mode="rb"):
        logger.debug("READ s3:///%s/%s %s", self.bucket, self.path, relpath)
        key = str(self.path / relpath)
        try:
            resp = self.client.get_object(Bucket=str(self.bucket), Key=key)
        except ClientError as err:
            if err.response["Error"]["Code"] == "NoSuchKey":
                raise FileNotFoundError(f'Key "{relpath}" not found')
            raise

        return resp["Body"].read()

    def write(self, relpath, data, mode="wb", force=False):
        if not force and self.isfile(relpath):
            logger.debug("SKIP-WRITE s3:///%s/%s %s", self.bucket, self.path, relpath)
            return
        logger.debug("WRITE s3:///%s%s %s", self.bucket, self.path, relpath)
        key = str(self.path / relpath)
        response = self.client.put_object(
            Bucket=str(self.bucket),
            Body=data,
            Key=key,
        )
        assert response["ResponseMetadata"]["HTTPStatusCode"] == 200
        return len(data)

    def isdir(self, relpath):
        return len(self.ls(relpath, limit=1)) > 0

    def isfile(self, relpath):
        key = str(self.path / relpath)
        try:
            _ = self.client.get_object(Bucket=str(self.bucket), Key=key)
        except ClientError as err:
            if err.response["Error"]["Code"] == "NoSuchKey":
                return False
            # Other kind of error, reraise
            raise
        return True

    def rm(self, relpath=".", recursive=False, missing_ok=False):
        logger.debug("REMOVE s3://%s/%s %s", self.bucket, self.path, relpath)
        prefix = str(self.path / relpath)
        if not recursive:
            if missing_ok:
                keys = [prefix]
            else:
                # We must make sure the key exists
                it = self.ls_iter(prefix, limit=2, delimiter="")
                keys = list(chain.from_iterable(it))
                if not keys:
                    raise FileNotFoundError(f"{relpath} not found")

            if len(keys) > 1:
                # We raise an OSError to mimic file based access
                raise OSError(f"{relpath} is not empty")
            self._delete_keys([prefix])
            return

        # Recursive case, we use ls_iter to loop on a potentially
        # large number of files
        prefix = "" if prefix in (".", "") else prefix + "/"
        files_found = False
        for chunk in self.ls_iter(prefix, delimiter=""):
            if not chunk:
                # can be empty
                continue
            files_found = True
            self._delete_keys(chunk)

        if not files_found and not missing_ok:
            raise FileNotFoundError(f"{relpath} not found")

    def _delete_keys(self, keys):
        try:
            _ = self.client.delete_objects(
                Bucket=str(self.bucket),
                Delete={
                    "Objects": [{"Key": k} for k in keys],
                    "Quiet": True,
                },
            )

            # TODO check for error in response
        except ClientError as err:
            if err.response["Error"]["Code"] != "MalformedXML":
                raise
            # As of version 2.2.6, Moto doesn't support correctly
            # delete_objects() calls, we fall back to delete_object()
            for key in keys:
                self.client.delete_object(
                    Bucket=str(self.bucket),
                    Key=key,
                )

    def mv(self, from_path, to_path, missing_ok=False):
        orig = str(self.path / from_path)
        dest = str(self.path / to_path)
        logger.debug(
            "MOVE s3://%s/%s to s3://%s/%s", self.bucket, orig, self.bucket, dest
        )
        try:
            # Copy to dest
            self.client.copy(
                {"Bucket": str(self.bucket), "Key": orig},
                str(self.bucket),
                dest,
            )
            # Delete orig
            self.client.delete_object(
                Bucket=str(self.bucket),
                Key=orig,
            )
        except ClientError as err:
            if err.response["Error"]["Code"] == "404":
                if missing_ok:
                    return
                raise FileNotFoundError(f'Path "{orig}" not found')
            raise

Functions

def silence_insecure_warning()
Expand source code
def silence_insecure_warning():
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

Classes

class S3POD (path, netloc=None, profile=None, verify=True, client=None, key=None, secret=None, token=None)

path must contains bucket name and sub-path in the bucket (separated by a /). verify set to False will disable ssl verifications. Please note that boto3 will also use "REQUESTS_CA_BUNDLE" env variable.

Expand source code
class S3POD(POD):

    protocol = "s3"

    def __init__(
        self,
        path,
        netloc=None,
        profile=None,
        verify=True,
        client=None,
        key=None,
        secret=None,
        token=None,
    ):
        """
        `path` must contains bucket name and sub-path in the bucket
        (separated by a `/`). `verify` set to False will disable ssl
        verifications. Please note that boto3 will also use
        "REQUESTS_CA_BUNDLE" env variable.
        """
        bucket, *parts = path.parts
        self.path = PurePosixPath(*parts)
        self.bucket = PurePosixPath(bucket)
        if client:
            self.client = client
        else:
            # TODO support for https on custom endpoints
            # TODO document use of param: endpoint_url='http://127.0.0.1:5300'
            if not verify:
                silence_insecure_warning()
            endpoint_url = f"http://{netloc}" if netloc else None
            session = boto3.session.Session(
                aws_access_key_id=key,
                aws_secret_access_key=secret,
                aws_session_token=token,
                profile_name=profile,
            )
            self.client = session.client(
                "s3", verify=verify, aws_session_token=token, endpoint_url=endpoint_url
            )
        super().__init__()

    def cd(self, *others):
        path = self.path.joinpath(*others)
        return S3POD(self.bucket / path, client=self.client)

    def ls_iter(self, prefix, limit=None, delimiter="/"):
        '''
        Iterable that yield lists of object keys
        '''
        logger.debug("LIST s3:///%s/%s", self.bucket, prefix)
        paginator = self.client.get_paginator("list_objects")
        options = {
            "Bucket": str(self.bucket),
            "Prefix": prefix,
        }
        if delimiter:
            options["Delimiter"] = delimiter
        if limit is not None:
            options["PaginationConfig"] = {"MaxItems": limit}

        page_iterator = paginator.paginate(**options)
        for page in page_iterator:
            # Extract pseudo-folder names
            common_prefixes = page.get("CommonPrefixes", [])
            yield [item["Prefix"].rstrip("/") for item in common_prefixes]
            # Extract keys (filenames)
            contents = page.get("Contents", [])
            yield [item["Key"] for item in contents]

    def ls(self, relpath=".", missing_ok=False, limit=None, delimiter="/"):
        '''
        The parameter `missing_ok` is not supported, it is present for
        compatibility reason with the other pods.
        '''
        prefix = str(self.path / relpath)
        prefix = "" if prefix in (".", "") else prefix + delimiter
        cut = len(prefix)
        it = self.ls_iter(prefix, limit=limit, delimiter=delimiter)
        return [item[cut:] for item in chain.from_iterable(it)]

    def read(self, relpath, mode="rb"):
        logger.debug("READ s3:///%s/%s %s", self.bucket, self.path, relpath)
        key = str(self.path / relpath)
        try:
            resp = self.client.get_object(Bucket=str(self.bucket), Key=key)
        except ClientError as err:
            if err.response["Error"]["Code"] == "NoSuchKey":
                raise FileNotFoundError(f'Key "{relpath}" not found')
            raise

        return resp["Body"].read()

    def write(self, relpath, data, mode="wb", force=False):
        if not force and self.isfile(relpath):
            logger.debug("SKIP-WRITE s3:///%s/%s %s", self.bucket, self.path, relpath)
            return
        logger.debug("WRITE s3:///%s%s %s", self.bucket, self.path, relpath)
        key = str(self.path / relpath)
        response = self.client.put_object(
            Bucket=str(self.bucket),
            Body=data,
            Key=key,
        )
        assert response["ResponseMetadata"]["HTTPStatusCode"] == 200
        return len(data)

    def isdir(self, relpath):
        return len(self.ls(relpath, limit=1)) > 0

    def isfile(self, relpath):
        key = str(self.path / relpath)
        try:
            _ = self.client.get_object(Bucket=str(self.bucket), Key=key)
        except ClientError as err:
            if err.response["Error"]["Code"] == "NoSuchKey":
                return False
            # Other kind of error, reraise
            raise
        return True

    def rm(self, relpath=".", recursive=False, missing_ok=False):
        logger.debug("REMOVE s3://%s/%s %s", self.bucket, self.path, relpath)
        prefix = str(self.path / relpath)
        if not recursive:
            if missing_ok:
                keys = [prefix]
            else:
                # We must make sure the key exists
                it = self.ls_iter(prefix, limit=2, delimiter="")
                keys = list(chain.from_iterable(it))
                if not keys:
                    raise FileNotFoundError(f"{relpath} not found")

            if len(keys) > 1:
                # We raise an OSError to mimic file based access
                raise OSError(f"{relpath} is not empty")
            self._delete_keys([prefix])
            return

        # Recursive case, we use ls_iter to loop on a potentially
        # large number of files
        prefix = "" if prefix in (".", "") else prefix + "/"
        files_found = False
        for chunk in self.ls_iter(prefix, delimiter=""):
            if not chunk:
                # can be empty
                continue
            files_found = True
            self._delete_keys(chunk)

        if not files_found and not missing_ok:
            raise FileNotFoundError(f"{relpath} not found")

    def _delete_keys(self, keys):
        try:
            _ = self.client.delete_objects(
                Bucket=str(self.bucket),
                Delete={
                    "Objects": [{"Key": k} for k in keys],
                    "Quiet": True,
                },
            )

            # TODO check for error in response
        except ClientError as err:
            if err.response["Error"]["Code"] != "MalformedXML":
                raise
            # As of version 2.2.6, Moto doesn't support correctly
            # delete_objects() calls, we fall back to delete_object()
            for key in keys:
                self.client.delete_object(
                    Bucket=str(self.bucket),
                    Key=key,
                )

    def mv(self, from_path, to_path, missing_ok=False):
        orig = str(self.path / from_path)
        dest = str(self.path / to_path)
        logger.debug(
            "MOVE s3://%s/%s to s3://%s/%s", self.bucket, orig, self.bucket, dest
        )
        try:
            # Copy to dest
            self.client.copy(
                {"Bucket": str(self.bucket), "Key": orig},
                str(self.bucket),
                dest,
            )
            # Delete orig
            self.client.delete_object(
                Bucket=str(self.bucket),
                Key=orig,
            )
        except ClientError as err:
            if err.response["Error"]["Code"] == "404":
                if missing_ok:
                    return
                raise FileNotFoundError(f'Path "{orig}" not found')
            raise

Ancestors

Class variables

var protocol

Methods

def cd(self, *others)
Expand source code
def cd(self, *others):
    path = self.path.joinpath(*others)
    return S3POD(self.bucket / path, client=self.client)
def isdir(self, relpath)
Expand source code
def isdir(self, relpath):
    return len(self.ls(relpath, limit=1)) > 0
def isfile(self, relpath)
Expand source code
def isfile(self, relpath):
    key = str(self.path / relpath)
    try:
        _ = self.client.get_object(Bucket=str(self.bucket), Key=key)
    except ClientError as err:
        if err.response["Error"]["Code"] == "NoSuchKey":
            return False
        # Other kind of error, reraise
        raise
    return True
def ls(self, relpath='.', missing_ok=False, limit=None, delimiter='/')

The parameter missing_ok is not supported, it is present for compatibility reason with the other pods.

Expand source code
def ls(self, relpath=".", missing_ok=False, limit=None, delimiter="/"):
    '''
    The parameter `missing_ok` is not supported, it is present for
    compatibility reason with the other pods.
    '''
    prefix = str(self.path / relpath)
    prefix = "" if prefix in (".", "") else prefix + delimiter
    cut = len(prefix)
    it = self.ls_iter(prefix, limit=limit, delimiter=delimiter)
    return [item[cut:] for item in chain.from_iterable(it)]
def ls_iter(self, prefix, limit=None, delimiter='/')

Iterable that yield lists of object keys

Expand source code
def ls_iter(self, prefix, limit=None, delimiter="/"):
    '''
    Iterable that yield lists of object keys
    '''
    logger.debug("LIST s3:///%s/%s", self.bucket, prefix)
    paginator = self.client.get_paginator("list_objects")
    options = {
        "Bucket": str(self.bucket),
        "Prefix": prefix,
    }
    if delimiter:
        options["Delimiter"] = delimiter
    if limit is not None:
        options["PaginationConfig"] = {"MaxItems": limit}

    page_iterator = paginator.paginate(**options)
    for page in page_iterator:
        # Extract pseudo-folder names
        common_prefixes = page.get("CommonPrefixes", [])
        yield [item["Prefix"].rstrip("/") for item in common_prefixes]
        # Extract keys (filenames)
        contents = page.get("Contents", [])
        yield [item["Key"] for item in contents]
def mv(self, from_path, to_path, missing_ok=False)
Expand source code
def mv(self, from_path, to_path, missing_ok=False):
    orig = str(self.path / from_path)
    dest = str(self.path / to_path)
    logger.debug(
        "MOVE s3://%s/%s to s3://%s/%s", self.bucket, orig, self.bucket, dest
    )
    try:
        # Copy to dest
        self.client.copy(
            {"Bucket": str(self.bucket), "Key": orig},
            str(self.bucket),
            dest,
        )
        # Delete orig
        self.client.delete_object(
            Bucket=str(self.bucket),
            Key=orig,
        )
    except ClientError as err:
        if err.response["Error"]["Code"] == "404":
            if missing_ok:
                return
            raise FileNotFoundError(f'Path "{orig}" not found')
        raise
def read(self, relpath, mode='rb')
Expand source code
def read(self, relpath, mode="rb"):
    logger.debug("READ s3:///%s/%s %s", self.bucket, self.path, relpath)
    key = str(self.path / relpath)
    try:
        resp = self.client.get_object(Bucket=str(self.bucket), Key=key)
    except ClientError as err:
        if err.response["Error"]["Code"] == "NoSuchKey":
            raise FileNotFoundError(f'Key "{relpath}" not found')
        raise

    return resp["Body"].read()
def rm(self, relpath='.', recursive=False, missing_ok=False)
Expand source code
def rm(self, relpath=".", recursive=False, missing_ok=False):
    logger.debug("REMOVE s3://%s/%s %s", self.bucket, self.path, relpath)
    prefix = str(self.path / relpath)
    if not recursive:
        if missing_ok:
            keys = [prefix]
        else:
            # We must make sure the key exists
            it = self.ls_iter(prefix, limit=2, delimiter="")
            keys = list(chain.from_iterable(it))
            if not keys:
                raise FileNotFoundError(f"{relpath} not found")

        if len(keys) > 1:
            # We raise an OSError to mimic file based access
            raise OSError(f"{relpath} is not empty")
        self._delete_keys([prefix])
        return

    # Recursive case, we use ls_iter to loop on a potentially
    # large number of files
    prefix = "" if prefix in (".", "") else prefix + "/"
    files_found = False
    for chunk in self.ls_iter(prefix, delimiter=""):
        if not chunk:
            # can be empty
            continue
        files_found = True
        self._delete_keys(chunk)

    if not files_found and not missing_ok:
        raise FileNotFoundError(f"{relpath} not found")
def write(self, relpath, data, mode='wb', force=False)
Expand source code
def write(self, relpath, data, mode="wb", force=False):
    if not force and self.isfile(relpath):
        logger.debug("SKIP-WRITE s3:///%s/%s %s", self.bucket, self.path, relpath)
        return
    logger.debug("WRITE s3:///%s%s %s", self.bucket, self.path, relpath)
    key = str(self.path / relpath)
    response = self.client.put_object(
        Bucket=str(self.bucket),
        Body=data,
        Key=key,
    )
    assert response["ResponseMetadata"]["HTTPStatusCode"] == 200
    return len(data)