import inspect
from .column_operations.base import _Operation
from .. import types as ott
class Column(_Operation):
    """
    Container for a single column of a :py:class:`~onetick.py.Source`.

    Instances are what :py:meth:`~onetick.py.Source.__getitem__` returns, and they can be used
    anywhere an :py:class:`~onetick.py.Operation` object is accepted.

    Examples
    --------
    >>> t = otp.Tick(A=1)
    >>> t['A']
    Column(A, <class 'int'>)
    """

    def __init__(self, name, dtype=float, obj_ref=None, to_str_func=None, precision=None,
                 pre_format=None, post_format=None):
        """
        Parameters
        ----------
        name: str
            Column name; it is also passed to the base class as ``op_str``.
        dtype: type
            Column type. Must be a class accepted by ``ott.is_type_supported``.
        obj_ref:
            Object the column belongs to. It is consulted by :meth:`__str__` and :meth:`rename`.
        to_str_func: callable, optional
            Stored as ``self.to_str_func``; the column's own ``__str__`` is used when it is falsy.
        precision: optional
            Stored as ``self._precision``. Only accepted when ``dtype`` is a subclass of ``float``.
        pre_format, post_format: optional
            Accepted but currently not stored: both attributes are always set to ``None``.

        Raises
        ------
        TypeError
            If ``dtype`` is falsy, is not a class, or is not a supported type.
        ValueError
            If ``precision`` is given for a ``dtype`` that is not a subclass of ``float``.
        """
        dtype_is_usable = dtype and inspect.isclass(dtype) and ott.is_type_supported(dtype)
        if not dtype_is_usable:
            raise TypeError(f'Column does not support "{dtype}" type')

        self.name = name
        super().__init__(dtype=dtype, obj_ref=obj_ref, op_str=name)

        self.to_str_func = to_str_func if to_str_func else self.__str__

        # TODO: not used yet, but going to be used in PY-35
        # NOTE(review): the ``pre_format``/``post_format`` arguments are deliberately(?) ignored here.
        self.pre_format = None
        self.post_format = None

        # optional properties
        if precision is None:
            return
        if not issubclass(dtype, float):
            raise ValueError("precision is supported only for columns with float or decimal dtypes")
        self._precision = precision

    def rename(self, new_name, update_parent_object=True):
        """
        Change the column name.

        When the column has a parent object and ``update_parent_object`` is true, the parent's
        ``rename`` is called in place first; afterwards ``self.name`` is set to ``new_name``.
        """
        parent = self.obj_ref
        if parent and update_parent_object:
            parent.rename({self.name: new_name}, inplace=True)
        self.name = new_name

    def __len__(self):
        """
        Length of a string column.

        Returns ``dtype.length`` for ``ott.string`` subclasses and ``ott.string.DEFAULT_LENGTH``
        for other ``str`` subclasses; raises :class:`TypeError` for every non-string dtype.
        """
        column_type = self.dtype
        if not issubclass(column_type, str):
            raise TypeError(f'It is not applicable for the column with type {self.dtype}')  # TODO: test
        if issubclass(column_type, ott.string):
            return column_type.length
        return ott.string.DEFAULT_LENGTH

    def __hash__(self):
        """Hash the column by its name only."""
        column_name = self.name
        return hash(column_name)

    def __str__(self):
        """
        Build the textual expression for this column.

        Without a parent object this is just the column name. With a parent the name may get a
        prefix (``_SYMBOL_PARAM.`` or ``<node name>.``), and for symbol-parameter sources the
        result is wrapped into a string-to-value conversion function chosen by ``dtype``.
        """
        from onetick.py.core._source._symbol_param_source import _SymbolParamSource

        owner = self.obj_ref
        if not owner:
            return self.name

        from_symbol_params = isinstance(owner, _SymbolParamSource)

        prefix = ""
        # TODO: PY-35
        # This is ad-hoc check, really we need to change column formatting to
        # pre- and post-formats, and copy columns through the .copy() method
        # on the _Column instead of copying them manually in different places
        # of the _Source class
        if from_symbol_params and self.name != "_SYMBOL_NAME":
            prefix = "_SYMBOL_PARAM."

        # the node-name prefix, when requested, replaces any prefix chosen above
        if owner.use_name_for_column_prefix():
            if owner.node_name().strip() == "":
                raise Exception("You set to use name for column prefix, but name is empty")
            prefix = owner.node_name() + "."

        expression = prefix + self.name

        if from_symbol_params:
            # symbol params always are string, need to convert
            conversions = (
                (float, "atof"),
                (int, "atol"),
                (ott.msectime, "msec_str_to_nsectime"),
                (ott.nsectime, "msec_str_to_nsectime"),
            )
            for exact_type, function_name in conversions:
                if self.dtype is exact_type:
                    expression = f"{function_name}({expression})"
                    break

        return expression

    def __repr__(self):
        """Return ``Column(<str(self)>, <dtype>)``."""
        text = str(self)
        return f"Column({text}, {self.dtype})"

    def copy(self, obj_ref=None):
        """
        Create a new column with the same name and dtype, attached to ``obj_ref``.

        Only ``pre_format`` and ``post_format`` are forwarded in addition to name and dtype.
        """
        # NOTE(review): ``to_str_func`` and ``_precision`` are not carried over - confirm this is intended.
        return _Column(
            self.name,
            self.dtype,
            obj_ref,
            pre_format=self.pre_format,
            post_format=self.post_format,
        )

    def __bool__(self):
        """
        Truth value of the column.

        Only available while ``_Column.emulation_enabled`` is set: numeric columns are compared
        with ``0`` and string columns with ``""``. In every other case :class:`TypeError` is raised.
        """
        if _Column.emulation_enabled:
            column_type = self.dtype
            if issubclass(column_type, (int, float)):
                return (self != 0).__bool__()
            if issubclass(column_type, str):
                return (self != "").__bool__()
        raise TypeError("It is not allowed to use columns in if-else and while clauses")

    def __getitem__(self, item):
        """
        Provides an ability to get values from future or past ticks.

        - Negative values refer to past ticks
        - Zero to current tick
        - Positive - future ticks

        Boundary values will be defaulted. For instance for ``item=-1`` first tick value will be defaulted
        (there is no tick before first tick)

        Parameters
        ----------
        item: int
            number of ticks to look back/forward

        Returns
        -------
        Operation

        Examples
        --------
        >>> data = otp.Ticks({'A': [1, 2, 3]})
        >>> data['PAST1'] = data['A'][-1]
        >>> data['PAST2'] = data['A'][-2]
        >>> data['FUTURE1'] = data['A'][1]
        >>> data['FUTURE2'] = data['A'][2]
        >>> otp.run(data)
                             Time  A  PAST1  PAST2  FUTURE1  FUTURE2
        0 2003-12-01 00:00:00.000  1      0      0        2        3
        1 2003-12-01 00:00:00.001  2      1      0        3        0
        2 2003-12-01 00:00:00.002  3      2      1        0        0
        """
        # NOTE(review): ``bool`` is a subclass of ``int``, so True/False pass this check.
        if not isinstance(item, int):
            raise TypeError(
                f"Lag operation supports only integer const values, but passed value of type '{type(item)}'"
            )
        # a zero offset is the current tick, i.e. the column itself
        return self if item == 0 else _LagOperator(self, item)

    def cumsum(self):
        """
        Cumulative sum of the column.

        Can only be used when creating or updating column.

        Examples
        --------
        >>> t = otp.Ticks({'A': [1, 2, 3]})
        >>> t['X'] = t['A'].cumsum()
        >>> otp.run(t)
                             Time  A    X
        0 2003-12-01 00:00:00.000  1  1.0
        1 2003-12-01 00:00:00.001  2  3.0
        2 2003-12-01 00:00:00.002  3  6.0
        """
        import onetick.py as otp

        running_sum = otp.agg.sum(self.name, running=True, all_fields=True, overwrite_output_field=True)
        return _ColumnAggregation(running_sum)

    def __iter__(self):
        """Columns are not iterable; always raises :class:`TypeError`."""
        raise TypeError("It is not allowed to use columns in for-clauses")
