# Source code for onetick.py.core.column_operations.base

import warnings
from abc import ABC
from typing import Type

from onetick.py import types as ott
from onetick.py.core.column_operations import _methods

from onetick.py.core.column_operations._methods.op_types import (
    are_ints_not_time,
    are_time,
    are_floats,
    are_strings
)
from onetick.py.core.column_operations._methods.methods import DatetimeSubtractionWarning


class Expr:
    """
    Wrapper that marks an operation as an expression for an EP parameter.

    EP parameter's value can be set to an expression.
    Expressions are evaluated before parameters are actually passed to event processors.

    See also
    --------
    :py:attr:`onetick.py.Operation.expr`
    """

    def __init__(self, operation):
        # the wrapped object; it is only ever formatted into a string by this class
        self.operation = operation

    def __str__(self):
        # renders as ``expr(<operation>)``
        return 'expr({})'.format(self.operation)
[docs]class Operation(ABC): """ :py:class:`~onetick.py.Source` column operation container. This is the object you get when applying most operations on :py:class:`~onetick.py.Column` or on other operations. Eventually you can add a new column using the operation you got or pass it as a parameter to some functions. Examples -------- >>> t = otp.Tick(A=1) >>> t['A'] Column(A, <class 'int'>) >>> t['A'] / 2 Operation((A) / (2)) >>> t['B'] = t['A'] / 2 >>> t['B'] Column(B, <class 'float'>) """ emulation_enabled = False def __init__(self, op_func=None, op_params=None, dtype=None, obj_ref=None, op_str=None): self._op_func = op_func self._op_params = op_params self.obj_ref = obj_ref self.__warnings = [] if op_func: if op_str: raise ValueError("You should specify either op_func or op_str") with warnings.catch_warnings(record=True) as warning_list: # we want to raise this warning only in some cases # that's why we're catching it and saving for later use warnings.simplefilter('always', category=DatetimeSubtractionWarning) op_str, dtype = self._evaluate_func() for w in warning_list: if w.category is DatetimeSubtractionWarning: self.__warnings.append(w) else: warnings.warn_explicit(w.message, w.category, w.filename, w.lineno) # save it for later check and reevaluate func if name was changed by prefix adding or renaming self._params_names = self._get_param_names() self._op_str = op_str self._dtype = dtype def __bool__(self): if Operation.emulation_enabled: # True is default for classes without overriden __bool__ return True raise TypeError('It is not allowed to use compare in if-else and while clauses') def __str__(self): if self._get_param_names() != self._params_names: self._evaluate_func(set_fields=True) return self.op_str def __repr__(self): return f"Operation({str(self)})" @property def dtype(self): """ Returns the type of the column or operation. 
See also -------- :py:meth:`Source.schema <onetick.py.Source.schema>` Examples -------- >>> t = otp.Tick(A=1, B=2.3, C='3') >>> t['TIMESTAMP'].dtype <class 'onetick.py.types.nsectime'> >>> t['A'].dtype <class 'int'> >>> t['B'].dtype <class 'float'> >>> t['C'].dtype <class 'str'> """ dtype = self._dtype if not dtype: op_str, dtype = self._evaluate_func(set_fields=True) return dtype @property def op_str(self): for w in self.__warnings: warnings.warn_explicit(w.message, w.category, w.filename, w.lineno) op_str = self._op_str if not op_str: op_str, dtype = self._evaluate_func(set_fields=True) return op_str @property def expr(self): """ Get expression to use in EP parameters. See also -------- :py:class:`~onetick.py.core.column_operations.base.Expr` """ return Expr(self)
[docs] def round(self, precision=None): """ Rounds input column with specified `precision`. Parameters ---------- precision: int Number from -12 to 12. Positive precision is precision after the floating point. Negative precision is precision before the floating point. See also -------- __round__ Examples -------- >>> t = otp.Tick(A=1234.5678) >>> t['B'] = t['A'].round() >>> t['C'] = t['A'].round(2) >>> t['D'] = t['A'].round(-2) >>> otp.run(t) Time A B C D 0 2003-12-01 1234.5678 1235 1234.57 1200.0 Returns ------- Operation """ return round(self, precision)
[docs] def map(self, arg, default=None): """ Map values of the column to new values according to the mapping in ``arg``. If the value is not in the mapping, it is set to the ``default`` value. If ``default`` value is not set, it is set to default value for the column type. Parameters ---------- arg: dict Mapping from old values to new values. All values must have the same type, compatible with the column type. default: simple value or Column or Operation Default value if no mapping is found in ``arg``. By default, it is set to default value for the column type. (0 for numbers, empty string for strings, etc.) Examples -------- >>> t = otp.Ticks(A=[1, 2, 3, 4, 5]) >>> t['B'] = t['A'].map({1: 10, 2: 20, 3: 30}) >>> otp.run(t) Time A B 0 2003-12-01 00:00:00.000 1 10 1 2003-12-01 00:00:00.001 2 20 2 2003-12-01 00:00:00.002 3 30 3 2003-12-01 00:00:00.003 4 0 4 2003-12-01 00:00:00.004 5 0 Example with ``default`` parameter set: >>> t = otp.Ticks(A=[1, 2, 3, 4, 5]) >>> t['B'] = t['A'].map({1: 10, 2: 20, 3: 30}, default=-1) >>> otp.run(t) Time A B 0 2003-12-01 00:00:00.000 1 10 1 2003-12-01 00:00:00.001 2 20 2 2003-12-01 00:00:00.002 3 30 3 2003-12-01 00:00:00.003 4 -1 4 2003-12-01 00:00:00.004 5 -1 Returns ------- Operation """ if not isinstance(arg, dict) or not arg: raise TypeError("map() argument must be a dict with keys and values to map") try: values_type = ott.get_type_by_objects(arg.values()) except TypeError as e: raise TypeError("map() argument must be a dict with same types for all values") from e if default is not None: try: default_type = ott.get_type_by_objects([default]) ott.get_type_by_objects([default_type, values_type]) except TypeError as e: raise TypeError( f"map() default value type {default_type} must be compatible with values type {values_type}" ) from e try: keys_type = ott.get_type_by_objects(arg.keys()) except TypeError as e: raise TypeError("map() argument must be a dict with same types for all keys") from e try: 
ott.get_type_by_objects([keys_type, self.dtype]) except TypeError as e: raise TypeError(f"map() keys type {keys_type} must be compatible with column type {self.dtype}") from e return _Operation(_methods._map, [self, arg, values_type, default])
[docs] def apply(self, lambda_f): """ Apply function or type to column Parameters ---------- lambda_f: type or callable if type - will convert column to requested type if callable - will translate python code to similar OneTick's CASE expression. There are some limitations to which python operators can be used in this callable. See :ref:`Python callables parsing guide <python callable parser>` article for details. In :ref:`Remote OTP with Ray<ray-remote>` any `Callable` must be decorated with `@otp.remote` decorator, see :ref:`Ray usage examples<apply-remote-context>` for details. Examples -------- Converting type of the column, e.g. string column to integer: >>> data = otp.Ticks({'A': ['1', '2', '3']}) >>> data['B'] = data['A'].apply(int) + 10 # OTdirective: snippet-name: column operations.type convertation; >>> otp.run(data) Time A B 0 2003-12-01 00:00:00.000 1 11 1 2003-12-01 00:00:00.001 2 12 2 2003-12-01 00:00:00.002 3 13 More complicated logic: >>> data = otp.Ticks({'A': [-321, 0, 123]}) >>> data['SIGN'] = data['A'].apply(lambda x: 1 if x > 0 else -1 if x < 0 else 0) >>> otp.run(data) Time A SIGN 0 2003-12-01 00:00:00.000 -321 -1 1 2003-12-01 00:00:00.001 0 0 2 2003-12-01 00:00:00.002 123 1 See also -------- :py:meth:`onetick.py.Source.apply` :ref:`Python callables parsing guide <python callable parser>` """ if isinstance(lambda_f, Type) and ott.is_type_basic(lambda_f): return self._convert_to(lambda_f) from onetick.py.core.lambda_object import apply_lambda return apply_lambda(lambda_f, self)
[docs] def astype(self, to_type): """ Alias for the :meth:`apply` method with type. See also -------- :meth:`apply` Examples -------- >>> data = otp.Tick(A=1, B=2.2, C='3.3') >>> data['A'] = data['A'].astype(str) + 'A' >>> data['B'] = data['B'].astype(int) + 1 >>> data['C'] = data['C'].astype(float) + 0.1 >>> otp.run(data) Time B A C 0 2003-12-01 3 1A 3.4 """ return self.apply(to_type)
[docs] def isin(self, *items): """ Check if column's value is in ``items``. Parameters ---------- items possible values Returns ------- Operation See also -------- :py:meth:`Source.__getitem__` Examples -------- >>> data = otp.Ticks(A=['a', 'b', 'c']) >>> data['B'] = data['A'].isin('a', 'c') >>> otp.run(data) Time A B 0 2003-12-01 00:00:00.000 a 1.0 1 2003-12-01 00:00:00.001 b 0.0 2 2003-12-01 00:00:00.002 c 1.0 Can be used as filter >>> data = otp.Ticks(A=[1, 2, 3, 0]) >>> yes, no = data[data["A"].isin(0, 1)] # OTdirective: snippet-name: column operations.is in.constant; >>> otp.run(yes)[["A"]] A 0 1 1 0 columns and expressions are also supported >>> # OTdirective: snippet-name: column operations.is in.from fields; >>> data = otp.Ticks(A=["ab", "cv", "bc", "a", "d"], B=["a", "c", "b", "a", "a"]) >>> yes, no = data[data["A"].isin(data["B"], data["B"] + "b")] >>> otp.run(yes)[["A", "B"]] A B 0 ab a 1 a a """ return _Operation(_methods.isin, [self, items])
[docs] def fillna(self, value): """ Fill :py:class:`~onetick.py.nan` values with ``value`` Parameters ---------- value: float, int value to use instead :py:class:`~onetick.py.nan` Examples -------- >>> data = otp.Ticks({'A': [1, otp.nan, 2]}) >>> data['A'] = data['A'].fillna(100) # OTdirective: snippet-name: column operations.fillna; >>> otp.run(data) Time A 0 2003-12-01 00:00:00.000 1.0 1 2003-12-01 00:00:00.001 100.0 2 2003-12-01 00:00:00.002 2.0 """ return _Operation(_methods.fillna, [self, value])
@property def str(self): """ Property that provide access to string accessors """ if issubclass(self.dtype, str): from onetick.py.core.column_operations.accessors.str_accessor import _StrAccessor return _StrAccessor(self) else: raise TypeError(".str accessor is available only for string type columns") @property def dt(self): """ Property that provide access to datetime accessors """ if issubclass(self.dtype, ott.nsectime) \ or issubclass(self.dtype, ott.msectime): from onetick.py.core.column_operations.accessors.dt_accessor import _DtAccessor return _DtAccessor(self) else: raise TypeError(".dt accessor is available only for datetime type columns") @property def float(self): """ Property that provide access to float accessors """ if issubclass(self.dtype, float) and self.dtype is not ott.decimal: from onetick.py.core.column_operations.accessors.float_accessor import _FloatAccessor return _FloatAccessor(self) else: raise TypeError(".float accessor is available only for float type columns") @property def decimal(self): """ Property that provide access to decimal accessors """ if self.dtype is ott.decimal: from onetick.py.core.column_operations.accessors.decimal_accessor import _DecimalAccessor return _DecimalAccessor(self) else: raise TypeError(".decimal accessor is available only for decimal type columns")
[docs] def __abs__(self): """ Return the absolute value of float or int column. Examples -------- >>> t = otp.Tick(A=-1, B=-2.3) >>> t['A'] = abs(t['A']) >>> t['B'] = abs(t['B']) >>> otp.run(t)[['A', 'B']] A B 0 1 2.3 """ return _Operation(_methods.abs, [self])
[docs] def __round__(self, precision=None): """ Rounds value with specified ``precision``. Parameters ---------- precision: int Number from -12 to 12. Positive precision is precision after the floating point. Negative precision is precision before the floating point. Examples -------- >>> t = otp.Tick(A=1234.5678) >>> t['B'] = round(t['A']) >>> t['C'] = round(t['A'], 2) >>> t['D'] = round(t['A'], -2) >>> otp.run(t) Time A B C D 0 2003-12-01 1234.5678 1235 1234.57 1200.0 Returns ------- Operation """ return _Operation(_methods.round, [self, precision])
def __pos__(self): # TODO: is it working in OneTick? return _Operation(_methods.pos, [self])
[docs] def __neg__(self): """ Return the negative value of float or int column. Examples -------- >>> t = otp.Tick(A=1, B=2.3) >>> t['A'] = -t['A'] >>> t['B'] = -t['B'] >>> otp.run(t)[['A', 'B']] A B 0 -1 -2.3 """ return _Operation(_methods.neg, [self])
[docs] def __add__(self, other): """ Return the sum of column and ``other`` value. Parameters ---------- other: int, float, str, :ref:`offset <datetime_offsets>`, :py:class:`onetick.py.Column` Examples -------- >>> t = otp.Tick(A=1, B=2.3, C='c', D=otp.datetime(2022, 5, 12)) >>> t['A'] = t['A'] + t['B'] >>> t['B'] = t['B'] + 1 >>> t['C'] = t['C'] + '_suffix' >>> t['D'] = t['D'] + otp.Day(1) >>> otp.run(t)[['A', 'B', 'C', 'D']] A B C D 0 3.3 3.3 c_suffix 2022-05-13 """ return _Operation(_methods.add, [self, other])
[docs] def __radd__(self, other): """ See also -------- __add__ Examples -------- >>> t = otp.Tick(A=1, B=2.3, C='c', D=otp.datetime(2022, 5, 12)) >>> t['A'] += t['B'] >>> t['B'] += 1 >>> t['C'] += '_suffix' >>> t['D'] += otp.Day(1) >>> otp.run(t)[['A', 'B', 'C', 'D']] A B C D 0 3.3 3.3 c_suffix 2022-05-13 """ return _Operation(_methods.add, [other, self])
[docs] def __sub__(self, other): """ Subtract ``other`` value from column. Parameters ---------- other: int, float, :ref:`offset <datetime_offsets>`, :py:class:`onetick.py.Column` Examples -------- >>> t = otp.Tick(A=1, B=2.3, D=otp.datetime(2022, 5, 12)) >>> t['A'] = t['A'] - t['B'] >>> t['B'] = t['B'] - 1 >>> t['D'] = t['D'] - otp.Day(1) >>> otp.run(t)[['A', 'B', 'D']] A B D 0 -1.3 1.3 2022-05-11 """ return _Operation(_methods.sub, [self, other])
[docs] def __rsub__(self, other): """ See also -------- __sub__ Examples -------- >>> t = otp.Tick(A=1, B=2.3, D=otp.datetime(2022, 5, 12)) >>> t['A'] -= t['B'] >>> t['B'] -= 1 >>> t['D'] -= otp.Day(1) >>> otp.run(t)[['A', 'B', 'D']] A B D 0 -1.3 1.3 2022-05-11 """ return _Operation(_methods.sub, [other, self])
[docs] def __mul__(self, other): """ Multiply column by ``other`` value. Parameters ---------- other: int, float, str, :py:class:`onetick.py.Column` Examples -------- >>> t = otp.Tick(A=1, B=2.3, C='c') >>> t['A'] = t['A'] * t['B'] >>> t['B'] = t['B'] * 2 >>> t['C'] = t['C'] * 3 >>> otp.run(t)[['A', 'B', 'C']] A B C 0 2.3 4.6 ccc """ return _Operation(_methods.mul, [self, other])
[docs] def __rmul__(self, other): """ See also -------- __mul__ Examples -------- >>> t = otp.Tick(A=1, B=2.3, C='c') >>> t['A'] *= t['B'] >>> t['B'] *= 2 >>> t['C'] *= 3 >>> otp.run(t)[['A', 'B', 'C']] A B C 0 2.3 4.6 ccc """ return _Operation(_methods.mul, [other, self])
[docs] def __truediv__(self, other): """ Divide column by ``other`` value. Parameters ---------- other: int, float, :py:class:`onetick.py.Column` Examples -------- >>> t = otp.Tick(A=1, B=2.3) >>> t['A'] = t['A'] / t['B'] >>> t['B'] = t['B'] / 2 >>> otp.run(t)[['A', 'B']] A B 0 0.434783 1.15 """ return _Operation(_methods.div, [self, other])
[docs] def __rtruediv__(self, other): """ See also -------- __truediv__ Examples -------- >>> t = otp.Tick(A=1, B=2.3) >>> t['A'] /= t['B'] >>> t['B'] /= 2 >>> otp.run(t)[['A', 'B']] A B 0 0.434783 1.15 """ return _Operation(_methods.div, [other, self])
[docs] def __mod__(self, other): """ Return modulo of division of int column by ``other`` value. Parameters ---------- other: int, :py:class:`onetick.py.Column` Examples -------- >>> t = otp.Tick(A=3, B=3) >>> t['A'] = t['A'] % t['B'] >>> t['B'] = t['B'] % 2 >>> otp.run(t)[['A', 'B']] A B 0 0 1 """ return _Operation(_methods.mod, [self, other])
[docs] def __invert__(self): """ Return inversion of filter operation. Examples -------- >>> t = otp.Ticks(A=range(4)) >>> t, _ = t[~(t['A'] > 1)] >>> otp.run(t)[['A']] A 0 0 1 1 """ result = _Operation(_methods.invert, [self]) return result
[docs] def __eq__(self, other): """ Return equality in filter operation. Examples -------- >>> t = otp.Ticks(A=range(4)) >>> t, _ = t[(t['A'] == 1)] >>> otp.run(t)[['A']] A 0 1 """ result = _Operation(_methods.eq, [self, other]) return result
[docs] def __ne__(self, other): """ Return inequality in filter operation. Examples -------- >>> t = otp.Ticks(A=range(4)) >>> t, _ = t[(t['A'] != 1)] >>> otp.run(t)[['A']] A 0 0 1 2 2 3 """ result = _Operation(_methods.ne, [self, other]) return result
[docs] def __or__(self, other): """ Return logical ``or`` in filter operation. Examples -------- >>> t = otp.Ticks(A=range(4)) >>> t, _ = t[(t['A'] == 1) | (t['A'] == 2)] >>> otp.run(t)[['A']] A 0 1 1 2 """ result = _Operation(_methods.or_, [self, other]) return result
[docs] def __and__(self, other): """ Return logical ``and`` in filter operation. Examples -------- >>> t = otp.Ticks(A=[1, 1], B=[1, 2]) >>> t, _ = t[(t['A'] == 1) & (t['B'] == 1)] >>> otp.run(t)[['A', 'B']] A B 0 1 1 """ result = _Operation(_methods.and_, [self, other]) return result
[docs] def __le__(self, other): """ Return <= in filter operation. Examples -------- >>> t = otp.Ticks(A=range(4)) >>> t, _ = t[t['A'] <= 2] >>> otp.run(t)[['A']] A 0 0 1 1 2 2 """ result = _Operation(_methods.le, [self, other]) return result
[docs] def __lt__(self, other): """ Return < in filter operation. Examples -------- >>> t = otp.Ticks(A=range(4)) >>> t, _ = t[t['A'] < 2] >>> otp.run(t)[['A']] A 0 0 1 1 """ result = _Operation(_methods.lt, [self, other]) return result
[docs] def __ge__(self, other): """ Return >= in filter operation. Examples -------- >>> t = otp.Ticks(A=range(4)) >>> t, _ = t[t['A'] >= 2] >>> otp.run(t)[['A']] A 0 2 1 3 """ result = _Operation(_methods.ge, [self, other]) return result
[docs] def __gt__(self, other): """ Return > in filter operation. Examples -------- >>> t = otp.Ticks(A=range(4)) >>> t, _ = t[t['A'] > 2] >>> otp.run(t)[['A']] A 0 3 """ result = _Operation(_methods.gt, [self, other]) return result
def _invalidate_cache(self): self._params_names = None if self._op_params: for op in self._op_params: if isinstance(op, _Operation): op._invalidate_cache() def _evaluate_func(self, *, set_fields=False): if self._op_func: op_str, dtype = self._op_func(*self._op_params) if self._op_params else self._op_func() if set_fields: self._params_names = self._get_param_names() self._op_str = op_str self._dtype = dtype return op_str, dtype def _get_param_names(self): return [str(param) for param in self._op_params] if self._op_params else [] def _convert_to(self, to_type): return _Operation(_methods.CONVERSIONS[self.dtype, to_type], [self]) def _make_python_way_bool_expression(self): dtype = ott.get_object_type(self) if dtype is bool: return self if are_ints_not_time(dtype): self = _Operation(_methods.ne, (self, 0)) elif are_time(dtype): self = _Operation(_methods.ne, (self._convert_to(int), 0)) elif are_floats(dtype): self = _Operation(_methods.ne, (self, 0.0)) elif are_strings(dtype): self = _Operation(_methods.ne, (self, "")) else: raise TypeError("Filter expression should return bool, int, float or string") return self
_Operation = Operation # alias to support backward compatibility
class Raw(Operation):
    """
    Data type representing raw OneTick expression.

    Examples
    --------
    >>> t = otp.Tick(A=1)
    >>> t['A'] = '_TIMEZONE'
    >>> t['B'] = otp.raw('_TIMEZONE', dtype=str)
    >>> t(timezone='Asia/Yerevan')
            Time          A             B
    0 2003-12-01  _TIMEZONE  Asia/Yerevan
    """

    def __init__(self, raw, dtype):
        """
        Parameters
        ----------
        raw: str
            Expression text; it is passed to the base class as ``op_str``.
        dtype: type
            Type of the expression result.
            A warning is issued when it is exactly ``str``.
        """
        if dtype is str:
            # the sentences are separate literals, so the separating space has to be explicit
            warnings.warn(
                f'Be careful, default string length in OneTick is {ott.string.DEFAULT_LENGTH}. '
                "Length of the result raw expression can't be calculated automatically, "
                "so you'd better use onetick.py.string type."
            )
        super().__init__(op_str=raw, dtype=dtype)