Module lakota.sexpr

The AST (abstract syntax tree) implement parsing and evaluation of s-expressions.

Example:

>>> from lakota.sexpr import AST
>>> AST.parse('(+ 1 1)')
<lakota.sexpr.AST object at 0x7f1bcc5b2fd0>
>>> ast = AST.parse('(+ 1 1)')
>>> ast.eval()
2

The eval method accepts an env parameter, a dictionary used to evaluate non-litteral tokens:

>>> ast = AST.parse('(+ 1 x)')
>>> ast.eval()
Traceback (most recent call last):
   ...
ValueError: Unexpected token: "x"
>>> ast.eval({'x': 2})
3

The reduce method on Frame use AST with self already setup in the evaluation environment as the frame itself.

>>> frm = series.frame()
>>> frm.slice(0,10)
date_reported-> ['2020-01-03T00:00:00' '2020-01-04T00:00:00'
 '2020-01-05T00:00:00' '2020-01-06T00:00:00' '2020-01-07T00:00:00'
 '2020-01-08T00:00:00' '2020-01-09T00:00:00' '2020-01-10T00:00:00'
 '2020-01-11T00:00:00' '2020-01-12T00:00:00']
new_cases-> [0 0 0 0 0 0 0 0 0 0]
>>> frm.reduce('(max self.new_cases)')
(max self.new_cases)-> [22210]
 # Use the 'as' operator to specify an alias:
>>> frm.reduce('(as (max self.new_cases) "max_new_cases")')
max_new_cases-> [22210]
Expand source code
"""
The `AST` (abstract syntax tree) implement parsing and evaluation
of [s-expressions](https://en.wikipedia.org/wiki/S-expression).

Example:

``` python-console
>>> from lakota.sexpr import AST
>>> AST.parse('(+ 1 1)')
<lakota.sexpr.AST object at 0x7f1bcc5b2fd0>
>>> ast = AST.parse('(+ 1 1)')
>>> ast.eval()
2
```

The `eval` method accepts an `env` parameter, a dictionary used to
evaluate non-litteral tokens:

``` python-console
>>> ast = AST.parse('(+ 1 x)')
>>> ast.eval()
Traceback (most recent call last):
   ...
ValueError: Unexpected token: "x"
>>> ast.eval({'x': 2})
3
```

The `reduce` method on `lakota.frame.Frame` use AST with `self` already
setup in the evaluation environment as the frame itself.

``` python-console
>>> frm = series.frame()
>>> frm.slice(0,10)
date_reported-> ['2020-01-03T00:00:00' '2020-01-04T00:00:00'
 '2020-01-05T00:00:00' '2020-01-06T00:00:00' '2020-01-07T00:00:00'
 '2020-01-08T00:00:00' '2020-01-09T00:00:00' '2020-01-10T00:00:00'
 '2020-01-11T00:00:00' '2020-01-12T00:00:00']
new_cases-> [0 0 0 0 0 0 0 0 0 0]
>>> frm.reduce('(max self.new_cases)')
(max self.new_cases)-> [22210]
 # Use the 'as' operator to specify an alias:
>>> frm.reduce('(as (max self.new_cases) "max_new_cases")')
max_new_cases-> [22210]
```
"""

import operator
import shlex
from functools import reduce

import numpy
from numpy import bincount, inf, max, maximum, mean, min, minimum, quantile, repeat, sum

__all__ = ["AST"]


UNSET = object()


def list_to_dict(*items):
    it = iter(items)
    return dict(zip(it, it))


class KWargs:
    def __init__(self, *items):
        self.value = list_to_dict(*items)

    def __repr__(self):
        return f'<KWargs {self.value}>'


class Env:
    def __init__(self, values):
        if isinstance(values, Env):
            # Un-wrap env
            values = values.values
        self.values = values

    def get(self, key, default=UNSET):
        res = self.values
        for part in key.split("."):
            try:
                res = res[part]
            except KeyError:
                pass
            else:
                continue
            try:
                res = getattr(res, part)
            except AttributeError:
                if default is not UNSET:
                    return default
                raise KeyError(part)
        return res

    def __repr__(self):
        content = repr(self.values)
        return f"<Env {content}>"