class _LagOperator(_Operation):
    """
    Operation that refers to the value of a column on another tick.

    The textual form is ``<column>[<index>]``; dtype and parent object are taken from the
    base column.
    """

    def __init__(self, base_column, inx):
        """
        Parameters
        ----------
        base_column:
            Column whose value is referenced; it becomes the only operation parameter.
        inx:
            Tick offset, exposed read-only through :attr:`index`.
        """
        self._inx = inx
        column_text = str(base_column)
        lag_expression = f"{column_text}[{self.index}]"
        super().__init__(
            op_params=[base_column],
            dtype=base_column.dtype,
            op_str=lag_expression,
            obj_ref=base_column.obj_ref,
        )

    @property
    def index(self):
        """Tick offset this operator was created with."""
        return self._inx
class _ColumnAggregation:
    """
    Describes how a column is going to be aggregated.

    Wraps an aggregation object and applies it to a source in place.
    """

    def __init__(self, aggregation):
        """
        Parameters
        ----------
        aggregation:
            Aggregation object; it has to have ``running``, ``all_fields`` and
            ``overwrite_output_field`` all set.

        Raises
        ------
        ValueError
            If ``aggregation`` is not an ``_Aggregation`` instance or any of the three
            required flags is not set.
        """
        from ..aggregations._base import _Aggregation

        if not isinstance(aggregation, _Aggregation):
            raise ValueError(f'Expected aggregation object, got {type(aggregation)}')

        # flags are checked left to right and evaluation stops at the first unset one
        fully_configured = (
            aggregation.running
            and aggregation.all_fields
            and aggregation.overwrite_output_field
        )
        if not fully_configured:
            raise ValueError("Column aggregations only support 'running' aggregations"
                             " with 'all_fields' and 'overwrite_output_field' parameters set")

        self.aggregation = aggregation

    def apply(self, src, name):
        """Apply the wrapped aggregation to ``src`` in place under ``name`` and return its result."""
        wrapped = self.aggregation
        return wrapped.apply(src, name=name, inplace=True)
# Alias kept for backward compatibility. Column.copy() and Column.__bool__()
# also look the class up through this name, so it must stay defined.
_Column = Column