Source code for onetick.py.core.column
import inspect
from .column_operations.base import _Operation
from .. import types as ott
[docs]class Column(_Operation):
    """
    :py:class:`~onetick.py.Source` column container.
    This is the object you get when using :py:meth:`~onetick.py.Source.__getitem__`.
    You can use this object everywhere where :py:class:`~onetick.py.Operation` object can be used.
    Examples
    --------
    >>> t = otp.Tick(A=1)
    >>> t['A']
    Column(A, <class 'int'>)
    """
    def __init__(self, name, dtype=float, obj_ref=None, precision=None):
        if not dtype or not inspect.isclass(dtype) or not ott.is_type_supported(dtype):
            raise TypeError(f'Column does not support "{dtype}" type')
        self.name = name
        super().__init__(dtype=dtype, obj_ref=obj_ref, op_str=name)
        # optional properties
        if precision is not None:
            if issubclass(dtype, float):
                self._precision = precision
            else:
                raise ValueError("precision is supported only for columns with float or decimal dtypes")
    def rename(self, new_name, update_parent_object=True):
        if self.obj_ref and update_parent_object:
            self.obj_ref.rename({self.name: new_name}, inplace=True)
        self.name = new_name
    def __len__(self):
        if issubclass(self.dtype, str):
            if issubclass(self.dtype, ott.string):
                return self.dtype.length
            else:
                return ott.string.DEFAULT_LENGTH
        else:
            raise TypeError(f'It is not applicable for the column with type {self.dtype}')  # TODO: test
    def __hash__(self):
        return hash(self.name)
    def __str__(self):
        if not self.obj_ref:
            return self.name
        result = ''
        if self.obj_ref.use_name_for_column_prefix():
            if self.obj_ref.node_name().strip() == '':
                raise Exception('You set to use name for column prefix, but name is empty')
            result = self.obj_ref.node_name() + '.'
        result += self.name
        return result
    def __repr__(self):
        return f"Column({str(self)}, {self.dtype})"
    def copy(self, obj_ref=None):
        return _Column(self.name, self.dtype, obj_ref)
    def __bool__(self):
        if _Column.emulation_enabled:
            if issubclass(self.dtype, int):
                return (self != 0).__bool__()
            if issubclass(self.dtype, float):
                return (self != 0).__bool__()
            if issubclass(self.dtype, str):
                return (self != "").__bool__()
        raise TypeError("It is not allowed to use columns in if-else and while clauses")
[docs]    def __getitem__(self, item):
        """
        Provides an ability to get values from future or past ticks.
        - Negative values refer to past ticks
        - Zero to current tick
        - Positive - future ticks
        Boundary values will be defaulted. For instance for ``item=-1`` first tick value will be defaulted
        (there is no tick before first tick)
        Parameters
        ----------
        item: int
            number of ticks to look back/forward
        Returns
        -------
        Operation
        Examples
        --------
        >>> data = otp.Ticks({'A': [1, 2, 3]})
        >>> data['PAST1'] = data['A'][-1]
        >>> data['PAST2'] = data['A'][-2]
        >>> data['FUTURE1'] = data['A'][1]
        >>> data['FUTURE2'] = data['A'][2]
        >>> otp.run(data)
                             Time  A  PAST1  PAST2  FUTURE1  FUTURE2
        0 2003-12-01 00:00:00.000  1      0      0        2        3
        1 2003-12-01 00:00:00.001  2      1      0        3        0
        2 2003-12-01 00:00:00.002  3      2      1        0        0
        """
        if not isinstance(item, int):
            raise TypeError(
                "Lag operation supports only integer const values," f" but passed value of type '{type(item)}'"
            )
        if item == 0:
            return self
        return _LagOperator(self, item) 
[docs]    def cumsum(self):
        """
        Cumulative sum of the column.
        Can only be used when creating or updating column.
        Examples
        --------
        >>> t = otp.Ticks({'A': [1, 2, 3]})
        >>> t['X'] = t['A'].cumsum()
        >>> otp.run(t)
                             Time  A    X
        0 2003-12-01 00:00:00.000  1  1.0
        1 2003-12-01 00:00:00.001  2  3.0
        2 2003-12-01 00:00:00.002  3  6.0
        """
        import onetick.py as otp
        return _ColumnAggregation(
            otp.agg.sum(self.name, running=True, all_fields=True, overwrite_output_field=True)
        ) 
    def __iter__(self):
        raise TypeError("It is not allowed to use columns in for-clauses") 
class _LagOperator(_Operation):
    """
    Implements referencing to the prior tick
    """
    def __init__(self, base_column, inx):
        self._inx = inx
        op_str = f"{str(base_column)}[{self.index}]"
        super().__init__(op_params=[base_column], dtype=base_column.dtype,
                         op_str=op_str, obj_ref=base_column.obj_ref)
    @property
    def index(self):
        return self._inx
class _ColumnAggregation:
    """
    Object to specify how column will be aggregated.
    """
    def __init__(self, aggregation):
        from ..aggregations._base import _Aggregation
        if not isinstance(aggregation, _Aggregation):
            raise ValueError(f'Expected aggregation object, got {type(aggregation)}')
        if not aggregation.running or not aggregation.all_fields or not aggregation.overwrite_output_field:
            raise ValueError("Column aggregations only support 'running' aggregations"
                             " with 'all_fields' and 'overwrite_output_field' parameters set")
        self.aggregation = aggregation
    def apply(self, src, name):
        return self.aggregation.apply(src, name=name, inplace=True)
_Column = Column  # alias for backward compatibility