class Token:
    def __init__(self, value):
        self.value = value

    def as_string(self):
        res = self.value.strip("'\"")
        if len(res) < len(self.value):
            return res
        return None

    def __repr__(self):
        return f"<Token {self.value}>"

    def as_number(self):
        try:
            return int(self.value)
        except ValueError:
            pass

        try:
            return float(self.value)
        except ValueError:
            pass

        return None

    def is_aggregate(self):
        return self.value in AST.aggregates

    def eval(self, env):
        # Eval builtins
        if self.value in AST.builtins:
            return AST.builtins[self.value]
        # Eval aggregates
        if self.is_aggregate():
            return Agg(self.value, env)
        # Eval floats and int
        res = self.as_number()
        if res is not None:
            return res
        # Eval strings
        res = self.as_string()
        if res is not None:
            return res
        # Eval env
        try:
            return env.get(self.value)
        except KeyError:
            pass

        # Eval numpy function
        fn = numpy
        for v in self.value.split("."):
            fn = getattr(fn, v, None)

        if fn and callable(fn):
            return fn

        raise ValueError(f'Unexpected token: "{self.value}"')


class Agg:
    def __init__(self, op, env):
        self.op = op
        self.env = env

    def __call__(self, arr, *operands):
        bins = self.env.get("_bins", None)
        keys = self.env.get("_keys", None)
        if bins is not None:
            return self.binned(arr, bins, keys)
        return self.plain(arr, operands=operands)

    def plain(self, arr, operands=None):
        """
        Plain aggregates
        """
        if self.op == "max":
            return max(arr)
        elif self.op == "min":
            return min(arr)
        elif self.op == "mean":
            return mean(arr)
        elif self.op == "sum":
            return sum(arr)
        elif self.op in ("count", "len"):
            return len(arr)
        elif self.op == "quantile":
            qt = operands[0] if len(operands) > 0 else 0.5
            interpolation = operands[1] if len(operands) > 1 else "linear"
            return quantile(arr, qt, interpolation=interpolation)
        raise ValueError(f'Aggregation "{self.op}" is not supported')

    def binned(self, arr, bins, keys):  # XXX operands ?
        if self.op == "sum":
            return bincount(bins, weights=arr)
        elif self.op in ("mean", "average"):
            res = bincount(bins, weights=arr)
            counts = bincount(bins)
            return res / counts
        elif self.op == "max":
            res = repeat(-inf, len(keys))
            maximum.at(res, bins, arr)
            return res
        elif self.op == "min":
            res = repeat(inf, len(keys))
            minimum.at(res, bins, arr)
            return res
        elif self.op == "last":
            res = repeat(None, len(keys))
            # Repeated index keep last value
            res[bins] = arr
            return res
        elif self.op == "first":
            res = repeat(None, len(keys))
            # Repeated (revsersed) index gives first value
            res[bins[::-1]] = arr[::-1]
            return res.astype(arr.dtype)
        elif self.op in ("count", "len"):
            return bincount(bins)

        raise ValueError(f'Aggregation "{self.op}" is not supported')


class Alias:
    """
    Simple wrapper that combine a value and an alias name
    """

    def __init__(self, value, name):
        self.value = value
        self.name = name


def tokenize(expr):
    lexer = shlex.shlex(expr)
    lexer.wordchars += ".!=<>:{}-"
    for i in lexer:
        yield Token(i)


def scan(tokens, end_tk=")"):
    res = []
    for tk in tokens:
        if tk.value == end_tk:
            return res
        elif tk.value == "(":
            res.append(scan(tokens))
        else:
            res.append(tk)

    tail = next(tokens, None)
    if tail:
        raise ValueError(f'Unexpected token: "{tail.value}"')
    return res


class AST:
    builtins = {
        "true": True,
        "false": False,
        "+": lambda *x: reduce(operator.add, x),
        "-": lambda *x: reduce(operator.sub, x),
        "*": lambda *x: reduce(operator.mul, x),
        "/": lambda *x: reduce(operator.truediv, x),
        "%": lambda *x: reduce(operator.mod, x),
        "and": lambda *x: reduce(operator.and_, x),
        "or": lambda *x: reduce(operator.or_, x),
        "<": lambda *x: reduce(operator.lt, x),
        "<=": lambda *x: reduce(operator.le, x),
        "=": lambda *x: reduce(operator.eq, x),
        "!=": lambda *x: reduce(operator.ne, x),
        ">=": lambda *x: reduce(operator.ge, x),
        ">": lambda *x: reduce(operator.gt, x),
        "~": lambda *xs: all(not x for x in xs),
        "in": lambda *x: x[0] in x[1],
        "list": lambda *x: list(x),
        "as": lambda *x: Alias(x[0], x[1]),
        "dict": list_to_dict,
        "kw": KWargs,
    }

    aggregates = {
        "min",
        "max",
        "sum",
        "first",
        "last",
        "mean",
        "average",
        "quantile",
        "count",
        "len",
    }

    def __init__(self, tokens):
        self.tokens = tokens

    @classmethod
    def parse(cls, expr):
        res = tokenize(expr)
        tokens = scan(res)[0]
        return AST(tokens)

    def eval(self, env=None):
        env = Env(env or {})
        if isinstance(self.tokens, Token):
            return self.tokens.eval(env)
        head, tail = self.tokens[0], self.tokens[1:]
        args = [AST(tk).eval(env) for tk in tail]

        # Split normal and kw args
        simple_args = []
        kw_args = {}
        for a in args:
            if isinstance(a, KWargs):
                kw_args.update(a.value)
            else:
                simple_args.append(a)

        fn = head.eval(env)
        return fn(*simple_args, **kw_args)

    def is_aggregate(self):
        for tk in self.extract_tokens():
            if tk.is_aggregate():
                return True
        return False

    def extract_tokens(self):
        """
        Flatten tree into a list of tokens
        """
        for tk in self.tokens:
            if isinstance(tk, Token):
                yield tk
            elif isinstance(tk, list):
                yield from tk
            else:
                raise ValueError(f"Unexpected token: {tk}")

