import warnings
from abc import ABC
from typing import Type
from onetick.py import types as ott
from onetick.py.core.column_operations import _methods
from onetick.py.core.column_operations._methods.op_types import (
are_ints_not_time,
are_time,
are_floats,
are_strings
)
from onetick.py.core.column_operations._methods.methods import DatetimeSubtractionWarning
class Expr:
    """
    EP parameter's value can be set to an expression.
    Expressions are evaluated before parameters are actually passed to event processors.

    See also
    --------
    :py:attr:`onetick.py.Operation.expr`
    """

    def __init__(self, operation):
        # The wrapped object is stored as-is; it is only stringified on demand in ``__str__``.
        self.operation = operation

    def __str__(self):
        # Rendered as ``expr(<operation>)`` using the string form of the wrapped operation.
        return f'expr({self.operation})'
class Operation(ABC):
    """
    :py:class:`~onetick.py.Source` column operation container.

    This is the object you get when applying most operations on :py:class:`~onetick.py.Column`
    or on other operations.
    Eventually you can add a new column using the operation you got or pass it as a parameter
    to some functions.

    Examples
    --------
    >>> t = otp.Tick(A=1)
    >>> t['A']
    Column(A, <class 'int'>)
    >>> t['A'] / 2
    Operation((A) / (2))
    >>> t['B'] = t['A'] / 2
    >>> t['B']
    Column(B, <class 'float'>)
    """

    # When True, ``bool(operation)`` returns True instead of raising TypeError (see ``__bool__``).
    emulation_enabled = False

    def __init__(self, op_func=None, op_params=None, dtype=None, obj_ref=None, op_str=None):
        """
        Parameters
        ----------
        op_func: callable, optional
            Called with ``*op_params`` (or without arguments if there are none);
            must return an ``(op_str, dtype)`` pair. Mutually exclusive with ``op_str``.
        op_params: sequence, optional
            Positional arguments for ``op_func``.
        dtype: type, optional
            Result type; overwritten by the value returned from ``op_func`` when ``op_func`` is set.
        obj_ref: optional
            Stored as-is in ``self.obj_ref``.
        op_str: str, optional
            Ready-made expression string, used when ``op_func`` is not set.

        Raises
        ------
        ValueError
            If both ``op_func`` and ``op_str`` are specified.
        """
        self._op_func = op_func
        self._op_params = op_params
        self.obj_ref = obj_ref
        self.__warnings = []
        if op_func:
            if op_str:
                raise ValueError("You should specify either op_func or op_str")
            with warnings.catch_warnings(record=True) as warning_list:
                # we want to raise this warning only in some cases
                # that's why we're catching it and saving for later use
                warnings.simplefilter('always', category=DatetimeSubtractionWarning)
                op_str, dtype = self._evaluate_func()
            # Re-emit outside of the recording context so that unrelated warnings reach the user now,
            # while DatetimeSubtractionWarning is postponed until ``op_str`` is accessed.
            for w in warning_list:
                if w.category is DatetimeSubtractionWarning:
                    self.__warnings.append(w)
                else:
                    warnings.warn_explicit(w.message, w.category, w.filename, w.lineno)
        # save it for later check and reevaluate func if name was changed by prefix adding or renaming
        self._params_names = self._get_param_names()
        self._op_str = op_str
        self._dtype = dtype

    def __bool__(self):
        if Operation.emulation_enabled:
            # True is default for classes without overridden __bool__
            return True
        raise TypeError('It is not allowed to use compare in if-else and while clauses')

    def __str__(self):
        # Parameter names may have changed since the last evaluation; refresh cached fields if so.
        if self._get_param_names() != self._params_names:
            self._evaluate_func(set_fields=True)
        return self.op_str

    def __repr__(self):
        return f"Operation({str(self)})"

    @property
    def dtype(self):
        """
        Returns the type of the column or operation.

        See also
        --------
        :py:meth:`Source.schema <onetick.py.Source.schema>`

        Examples
        --------
        >>> t = otp.Tick(A=1, B=2.3, C='3')
        >>> t['TIMESTAMP'].dtype
        <class 'onetick.py.types.nsectime'>
        >>> t['A'].dtype
        <class 'int'>
        >>> t['B'].dtype
        <class 'float'>
        >>> t['C'].dtype
        <class 'str'>
        """
        dtype = self._dtype
        if not dtype:
            # No cached type: evaluate the operation and cache the result.
            _, dtype = self._evaluate_func(set_fields=True)
        return dtype

    @property
    def op_str(self):
        # Warnings postponed in ``__init__`` are emitted every time the expression string is requested.
        for w in self.__warnings:
            warnings.warn_explicit(w.message, w.category, w.filename, w.lineno)
        op_str = self._op_str
        if not op_str:
            op_str, _ = self._evaluate_func(set_fields=True)
        return op_str

    @property
    def expr(self):
        """
        Get expression to use in EP parameters.

        See also
        --------
        :py:class:`~onetick.py.core.column_operations.base.Expr`
        """
        return Expr(self)

    def round(self, precision=None):
        """
        Rounds input column with specified `precision`.

        Parameters
        ----------
        precision: int
            Number from -12 to 12.
            Positive precision is precision after the floating point.
            Negative precision is precision before the floating point.

        See also
        --------
        __round__

        Examples
        --------
        >>> t = otp.Tick(A=1234.5678)
        >>> t['B'] = t['A'].round()
        >>> t['C'] = t['A'].round(2)
        >>> t['D'] = t['A'].round(-2)
        >>> otp.run(t)
        Time A B C D
        0 2003-12-01 1234.5678 1235 1234.57 1200.0

        Returns
        -------
        Operation
        """
        # Built-in ``round`` dispatches to ``__round__`` defined below.
        return round(self, precision)

    def map(self, arg, default=None):
        """
        Map values of the column to new values according to the mapping in ``arg``.
        If the value is not in the mapping, it is set to the ``default`` value.
        If ``default`` value is not set, it is set to default value for the column type.

        Parameters
        ----------
        arg: dict
            Mapping from old values to new values.
            All values must have the same type, compatible with the column type.
        default: simple value or Column or Operation
            Default value if no mapping is found in ``arg``.
            By default, it is set to default value for the column type.
            (0 for numbers, empty string for strings, etc.)

        Examples
        --------
        >>> t = otp.Ticks(A=[1, 2, 3, 4, 5])
        >>> t['B'] = t['A'].map({1: 10, 2: 20, 3: 30})
        >>> otp.run(t)
        Time A B
        0 2003-12-01 00:00:00.000 1 10
        1 2003-12-01 00:00:00.001 2 20
        2 2003-12-01 00:00:00.002 3 30
        3 2003-12-01 00:00:00.003 4 0
        4 2003-12-01 00:00:00.004 5 0

        Example with ``default`` parameter set:

        >>> t = otp.Ticks(A=[1, 2, 3, 4, 5])
        >>> t['B'] = t['A'].map({1: 10, 2: 20, 3: 30}, default=-1)
        >>> otp.run(t)
        Time A B
        0 2003-12-01 00:00:00.000 1 10
        1 2003-12-01 00:00:00.001 2 20
        2 2003-12-01 00:00:00.002 3 30
        3 2003-12-01 00:00:00.003 4 -1
        4 2003-12-01 00:00:00.004 5 -1

        Returns
        -------
        Operation

        Raises
        ------
        TypeError
            If ``arg`` is not a non-empty dict, or if types of keys, values or ``default``
            are inconsistent with each other or with the column type.
        """
        if not isinstance(arg, dict) or not arg:
            raise TypeError("map() argument must be a dict with keys and values to map")

        try:
            values_type = ott.get_type_by_objects(arg.values())
        except TypeError as e:
            raise TypeError("map() argument must be a dict with same types for all values") from e

        if default is not None:
            # Fallback used in the error message if the type lookup itself fails:
            # without it the ``except`` branch would reference an unbound name.
            default_type = type(default)
            try:
                default_type = ott.get_type_by_objects([default])
                ott.get_type_by_objects([default_type, values_type])
            except TypeError as e:
                raise TypeError(
                    f"map() default value type {default_type} must be compatible with values type {values_type}"
                ) from e

        try:
            keys_type = ott.get_type_by_objects(arg.keys())
        except TypeError as e:
            raise TypeError("map() argument must be a dict with same types for all keys") from e

        try:
            ott.get_type_by_objects([keys_type, self.dtype])
        except TypeError as e:
            raise TypeError(f"map() keys type {keys_type} must be compatible with column type {self.dtype}") from e

        return _Operation(_methods._map, [self, arg, values_type, default])

    def apply(self, lambda_f):
        """
        Apply function or type to column

        Parameters
        ----------
        lambda_f: type or callable
            if type - will convert column to requested type

            if callable - will translate python code to similar OneTick's CASE expression.
            There are some limitations to which python operators can be used in this callable.
            See :ref:`Python callables parsing guide <python callable parser>` article for details.
            In :ref:`Remote OTP with Ray<ray-remote>` any `Callable` must be decorated with `@otp.remote` decorator,
            see :ref:`Ray usage examples<apply-remote-context>` for details.

        Examples
        --------
        Converting type of the column, e.g. string column to integer:

        >>> data = otp.Ticks({'A': ['1', '2', '3']})
        >>> data['B'] = data['A'].apply(int) + 10 # OTdirective: snippet-name: column operations.type convertation;
        >>> otp.run(data)
        Time A B
        0 2003-12-01 00:00:00.000 1 11
        1 2003-12-01 00:00:00.001 2 12
        2 2003-12-01 00:00:00.002 3 13

        More complicated logic:

        >>> data = otp.Ticks({'A': [-321, 0, 123]})
        >>> data['SIGN'] = data['A'].apply(lambda x: 1 if x > 0 else -1 if x < 0 else 0)
        >>> otp.run(data)
        Time A SIGN
        0 2003-12-01 00:00:00.000 -321 -1
        1 2003-12-01 00:00:00.001 0 0
        2 2003-12-01 00:00:00.002 123 1

        See also
        --------
        :py:meth:`onetick.py.Source.apply`
        :ref:`Python callables parsing guide <python callable parser>`
        """
        if isinstance(lambda_f, Type) and ott.is_type_basic(lambda_f):
            return self._convert_to(lambda_f)

        # Imported at call time rather than at module level.
        from onetick.py.core.lambda_object import apply_lambda

        return apply_lambda(lambda_f, self)

    def astype(self, to_type):
        """
        Alias for the :meth:`apply` method with type.

        See also
        --------
        :meth:`apply`

        Examples
        --------
        >>> data = otp.Tick(A=1, B=2.2, C='3.3')
        >>> data['A'] = data['A'].astype(str) + 'A'
        >>> data['B'] = data['B'].astype(int) + 1
        >>> data['C'] = data['C'].astype(float) + 0.1
        >>> otp.run(data)
        Time B A C
        0 2003-12-01 3 1A 3.4
        """
        return self.apply(to_type)

    def isin(self, *items):
        """
        Check if column's value is in ``items``.

        Parameters
        ----------
        items
            possible values

        Returns
        -------
        Operation

        See also
        --------
        :py:meth:`Source.__getitem__`

        Examples
        --------
        >>> data = otp.Ticks(A=['a', 'b', 'c'])
        >>> data['B'] = data['A'].isin('a', 'c')
        >>> otp.run(data)
        Time A B
        0 2003-12-01 00:00:00.000 a 1.0
        1 2003-12-01 00:00:00.001 b 0.0
        2 2003-12-01 00:00:00.002 c 1.0

        Can be used as filter

        >>> data = otp.Ticks(A=[1, 2, 3, 0])
        >>> yes, no = data[data["A"].isin(0, 1)] # OTdirective: snippet-name: column operations.is in.constant;
        >>> otp.run(yes)[["A"]]
        A
        0 1
        1 0

        columns and expressions are also supported

        >>> # OTdirective: snippet-name: column operations.is in.from fields;
        >>> data = otp.Ticks(A=["ab", "cv", "bc", "a", "d"], B=["a", "c", "b", "a", "a"])
        >>> yes, no = data[data["A"].isin(data["B"], data["B"] + "b")]
        >>> otp.run(yes)[["A", "B"]]
        A B
        0 ab a
        1 a a
        """
        return _Operation(_methods.isin, [self, items])

    def fillna(self, value):
        """
        Fill :py:class:`~onetick.py.nan` values with ``value``

        Parameters
        ----------
        value: float, int
            value to use instead :py:class:`~onetick.py.nan`

        Examples
        --------
        >>> data = otp.Ticks({'A': [1, otp.nan, 2]})
        >>> data['A'] = data['A'].fillna(100) # OTdirective: snippet-name: column operations.fillna;
        >>> otp.run(data)
        Time A
        0 2003-12-01 00:00:00.000 1.0
        1 2003-12-01 00:00:00.001 100.0
        2 2003-12-01 00:00:00.002 2.0
        """
        return _Operation(_methods.fillna, [self, value])

    @property
    def str(self):
        """
        Property that provides access to methods specific to string types.

        See also
        --------
        :py:class:`otp.string <onetick.py.types.string>`
        """
        if not issubclass(self.dtype, str):
            raise TypeError(".str accessor is available only for string type columns")
        from onetick.py.core.column_operations.accessors.str_accessor import _StrAccessor
        return _StrAccessor(self)

    @property
    def dt(self):
        """
        Property that provides access to methods specific to datetime types.

        See also
        --------
        :py:class:`otp.nsectime <onetick.py.types.nsectime>`
        :py:class:`otp.msectime <onetick.py.types.msectime>`
        """
        if not (issubclass(self.dtype, ott.nsectime) or issubclass(self.dtype, ott.msectime)):
            raise TypeError(".dt accessor is available only for datetime type columns")
        from onetick.py.core.column_operations.accessors.dt_accessor import _DtAccessor
        return _DtAccessor(self)

    @property
    def float(self):
        """
        Property that provides access to
        methods specific to float type.
        """
        # ``ott.decimal`` is excluded explicitly: it has its own accessor (see ``decimal``).
        if not (issubclass(self.dtype, float) and self.dtype is not ott.decimal):
            raise TypeError(".float accessor is available only for float type columns")
        from onetick.py.core.column_operations.accessors.float_accessor import _FloatAccessor
        return _FloatAccessor(self)

    @property
    def decimal(self):
        """
        Property that provides access to methods specific to decimal type.

        See also
        --------
        :py:class:`otp.decimal <onetick.py.types.decimal>`
        """
        if self.dtype is not ott.decimal:
            raise TypeError(".decimal accessor is available only for decimal type columns")
        from onetick.py.core.column_operations.accessors.decimal_accessor import _DecimalAccessor
        return _DecimalAccessor(self)

    def __abs__(self):
        """
        Return the absolute value of float or int column.

        Examples
        --------
        >>> t = otp.Tick(A=-1, B=-2.3)
        >>> t['A'] = abs(t['A'])
        >>> t['B'] = abs(t['B'])
        >>> otp.run(t)[['A', 'B']]
        A B
        0 1 2.3
        """
        return _Operation(_methods.abs, [self])

    def __round__(self, precision=None):
        """
        Rounds value with specified ``precision``.

        Parameters
        ----------
        precision: int
            Number from -12 to 12.
            Positive precision is precision after the floating point.
            Negative precision is precision before the floating point.

        Examples
        --------
        >>> t = otp.Tick(A=1234.5678)
        >>> t['B'] = round(t['A'])
        >>> t['C'] = round(t['A'], 2)
        >>> t['D'] = round(t['A'], -2)
        >>> otp.run(t)
        Time A B C D
        0 2003-12-01 1234.5678 1235 1234.57 1200.0

        Returns
        -------
        Operation
        """
        return _Operation(_methods.round, [self, precision])

    def __pos__(self):
        # TODO: is it working in OneTick?
        return _Operation(_methods.pos, [self])

    def __neg__(self):
        """
        Return the negative value of float or int column.

        Examples
        --------
        >>> t = otp.Tick(A=1, B=2.3)
        >>> t['A'] = -t['A']
        >>> t['B'] = -t['B']
        >>> otp.run(t)[['A', 'B']]
        A B
        0 -1 -2.3
        """
        return _Operation(_methods.neg, [self])

    def __add__(self, other):
        """
        Return the sum of column and ``other`` value.

        Parameters
        ----------
        other: int, float, str, :ref:`offset <datetime_offsets>`, :py:class:`onetick.py.Column`

        Examples
        --------
        >>> t = otp.Tick(A=1, B=2.3, C='c', D=otp.datetime(2022, 5, 12))
        >>> t['A'] = t['A'] + t['B']
        >>> t['B'] = t['B'] + 1
        >>> t['C'] = t['C'] + '_suffix'
        >>> t['D'] = t['D'] + otp.Day(1)
        >>> otp.run(t)[['A', 'B', 'C', 'D']]
        A B C D
        0 3.3 3.3 c_suffix 2022-05-13
        """
        return _Operation(_methods.add, [self, other])

    def __radd__(self, other):
        """
        See also
        --------
        __add__

        Examples
        --------
        >>> t = otp.Tick(A=1, B=2.3, C='c', D=otp.datetime(2022, 5, 12))
        >>> t['A'] += t['B']
        >>> t['B'] += 1
        >>> t['C'] += '_suffix'
        >>> t['D'] += otp.Day(1)
        >>> otp.run(t)[['A', 'B', 'C', 'D']]
        A B C D
        0 3.3 3.3 c_suffix 2022-05-13
        """
        return _Operation(_methods.add, [other, self])

    def __sub__(self, other):
        """
        Subtract ``other`` value from column.

        Parameters
        ----------
        other: int, float, :ref:`offset <datetime_offsets>`, :py:class:`onetick.py.Column`

        Examples
        --------
        >>> t = otp.Tick(A=1, B=2.3, D=otp.datetime(2022, 5, 12))
        >>> t['A'] = t['A'] - t['B']
        >>> t['B'] = t['B'] - 1
        >>> t['D'] = t['D'] - otp.Day(1)
        >>> otp.run(t)[['A', 'B', 'D']]
        A B D
        0 -1.3 1.3 2022-05-11
        """
        return _Operation(_methods.sub, [self, other])

    def __rsub__(self, other):
        """
        See also
        --------
        __sub__

        Examples
        --------
        >>> t = otp.Tick(A=1, B=2.3, D=otp.datetime(2022, 5, 12))
        >>> t['A'] -= t['B']
        >>> t['B'] -= 1
        >>> t['D'] -= otp.Day(1)
        >>> otp.run(t)[['A', 'B', 'D']]
        A B D
        0 -1.3 1.3 2022-05-11
        """
        return _Operation(_methods.sub, [other, self])

    def __mul__(self, other):
        """
        Multiply column by ``other`` value.

        Parameters
        ----------
        other: int, float, str, :py:class:`onetick.py.Column`

        Examples
        --------
        >>> t = otp.Tick(A=1, B=2.3, C='c')
        >>> t['A'] = t['A'] * t['B']
        >>> t['B'] = t['B'] * 2
        >>> t['C'] = t['C'] * 3
        >>> otp.run(t)[['A', 'B', 'C']]
        A B C
        0 2.3 4.6 ccc
        """
        return _Operation(_methods.mul, [self, other])

    def __rmul__(self, other):
        """
        See also
        --------
        __mul__

        Examples
        --------
        >>> t = otp.Tick(A=1, B=2.3, C='c')
        >>> t['A'] *= t['B']
        >>> t['B'] *= 2
        >>> t['C'] *= 3
        >>> otp.run(t)[['A', 'B', 'C']]
        A B C
        0 2.3 4.6 ccc
        """
        return _Operation(_methods.mul, [other, self])

    def __truediv__(self, other):
        """
        Divide column by ``other`` value.

        Parameters
        ----------
        other: int, float, :py:class:`onetick.py.Column`

        Examples
        --------
        >>> t = otp.Tick(A=1, B=2.3)
        >>> t['A'] = t['A'] / t['B']
        >>> t['B'] = t['B'] / 2
        >>> otp.run(t)[['A', 'B']]
        A B
        0 0.434783 1.15
        """
        return _Operation(_methods.div, [self, other])

    def __rtruediv__(self, other):
        """
        See also
        --------
        __truediv__

        Examples
        --------
        >>> t = otp.Tick(A=1, B=2.3)
        >>> t['A'] /= t['B']
        >>> t['B'] /= 2
        >>> otp.run(t)[['A', 'B']]
        A B
        0 0.434783 1.15
        """
        return _Operation(_methods.div, [other, self])

    def __mod__(self, other):
        """
        Return modulo of division of int column by ``other`` value.

        Parameters
        ----------
        other: int, :py:class:`onetick.py.Column`

        Examples
        --------
        >>> t = otp.Tick(A=3, B=3)
        >>> t['A'] = t['A'] % t['B']
        >>> t['B'] = t['B'] % 2
        >>> otp.run(t)[['A', 'B']]
        A B
        0 0 1
        """
        return _Operation(_methods.mod, [self, other])

    def __invert__(self):
        """
        Return inversion of filter operation.

        Examples
        --------
        >>> t = otp.Ticks(A=range(4))
        >>> t, _ = t[~(t['A'] > 1)]
        >>> otp.run(t)[['A']]
        A
        0 0
        1 1
        """
        return _Operation(_methods.invert, [self])

    def __eq__(self, other):
        """
        Return equality in filter operation.

        Examples
        --------
        >>> t = otp.Ticks(A=range(4))
        >>> t, _ = t[(t['A'] == 1)]
        >>> otp.run(t)[['A']]
        A
        0 1
        """
        return _Operation(_methods.eq, [self, other])

    def __ne__(self, other):
        """
        Return inequality in filter operation.

        Examples
        --------
        >>> t = otp.Ticks(A=range(4))
        >>> t, _ = t[(t['A'] != 1)]
        >>> otp.run(t)[['A']]
        A
        0 0
        1 2
        2 3
        """
        return _Operation(_methods.ne, [self, other])

    def __or__(self, other):
        """
        Return logical ``or`` in filter operation.

        Examples
        --------
        >>> t = otp.Ticks(A=range(4))
        >>> t, _ = t[(t['A'] == 1) | (t['A'] == 2)]
        >>> otp.run(t)[['A']]
        A
        0 1
        1 2
        """
        return _Operation(_methods.or_, [self, other])

    def __and__(self, other):
        """
        Return logical ``and`` in filter operation.

        Examples
        --------
        >>> t = otp.Ticks(A=[1, 1], B=[1, 2])
        >>> t, _ = t[(t['A'] == 1) & (t['B'] == 1)]
        >>> otp.run(t)[['A', 'B']]
        A B
        0 1 1
        """
        return _Operation(_methods.and_, [self, other])

    def __le__(self, other):
        """
        Return <= in filter operation.

        Examples
        --------
        >>> t = otp.Ticks(A=range(4))
        >>> t, _ = t[t['A'] <= 2]
        >>> otp.run(t)[['A']]
        A
        0 0
        1 1
        2 2
        """
        return _Operation(_methods.le, [self, other])

    def __lt__(self, other):
        """
        Return < in filter operation.

        Examples
        --------
        >>> t = otp.Ticks(A=range(4))
        >>> t, _ = t[t['A'] < 2]
        >>> otp.run(t)[['A']]
        A
        0 0
        1 1
        """
        return _Operation(_methods.lt, [self, other])

    def __ge__(self, other):
        """
        Return >= in filter operation.

        Examples
        --------
        >>> t = otp.Ticks(A=range(4))
        >>> t, _ = t[t['A'] >= 2]
        >>> otp.run(t)[['A']]
        A
        0 2
        1 3
        """
        return _Operation(_methods.ge, [self, other])

    def __gt__(self, other):
        """
        Return > in filter operation.

        Examples
        --------
        >>> t = otp.Ticks(A=range(4))
        >>> t, _ = t[t['A'] > 2]
        >>> otp.run(t)[['A']]
        A
        0 3
        """
        return _Operation(_methods.gt, [self, other])

    def _invalidate_cache(self):
        # Dropping the cached names makes the next ``__str__`` call re-evaluate the operation;
        # the same is done recursively for every nested operation among the parameters.
        self._params_names = None
        for param in self._op_params or ():
            if isinstance(param, _Operation):
                param._invalidate_cache()

    def _evaluate_func(self, *, set_fields=False):
        """
        Evaluate ``op_func`` with ``op_params`` and return the ``(op_str, dtype)`` pair.

        If ``set_fields`` is True, the result and the current parameter names are cached on the object.
        When the operation has no ``op_func``, the currently stored values are returned unchanged,
        so callers can always unpack the result.
        """
        if not self._op_func:
            # getattr: the fields may not be assigned yet if this is called early in construction
            return getattr(self, '_op_str', None), getattr(self, '_dtype', None)
        op_str, dtype = self._op_func(*self._op_params) if self._op_params else self._op_func()
        if set_fields:
            self._params_names = self._get_param_names()
            self._op_str = op_str
            self._dtype = dtype
        return op_str, dtype

    def _get_param_names(self):
        # String forms of all parameters; compared in ``__str__`` to detect that a parameter changed.
        if not self._op_params:
            return []
        return [str(param) for param in self._op_params]

    def _convert_to(self, to_type):
        # The conversion function is looked up by the (source type, target type) pair.
        return _Operation(_methods.CONVERSIONS[self.dtype, to_type], [self])

    def _make_python_way_bool_expression(self):
        """
        Return an operation usable as a boolean expression.

        Bool operations are returned as-is; ints, floats and strings are compared with
        ``0``, ``0.0`` and ``""`` respectively; time values are converted to int first.

        Raises
        ------
        TypeError
            If the type of the operation is none of the above.
        """
        dtype = ott.get_object_type(self)
        if dtype is bool:
            return self
        if are_ints_not_time(dtype):
            return _Operation(_methods.ne, (self, 0))
        if are_time(dtype):
            return _Operation(_methods.ne, (self._convert_to(int), 0))
        if are_floats(dtype):
            return _Operation(_methods.ne, (self, 0.0))
        if are_strings(dtype):
            return _Operation(_methods.ne, (self, ""))
        raise TypeError("Filter expression should return bool, int, float or string")
# Module-level alias of ``Operation``; the methods of ``Operation`` construct their results
# through this name (``_Operation(...)``).
_Operation = Operation # alias to support backward compatibility
class Raw(Operation):
    """
    Data type representing raw OneTick expression.

    Examples
    --------
    >>> t = otp.Tick(A=1)
    >>> t['A'] = '_TIMEZONE'
    >>> t['B'] = otp.raw('_TIMEZONE', dtype=str)
    >>> t(timezone='Asia/Yerevan')
    Time A B
    0 2003-12-01 _TIMEZONE Asia/Yerevan
    """

    def __init__(self, raw, dtype):
        """
        Parameters
        ----------
        raw: str
            Expression text, passed to :class:`Operation` as ``op_str`` without changes.
        dtype: type
            Type of the expression result. A warning is issued for plain ``str``.
        """
        if dtype is str:
            # stacklevel=2 attributes the warning to the code that created the raw expression
            warnings.warn(
                f'Be careful, default string length in OneTick is {ott.string.DEFAULT_LENGTH}. '
                "Length of the result raw expression can't be calculated automatically, "
                "so you'd better use onetick.py.string type.",
                stacklevel=2,
            )
        super().__init__(op_str=raw, dtype=dtype)