Classes

class AST (tokens)
Expand source code
class AST:
    builtins = {
        "true": True,
        "false": False,
        "+": lambda *x: reduce(operator.add, x),
        "-": lambda *x: reduce(operator.sub, x),
        "*": lambda *x: reduce(operator.mul, x),
        "/": lambda *x: reduce(operator.truediv, x),
        "%": lambda *x: reduce(operator.mod, x),
        "and": lambda *x: reduce(operator.and_, x),
        "or": lambda *x: reduce(operator.or_, x),
        "<": lambda *x: reduce(operator.lt, x),
        "<=": lambda *x: reduce(operator.le, x),
        "=": lambda *x: reduce(operator.eq, x),
        "!=": lambda *x: reduce(operator.ne, x),
        ">=": lambda *x: reduce(operator.ge, x),
        ">": lambda *x: reduce(operator.gt, x),
        "~": lambda *xs: all(not x for x in xs),
        "in": lambda *x: x[0] in x[1],
        "list": lambda *x: list(x),
        "as": lambda *x: Alias(x[0], x[1]),
        "dict": list_to_dict,
        "kw": KWargs,
    }

    aggregates = {
        "min",
        "max",
        "sum",
        "first",
        "last",
        "mean",
        "average",
        "quantile",
        "count",
        "len",
    }

    def __init__(self, tokens):
        self.tokens = tokens

    @classmethod
    def parse(cls, expr):
        res = tokenize(expr)
        tokens = scan(res)[0]
        return AST(tokens)

    def eval(self, env=None):
        env = Env(env or {})
        if isinstance(self.tokens, Token):
            return self.tokens.eval(env)
        head, tail = self.tokens[0], self.tokens[1:]
        args = [AST(tk).eval(env) for tk in tail]

        # Split normal and kw args
        simple_args = []
        kw_args = {}
        for a in args:
            if isinstance(a, KWargs):
                kw_args.update(a.value)
            else:
                simple_args.append(a)

        fn = head.eval(env)
        return fn(*simple_args, **kw_args)

    def is_aggregate(self):
        for tk in self.extract_tokens():
            if tk.is_aggregate():
                return True
        return False

    def extract_tokens(self):
        """
        Flatten tree into a list of tokens
        """
        for tk in self.tokens:
            if isinstance(tk, Token):
                yield tk
            elif isinstance(tk, list):
                yield from tk
            else:
                raise ValueError(f"Unexpected token: {tk}")

Class variables

var aggregates
var builtins

Static methods

def parse(expr)
Expand source code
@classmethod
def parse(cls, expr):
    res = tokenize(expr)
    tokens = scan(res)[0]
    return AST(tokens)

Methods

def eval(self, env=None)
Expand source code
def eval(self, env=None):
    env = Env(env or {})
    if isinstance(self.tokens, Token):
        return self.tokens.eval(env)
    head, tail = self.tokens[0], self.tokens[1:]
    args = [AST(tk).eval(env) for tk in tail]

    # Split normal and kw args
    simple_args = []
    kw_args = {}
    for a in args:
        if isinstance(a, KWargs):
            kw_args.update(a.value)
        else:
            simple_args.append(a)

    fn = head.eval(env)
    return fn(*simple_args, **kw_args)
def extract_tokens(self)

Flatten tree into a list of tokens

Expand source code
def extract_tokens(self):
    """
    Flatten tree into a list of tokens
    """
    for tk in self.tokens:
        if isinstance(tk, Token):
            yield tk
        elif isinstance(tk, list):
            yield from tk
        else:
            raise ValueError(f"Unexpected token: {tk}")
def is_aggregate(self)
Expand source code
def is_aggregate(self):
    for tk in self.extract_tokens():
        if tk.is_aggregate():
            return True
    return False