Source code for onetick.py.core.source

import copy
import functools
import inspect
import os
import re
import uuid
import warnings
from collections import defaultdict
from datetime import datetime, date, time
import pandas as pd
from typing import Callable, Optional, List, Any, Tuple, Union, Type, Dict, Collection
from onetick.py.backports import Literal

import onetick.query as otq
from onetick.lib.instance import OneTickLib

import onetick.py.functions
import onetick.py.sources
from onetick import py as otp
from onetick.py import aggregations
from onetick.py import types as ott
from onetick.py import utils, configuration
from onetick.py.core._internal._manually_bound_value import _ManuallyBoundValue
from onetick.py.core._internal._multi_symbols_source import _MultiSymbolsSource
from onetick.py.core._internal._proxy_node import _ProxyNode
from onetick.py.core._internal._state_objects import _StateColumn
from onetick.py.core._internal._state_vars import StateVars
from onetick.py.core._source._symbol_param_source import _SymbolParamSource
from onetick.py.core._source.schema import Schema
from onetick.py.core._source.symbol import Symbol
from onetick.py.core._source.tmp_otq import TmpOtq
from onetick.py.core.column import _Column, _LagOperator
from onetick.py.core.column_operations._methods.methods import is_arithmetical, is_compare
from onetick.py.core.column_operations._methods.op_types import are_strings, _replace_parameters
from onetick.py.core.column_operations.base import _Operation
from onetick.py.core.lambda_object import _LambdaIfElse, apply_lambda, apply_script, _EmulateObject
from onetick.py.core.query_inspector import get_query_info, add_pins, get_query_parameter_list
from onetick.py.core.eval_query import _QueryEvalWrapper, prepare_params
from onetick.py.core.cut_builder import _BaseCutBuilder
from onetick.py.core import db_constants
from onetick.py.utils import adaptive, adaptive_to_default
from onetick.py.aggregations._docs import (docstring, copy_method,
                                           _running_doc, _all_fields_with_policy_doc, _bucket_interval_doc,
                                           _bucket_time_doc, _bucket_units_doc, _bucket_end_condition_doc,
                                           _end_condition_per_group_doc, _boundary_tick_bucket_doc, _group_by_doc,
                                           _param_doc)
from onetick.py.aggregations.functions import (high_tick, low_tick,
                                               high_time, low_time,
                                               first_tick, last_tick,
                                               distinct, ranking,
                                               ob_snapshot, ob_snapshot_wide, ob_snapshot_flat)


def inplace_operation(method):
    """ Decorator that adds the `inplace` parameter and logic according to this
    flag. inplace=True means that method modifies an object, otherwise it copies
    the object firstly, modifies copy and returns the copy.
    """

    @functools.wraps(method)
    def _inner(self, *args, inplace=False, **kwargs):
        kwargs['inplace'] = inplace
        if inplace:
            method(self, *args, **kwargs)
        else:
            obj = self.copy()
            return method(obj, *args, **kwargs)

    return _inner


def _is_dict_required(symbols):
    """
    Depending on symbols, determine if output of otp.run() or Source.__call__() should always be a dictionary
    of {symbol: dataframe} even if only one symbol is present in the results
    """
    if isinstance(symbols, (list, tuple)):
        if len(symbols) == 0:
            return False
        elif len(symbols) > 1:
            return True
        else:
            symbols = symbols[0]

    if isinstance(symbols, otp.Source):
        return True
    if isinstance(symbols, otq.Symbol):
        symbols = symbols.name
    if isinstance(symbols, str) and 'eval' in symbols:
        return True
    return False


_agg_doc = _param_doc(name='aggs',
                      annotation=Dict,
                      str_annotation='dict of aggregations',
                      desc="""
                      aggregation dict: key - output column name; value - aggregation""")


[docs]class MetaFields: """ http://solutions.pages.soltest.onetick.com/iac/onetick-server/ep_guide/EP/Pseudo-fields.htm OneTick defines several pseudo-columns that can be treated as if they were columns of every tick. These columns can be accessed directly via :py:meth:`onetick.py.Source.__getitem__` method. But in case they are used in :py:class:`~onetick.py.core.column_operations.base.Expr` they can be accessed via ``onetick.py.Source.meta_fields``. Examples -------- Accessing pseudo-fields as columns or as class properties >>> data = otp.Tick(A=1) >>> data['X'] = data['_START_TIME'] >>> data['Y'] = otp.Source.meta_fields['_TIMEZONE'] >>> data(start=otp.dt(2003, 12, 2), timezone='GMT') Time A X Y 0 2003-12-02 1 2003-12-02 GMT """ def __init__(self): self.timestamp = _Column('TIMESTAMP', dtype=ott.nsectime) self.time = self.timestamp self.start_time = _Column('_START_TIME', dtype=ott.nsectime) self.start = self.start_time self.end_time = _Column('_END_TIME', dtype=ott.nsectime) self.end = self.end_time self.timezone = _Column('_TIMEZONE', dtype=str) self.db_name = _Column('_DBNAME', dtype=str) self.symbol_name = _Column('_SYMBOL_NAME', dtype=str) self.tick_type = _Column('_TICK_TYPE', dtype=str) self.__fields = set(map(str, self.__dict__.values())) | {'Time'} def __iter__(self): yield from self.__fields def __contains__(self, item): return item in self.__fields def __len__(self): return len(self.__fields)
[docs] def __getitem__(self, item): """ These fields are available: * ``TIMESTAMP`` (or ``Time``) * ``START_TIME`` (or ``_START_TIME``) * ``END_TIME`` (or ``_END_TIME``) * ``TIMEZONE`` (or ``_TIMEZONE``) * ``DBNAME`` (or ``_DBNAME``) * ``SYMBOL_NAME`` (or ``_SYMBOL_NAME``) * ``TICK_TYPE`` (or ``_TICK_TYPE``) """ return { 'TIMESTAMP': self.timestamp, 'Time': self.time, 'START_TIME': self.start_time, '_START_TIME': self.start_time, 'END_TIME': self.end_time, '_END_TIME': self.end_time, 'TIMEZONE': self.timezone, '_TIMEZONE': self.timezone, 'DB_NAME': self.db_name, 'DBNAME': self.db_name, '_DBNAME': self.db_name, 'SYMBOL_NAME': self.symbol_name, '_SYMBOL_NAME': self.symbol_name, 'TICK_TYPE': self.tick_type, '_TICK_TYPE': self.tick_type, }[item]
[docs]class Source: """ Base class for representing Onetick execution graph. All :ref:`onetick-py sources <sources>` are derived from this class and have access to all its methods. Examples -------- >>> data = otp.Tick(A=1) >>> isinstance(data, otp.Source) True Also this class can be used to initialize raw source with the help of ``onetick.query`` classes, but it should be done with caution as the user is required to set such properties as symbol name and tick type manually. >>> data = otp.Source(otq.TickGenerator(bucket_interval=0, fields='long A = 123').tick_type('TT')) >>> data(symbols='LOCAL::') Time A 0 2003-12-04 123 """ # TODO: need to support transactions for every _source # transaction is set of calls between _source creation and call or between two calls # if transaction have the same operations, then it seems we should add only one set of operations _PROPERTIES = [ "__node", "__hash", "__use_name_for_column_prefix", "__sources_keys_dates", "__sources_modify_query_times", "__sources_base_ep_func", "__sources_symbols", "__source_has_output", "__name", "_tmp_otq" ] _OT_META_FIELDS = ["_START_TIME", "_END_TIME", "_SYMBOL_NAME", "_DBNAME", "_TICK_TYPE", '_TIMEZONE'] meta_fields = MetaFields() Symbol = Symbol def __init__( self, node=None, _symbols=None, _start=adaptive, _end=adaptive, _base_ep_func=None, _has_output=True, **kwargs ): self._tmp_otq = TmpOtq() self.__name = None if "Time" in kwargs: # TODO: add tests raise ValueError( "It is not allowed to have 'Time' or 'TIMESTAMP' columns, because they are the key columns" ) if "TIMESTAMP" not in kwargs: kwargs["TIMESTAMP"] = ott.nsectime kwargs["Time"] = ott.nsectime kwargs["_START_TIME"] = ott.nsectime kwargs["_END_TIME"] = ott.nsectime kwargs["_SYMBOL_NAME"] = str kwargs["_DBNAME"] = str kwargs["_TICK_TYPE"] = str kwargs["_TIMEZONE"] = str for key, value in kwargs.items(): # calculate value type value_type = None if inspect.isclass(value): value_type = value value = None else: value_type = type(value) if issubclass(value_type, bool): value_type = float if issubclass(value_type, (date, datetime, ott.datetime, ott.date)): value_type = ott.nsectime # check valid value type if ott.get_base_type(value_type) not in [int, float, str, bool]: raise TypeError(f'"{type(value)}" is not supported') # convert string to custom string if necessary if value_type is str and value and len(value) > ott.string.DEFAULT_LENGTH: value_type = ott.string[len(value)] self.__dict__[key] = _Column(name=key, dtype=value_type, obj_ref=self) # just an alias to Timestamp self.__dict__['Time'] = self.__dict__['TIMESTAMP'] self.__dict__['_state_vars'] = StateVars(self) if node is None: node = otq.Passthrough() self.__hash = uuid.uuid4() self.__sources_keys_dates = {} self.__sources_modify_query_times = {} self.__sources_base_ep_func = {} self.__sources_symbols = {} self.__source_has_output = _has_output if isinstance(node, _ProxyNode): self.__node = _ProxyNode(*node.copy_graph(), refresh_func=self.__refresh_hash) else: self.__node = _ProxyNode(*self.__from_ep_to_proxy(node), refresh_func=self.__refresh_hash) self.__sources_keys_dates[self.__node.key()] = (_start, _end) self.__sources_modify_query_times[self.__node.key()] = False self.__sources_base_ep_func[self.__node.key()] = _base_ep_func self.__sources_symbols[self.__node.key()] = _symbols # flag controls whether we have to add node_name prefix when convert # columns into string self.__use_name_for_column_prefix = False def _try_default_constructor(self, *args, node=None, **kwargs): if node is not None: # Source.copy() method will use this way # all info from original source will be copied by copy() method after Source.__init__(self, *args, node=node, **kwargs) return True return False def base_ep(self, **kwargs): # default implementation # real implementation should return a Source object return None def _clean_sources_dates(self): self.__sources_keys_dates = {} self.__sources_modify_query_times = {} self.__sources_base_ep_func = {} self.__sources_symbols = {} def _set_sources_dates(self, other, copy_symbols=True): self.__sources_keys_dates.update(other._get_sources_dates()) self.__sources_modify_query_times.update(other._get_sources_modify_query_times()) self.__sources_base_ep_func.update(other._get_sources_base_ep_func()) if copy_symbols: self.__sources_symbols.update(other._get_sources_symbols()) else: # this branch is applicable for the bound symbols with callbacks, # where we drop all adaptive symbols and keep only manually specified # symbols manually_bound = { key: _ManuallyBoundValue(value) for key, value in other._get_sources_symbols().items() if value is not adaptive and value is not adaptive_to_default } self.__sources_symbols.update(manually_bound) self.__source_has_output = other._get_source_has_output() def _change_sources_keys(self, keys: dict): """ Change keys in sources dictionaries. Need to do it, for example, after rebuilding the node history with new keys. Parameters ---------- keys: dict Mapping from old key to new key """ sources = (self.__sources_keys_dates, self.__sources_modify_query_times, self.__sources_base_ep_func, self.__sources_symbols) for dictionary in sources: for key in list(dictionary): dictionary[keys[key]] = dictionary.pop(key) def _get_source_has_output(self): return self.__source_has_output def _get_sources_dates(self): return self.__sources_keys_dates def _get_sources_modify_query_times(self): return self.__sources_modify_query_times def _get_sources_base_ep_func(self): return self.__sources_base_ep_func def _get_sources_symbols(self): return self.__sources_symbols def use_name_for_column_prefix(self, flag=None): if flag is not None: self.__use_name_for_column_prefix = flag return self.__use_name_for_column_prefix def _check_key_in_properties(self, key: str) -> bool: if key in self.__class__._PROPERTIES: return True if key.replace('_' + Source.__name__.lstrip('_'), "") in self.__class__._PROPERTIES: return True if key.replace(self.__class__.__name__, "") in self.__class__._PROPERTIES: return True return False def __setattr__(self, key, value): if self._check_key_in_properties(key): self.__dict__[key] = value return if isinstance(value, _BaseCutBuilder): value(key) return value = self.__validate_before_setting(key, value) if key in self.__dict__: field = self.__dict__[key] if issubclass(type(field), _Column): self.__update_field(field, value) else: raise AttributeError(f'Column "{key}" not found') else: assert not ( isinstance(value, _StateColumn) and value.obj_ref is None ), "State variables should be in `state` field" self.__add_field(key, value) def __validate_before_setting(self, key, value): if key in ["Symbol", "_SYMBOL_NAME"]: raise ValueError("Symbol setting is supported during creation only") if key == "_state_vars": raise ValueError("state field is necessary for keeping state variables and can't be rewritten") if isinstance(value, ott.ExpressionDefinedTimeOffset): value = value.n if not (ott.is_type_supported(ott.get_object_type(value)) or isinstance(value, _Operation) or type(value) is tuple): raise TypeError(f'It is not allowed to set objects of "{type(value)}" type') return value def _update_field(self, field, value): self.__update_field(field, value) def __update_field(self, field, value): def _replace_positive_lag_operator_with_tmp_column(operation): if isinstance(operation, _LagOperator) and operation.index > 0: column = operation._op_params[0] name = column.name if name.startswith("__"): raise ValueError("Column name started with two underscores should be used by system only, " "please do not use such names.") name = f"__{name}_{operation.index}_NEW__" return _Column(name, column.dtype, column.obj_ref, precision=getattr(column, "_precision", None)) new_names, old_names = self.__get_old_and_new_names(_replace_positive_lag_operator_with_tmp_column, value) if old_names: self.sink(otq.AddFields(", ".join(f"{new} = {arg}" for arg, new in zip(old_names, new_names)))) if type(value) is tuple: # support to be compatible with adding fields to get rid of some strange problems # but really we do not use passed type, because update field does not support it value, _ = value convert_to_type = None str_value = ott.value2str(value) value_dtype = ott.get_object_type(value) base_type = ott.get_base_type(value_dtype) if base_type is bool: # according OneTick base_type = float type_changes = False # because mantis 0021194 if base_type is str: # update_field non-string field to string field (of any length) or value # changes type to default string if not issubclass(field.dtype, str): field._dtype = str type_changes = True else: if ( (issubclass(field.dtype, int) or issubclass(field.dtype, float) or issubclass(field.dtype, str)) and (issubclass(value_dtype, ott.msectime) or issubclass(value_dtype, ott.nsectime)) and (str(field) != 'TIMESTAMP') and (not isinstance(field, _StateColumn)) ): # in OneTick after updating fields with functions that return datetime values # the type of column will not change for long and double columns # and will change to long (or double in older versions) when updating string column # (see BDS-267) # That's why we are explicitly setting type for returned value convert_to_type = value_dtype if issubclass(field.dtype, str): # using update_field only for string because update_fields preserves type # by default and raises exception if it can't be done type_changes = True elif issubclass(field.dtype, float) and base_type is int and not isinstance(field, _StateColumn): # PY-574 if field was float and int, then # it is still float in onetick, so no need to change type on otp level convert_to_type = int elif (issubclass(field.dtype, ott.msectime) or issubclass(field.dtype, ott.nsectime)) and base_type is int: # if field was time type and we add something to it, then # no need to change type pass else: if issubclass(value_dtype, bool): value_dtype = float if isinstance(field, _StateColumn): pass # do nothing else: field._dtype = value_dtype type_changes = True # for aliases, TIMESTAMP ~ Time as an example key = str(field) if key == "TIMESTAMP": self.__update_timestamp(key, value, str_value) elif type_changes: if (value_dtype is otp.nsectime and issubclass(field.dtype, str)): # PY-416 string field changing the type from str to datetime leads to losing nanoseconds # work around is: make a new column first, delete accessor column # and then recreate it with value from temp column self.sink(otq.AddField(field=f"_TMP_{key}", value=str_value)) self.sink(otq.Passthrough(fields=key, drop_fields=True)) self.sink(otq.AddField(field=f"{key}", value=f"_TMP_{key}")) self.sink(otq.Passthrough(fields=f"_TMP_{key}", drop_fields=True)) else: self.sink(otq.UpdateField(field=key, value=str_value)) else: self.sink(otq.UpdateFields(set=key + "=" + str_value)) if new_names: self.sink(otq.Passthrough(drop_fields=True, fields=",".join(new_names))) if convert_to_type: # manual type conversion after update fields for some cases self.table(**{key: convert_to_type}, inplace=True, strict=False) def __update_timestamp(self, key, value, str_value): if not hasattr(value, "dtype"): # A constant value: no need to pre- or post-sort self.sink(otq.UpdateField(field=key, value=str_value)) elif ( isinstance(value, _Column) and not isinstance(value, _StateColumn) and hasattr(value, "name") and value.name in self.__dict__ ): # An existing, present column: no need to create a temporary one. See PY-253 need_to_sort = str_value not in ("_START_TIME", "_END_TIME") if need_to_sort: self.sort(value, inplace=True) self.sink(otq.UpdateField(field=key, value=str_value)) if need_to_sort: self.sort(self.Time, inplace=True) elif (not is_compare(value) and isinstance(value, (_Operation, _LambdaIfElse)) or is_arithmetical(value)): # An expression or a statevar: create a temp column with its value, pre- and post- sort. self.sink(otq.AddField(field="__TEMP_TIMESTAMP__", value=str_value)) self.sink(otq.OrderByEp(order_by="__TEMP_TIMESTAMP__ ASC")) self.sink(otq.UpdateField(field=key, value="__TEMP_TIMESTAMP__")) self.sink(otq.Passthrough(fields="__TEMP_TIMESTAMP__", drop_fields=True)) self.sort(self.Time, inplace=True) else: raise Exception(f"Illegal type for timestamp assignment: {value.__class__}") def __get_old_and_new_names(self, _replace_positive_lag_operator_with_tmp_column, value): old_args, new_args = _replace_parameters(value, _replace_positive_lag_operator_with_tmp_column) old_args = {str(old): (old, new) for old, new in zip(old_args, new_args)} old_names = [] new_names = [] for op_str, (_, new) in old_args.items(): old_names.append(op_str) new_names.append(str(new)) return new_names, old_names def append(self, other) -> 'Source': """ Merge data source with `other` Parameters ---------- other: List, Source data source to merge Returns ------- Source """ if isinstance(other, list): return onetick.py.functions.merge(other + [self]) else: return onetick.py.functions.merge([self, other]) def __add_field(self, key, value): # ------------- # ADD_FIELDS # ------------- # TODO: merge together several add fields in one add_fields if type(value) is tuple: value, dtype = value else: dtype = ott.get_object_type(value) if type(value) is str: if len(value) > ott.string.DEFAULT_LENGTH: dtype = ott.string[len(value)] if issubclass(dtype, bool): # according to OneTick transformations dtype = float if issubclass(dtype, (ott.datetime, ott.date)): # according to OneTick transformations dtype = ott.nsectime type_str = ott.type2str(dtype) # format value str_value = ott.value2str(value) self.sink(otq.AddField(field=f'{type_str} {key}', value=str_value)) self.__dict__[key] = _Column(key, dtype, self)
[docs] def __getitem__(self, item): """ Allows to express multiple things: - access a field by name - filter ticks by condition - select subset of fields - set order of fields Parameters ---------- item: str, :class:`Operation`, :func:`eval`, List[str] - ``str`` is to access column. - ``Operation`` to express filter condition. - ``otp.eval`` to express filter condition based on external query - ``List[str]`` select subset of specified columns - ``slice[List[str]::]`` set order of columns - ``slice[Tuple[str, Type]::]`` type defaulting - ``slice[:]`` alias to :meth:`Source.copy()` Returns ------- Column, Source or tuple of Sources - Column if column name was specified. - Two sources if filtering expression or eval was provided: the first one is for ticks that pass condition and the second one that do not. Examples -------- Access to the `X` column: add `Y` based on `X` >>> data = otp.Ticks(X=[1, 2, 3]) >>> data['Y'] = data['X'] * 2 >>> otp.run(data) Time X Y 0 2003-12-01 00:00:00.000 1 2 1 2003-12-01 00:00:00.001 2 4 2 2003-12-01 00:00:00.002 3 6 Filtering based on expression >>> data = otp.Ticks(X=[1, 2, 3]) >>> data_more, data_less = data[(data['X'] > 2)] >>> otp.run(data_more) Time X 0 2003-12-01 00:00:00.002 3 >>> otp.run(data_less) Time X 0 2003-12-01 00:00:00.000 1 1 2003-12-01 00:00:00.001 2 Filtering based on the result of another query. Another query should have only one tick as a result with only one field (whatever it names). >>> exp_to_select = otp.Ticks(WHERE=['X > 2']) >>> data = otp.Ticks(X=[1, 2, 3], Y=['a', 'b', 'c'], Z=[.4, .3, .1]) >>> data, _ = data[otp.eval(exp_to_select)] >>> otp.run(data) Time X Y Z 0 2003-12-01 00:00:00.002 3 c 0.1 Select subset of specified columns >>> data = otp.Ticks(X=[1, 2, 3], Y=['a', 'b', 'c'], Z=[.4, .3, .1]) >>> data = data[['X', 'Z']] >>> otp.run(data) Time X Z 0 2003-12-01 00:00:00.000 1 0.4 1 2003-12-01 00:00:00.001 2 0.3 2 2003-12-01 00:00:00.002 3 0.1 Slice with list will keep all columns, but change order: >>> data=otp.Tick(Y=1, X=2, Z=3) >>> data() Time Y X Z 0 2003-12-01 1 2 3 >>> data = data[['X', 'Y']:] >>> data() Time X Y Z 0 2003-12-01 2 1 3 Slice can be used as short-cut for :meth:`Source.copy`: >>> data[:] # doctest: +ELLIPSIS <onetick.py.sources.Tick object at ...> See Also -------- | :meth:`Source.table`: another and more generic way to select subset of specified columns | **PASSTHROUGH** OneTick event processor | **WHERE_CLAUSE** OneTick event processor """ strict = True if isinstance(item, (_Operation, _QueryEvalWrapper)): if isinstance(item, _Operation): item = item._make_python_way_bool_expression() where = self.copy(ep=otq.WhereClause(where=str(item))) if_source = where.copy() if_source.node().out_pin("IF") else_source = where.copy() else_source.node().out_pin("ELSE") # TODO: add ability to remove then this ep, because it is required only for right output else_source.sink(otq.Passthrough()) return if_source, else_source if isinstance(item, slice): if item.step: raise AttributeError("Source columns slice with step set makes no sense") if item.start and item.stop: raise AttributeError("Source columns slice with both start and stop set is not available now") if not item.start and item.stop: raise AttributeError("Source columns slice with only stop set is not implemented yet") if item.start is None and item.stop is None: return self.copy() item = item.start strict = False if isinstance(item, tuple): item = dict([item]) elif isinstance(item, list): if not item: return self.copy() item_type = list(set([type(x) for x in item])) if len(item_type) > 1: raise AttributeError(f"Different types {item_type} in slice list is not supported") if item_type[0] == tuple: item = dict(item) if isinstance(item, list): # --------- # TABLE # --------- items = [] for it in item: if isinstance(it, _Column): items.append(it.name) elif isinstance(it, str): items.append(it) else: raise ValueError(f"It is not supported to filter '{it}' object of '{type(it)}' type") # validation for item in items: if item not in self.schema: existing_columns = ", ".join(self.schema.keys()) raise AttributeError( f"There is no '{item}' column. There are existing columns: {existing_columns}" ) columns = {} dtypes = self.columns(skip_meta_fields=True) for column_name in items: if column_name not in ['Time', 'Timestamp', 'TIMESTAMP']: columns[column_name] = dtypes[column_name] return self.table(strict=strict, **columns) if isinstance(item, dict): return self.table(strict=strict, **item) name, dtype = "", None # way to set type if isinstance(item, tuple): name, dtype = item else: name = item if name not in self.__dict__: if dtype is None: raise KeyError(f'Column name {name} is not in the schema. Please, check that this column ' 'is in the schema or add it using the .schema property') if name == 0 or name == 1: raise ValueError(f"constant {name} are not supported for indexing for now, please use otp.Empty") if type(name) in (int, float): raise ValueError("integer indexes are not supported") self.__dict__[name] = _Column(name, dtype, self) else: if not isinstance(self.__dict__[name], _Column): raise AttributeError(f"There is no '{name}' column") if dtype: type1, type2 = self.__dict__[name].dtype, dtype b_type1, b_type2 = ott.get_base_type(type1), ott.get_base_type(type2) if b_type1 != b_type2: if {type1, type2} == {int, float}: self.__dict__[name]._dtype = float else: raise Warning( f"Column '{name}' was declared as '{type1}', but you want to change it to '{type2}', " "that is not possible without setting type directly via assigning value" ) else: if issubclass(b_type1, str): t1_length = ott.string.DEFAULT_LENGTH if type1 is str else type1.length t2_length = ott.string.DEFAULT_LENGTH if type2 is str else type2.length self.__dict__[name]._dtype = type2 if t1_length < t2_length else type1 if {type1, type2} == {ott.nsectime, ott.msectime}: self.__dict__[name]._dtype = ott.nsectime return self.__dict__[name]
[docs] def __setitem__(self, key, value): """ Add new column to the source or update existing one. Parameters ---------- key: str The name of the new or existing column. value: int, str, float, datetime, date, \ :py:class:`~onetick.py.Column`, :py:class:`~onetick.py.Operation`, :py:class:`~onetick.py.string`, \ :py:class:`otp.date <onetick.py.date>`, :py:class:`otp.datetime <onetick.py.datetime>`, \ :py:class:`~onetick.py.nsectime`, :py:class:`~onetick.py.msectime` The new value of the column. See also -------- | **ADD_FIELD** OneTick event processor | **UPDATE_FIELD** OneTick event processor Examples -------- >>> data = otp.Tick(A='A') >>> data['D'] = otp.datetime(2022, 2, 2) >>> data['X'] = 1 >>> data['Y'] = data['X'] >>> data['X'] = 12345 >>> data['Z'] = data['Y'].astype(str) + 'abc' >>> data() Time A D X Y Z 0 2003-12-01 A 2022-02-02 12345 1 1abc """ return self.__setattr__(key, value)
[docs] @inplace_operation def update(self, if_set, else_set=None, where=1, inplace=False) -> 'Source': """ Update field of the Source Parameters ---------- if_set: dict Dictionary <field name>: <expression>. else_set: dict, optional Dictionary <field name>: <expression> where: expression, optional Condition of updating. If ``where`` is True the fields from ``if_set`` will be updated with corresponding expression. If ``where`` is False, the fields from ``else_set`` will be updated with corresponding expression. inplace: bool A flag controls whether operation should be applied inplace. If ``inplace=True``, then it returns nothing. Otherwise method returns a new modified object. Returns ------- :class:`Source` or ``None``. See also -------- **UPDATE_FIELD** and **UPDATE_FIELDS** OneTick event processors Examples -------- >>> # OTdirective: snippet-name: Arrange.conditional update; >>> t = otp.Ticks({'X': [1, 2, 3], ... 'Y': [4, 5, 6], ... 'Z': [1, 0, 1]}) >>> t = t.update(if_set={'X': t['X'] + t['Y']}, ... else_set={'X': t['X'] - t['Y']}, ... where=t['Z'] == 1) >>> t() # OTdirective: snippet-example; Time X Y Z 0 2003-12-01 00:00:00.000 5 4 1 1 2003-12-01 00:00:00.001 -3 5 0 2 2003-12-01 00:00:00.002 9 6 1 """ if else_set is None: else_set = {} if len(if_set) == 0 or not isinstance(if_set, dict): raise ValueError( f"'if_set' parameter should be non empty dict, but got '{if_set}' of type '{type(if_set)}'" ) def _prepare(to_prepare): result = {} for in_obj, out_obj in to_prepare.items(): if isinstance(in_obj, _Column): result[in_obj.name.strip()] = out_obj elif isinstance(in_obj, str): result[in_obj.strip()] = out_obj else: raise AttributeError( f"It is not supported to update item '{in_obj}' of type '{type(in_obj)}'" ) return result def _validate(to_validate): for in_key, out_obj in to_validate.items(): if not (in_key in self.__dict__ and isinstance(self.__dict__[in_key], _Column)): raise AttributeError(f"There is no '{in_key}' column to update") if in_key == "Time" or in_key == "TIMESTAMP": raise ValueError("It is not allowed to modify 'Time' column using .update method") dtype = ott.get_object_type(out_obj) if not ( issubclass(dtype, int) or issubclass(dtype, float) or issubclass(dtype, int) or issubclass(dtype, str) ): raise TypeError(f"Type '{dtype}' is not supported for setting '{in_key}' property") if isinstance(out_obj, bool): to_validate[in_key] = int(out_obj) return to_validate # prepare and validate items = _validate(_prepare(if_set)) else_items = _validate(_prepare(else_set)) if isinstance(where, bool): where = int(where) if not (getattr(where, "dtype", None) is bool or isinstance(where, int)): raise ValueError(f"Where has not supported type '{type(where)}'") # apply set_rules = [f"{self[key]}=({ott.value2str(value)})" for key, value in items.items()] else_set_rules = [f"{self[key]}=({ott.value2str(value)})" for key, value in else_items.items()] self.sink(otq.UpdateFields(set=",".join(set_rules), else_set=",".join(else_set_rules), where=str(where))) return self
[docs] @docstring(parameters=[_agg_doc, _running_doc, _all_fields_with_policy_doc, _bucket_interval_doc, _bucket_time_doc, _bucket_units_doc, _bucket_end_condition_doc, _end_condition_per_group_doc, _boundary_tick_bucket_doc, _group_by_doc], add_self=True) def agg(self, aggs, *args, **kwargs) -> 'Source': """ Applies composition of :ref:`otp.agg <aggregations_funcs>` aggregations See Also -------- | :ref:`Aggregations <aggregations_funcs>` | **COMPUTE** OneTick event processor Examples -------- By default the whole data is aggregated: >>> data = otp.Ticks(X=[1, 2, 3, 4]) >>> data = data.agg({'X_SUM': otp.agg.sum('X')}) >>> otp.run(data) Time X_SUM 0 2003-12-04 10 Multiple aggregations can be applied at the same time: >>> data = otp.Ticks(X=[1, 2, 3, 4]) >>> data = data.agg({'X_SUM': otp.agg.sum('X'), ... 'X_MEAN': otp.agg.average('X')}) >>> otp.run(data) Time X_SUM X_MEAN 0 2003-12-04 10 2.5 Aggregation can be used in running mode: >>> data = otp.Ticks(X=[1, 2, 3, 4]) >>> data = data.agg({'CUM_SUM': otp.agg.sum('X')}, running=True) >>> otp.run(data) Time CUM_SUM 0 2003-12-01 00:00:00.000 1 1 2003-12-01 00:00:00.001 3 2 2003-12-01 00:00:00.002 6 3 2003-12-01 00:00:00.003 10 Aggregation can be split in buckets: >>> data = otp.Ticks(X=[1, 2, 3, 4]) >>> data = data.agg({'X_SUM': otp.agg.sum('X')}, bucket_interval=2, bucket_units='ticks') >>> otp.run(data) Time X_SUM 0 2003-12-01 00:00:00.001 3 1 2003-12-01 00:00:00.003 7 Running aggregation can be used with buckets too: >>> data = otp.Ticks(X=[1, 2, 3, 4], offsets=[0, 1000, 1500, 3000]) >>> data = data.agg(dict(X_MEAN=otp.agg.average("X"), ... X_STD=otp.agg.stddev("X")), ... running=True, bucket_interval=2) >>> otp.run(data) Time X_MEAN X_STD 0 2003-12-01 00:00:00.000 1.0 0.000000 1 2003-12-01 00:00:00.001 1.5 0.500000 2 2003-12-01 00:00:00.002 2.0 0.816497 3 2003-12-01 00:00:00.003 2.5 1.118034 4 2003-12-01 00:00:02.000 3.0 0.816497 5 2003-12-01 00:00:02.001 3.5 0.500000 6 2003-12-01 00:00:02.002 4.0 0.000000 7 2003-12-01 00:00:02.003 NaN NaN """ aggs = aggs.copy() result = self.copy() what_to_aggregate = aggregations.compute(*args, **kwargs) for name, ag in aggs.items(): what_to_aggregate.add(name, ag) result = what_to_aggregate.apply(result) result._add_table() return result
def sort_values(self, *args, **kwargs): """ alias of sort See Also -------- :meth:`Source.sort` """ return self.sort(*args, **kwargs)
[docs] @inplace_operation def sort(self, by: Union[str, Collection[Union[str, 'onetick.py.Column']]], ascending=True, inplace=False) -> Optional['Source']: """ Sort ticks by columns. Parameters ---------- by: str, Column or list of them Column(s) to sort by. It is possible to pass a list of column, where is the order is important: from the left to the right. ascending: bool or list Order to sort by. If list of columns is specified, then list of ascending values per column is expected. (the :class:`nan` is the smallest for ``float`` type fields) inplace: bool A flag controls whether operation should be applied inplace. If ``inplace=True``, then it returns nothing. Otherwise method returns a new modified object Returns ------- :class:`Source` See also -------- **ORDER_BY** OneTick event processor Examples -------- Single column examples >>> data = otp.Ticks({'X':[ 94, 5, 34], ... 'Y':[otp.nan, 3.1, -0.3]}) >>> data = data.sort(data['X']) >>> otp.run(data) Time X Y 0 2003-12-01 00:00:00.001 5 3.1 1 2003-12-01 00:00:00.002 34 -0.3 2 2003-12-01 00:00:00.000 94 NaN >>> data = otp.Ticks({'X':[ 94, 5, 34], ... 'Y':[otp.nan, 3.1, -0.3]}) >>> data = data.sort(data['Y']) >>> otp.run(data) Time X Y 0 2003-12-01 00:00:00.000 94 NaN 1 2003-12-01 00:00:00.002 34 -0.3 2 2003-12-01 00:00:00.001 5 3.1 Inplace >>> data = otp.Ticks({'X':[ 94, 5, 34], ... 'Y':[otp.nan, 3.1, -0.3]}) >>> data.sort(data['Y'], inplace=True) # OTdirective: snippet-name: Arrange.sort.inplace; >>> otp.run(data) Time X Y 0 2003-12-01 00:00:00.000 94 NaN 1 2003-12-01 00:00:00.002 34 -0.3 2 2003-12-01 00:00:00.001 5 3.1 Multiple columns >>> data = otp.Ticks({'X':[ 5, 6, 3, 6], ... 'Y':[1.4, 3.1, 9.1, 5.5]}) >>> data = data.sort([data['X'], data['Y']]) >>> otp.run(data) Time X Y 0 2003-12-01 00:00:00.002 3 9.1 1 2003-12-01 00:00:00.000 5 1.4 2 2003-12-01 00:00:00.001 6 3.1 3 2003-12-01 00:00:00.003 6 5.5 Ascending/descending control >>> data = otp.Ticks({'X':[ 94, 5, 34], ... 'Y':[otp.nan, 3.1, -0.3]}) >>> data = data.sort(data['X'], ascending=False) >>> otp.run(data) Time X Y 0 2003-12-01 00:00:00.000 94 NaN 1 2003-12-01 00:00:00.002 34 -0.3 2 2003-12-01 00:00:00.001 5 3.1 >>> # OTdirective: snippet-name: Arrange.sort.sort; >>> data = otp.Ticks({'X':[ 5, 6, 3, 6], ... 'Y':[1.4, 3.1, 9.1, 5.5]}) >>> data = data.sort([data['X'], data['Y']], ascending=[True, False]) >>> otp.run(data) Time X Y 0 2003-12-01 00:00:00.002 3 9.1 1 2003-12-01 00:00:00.000 5 1.4 2 2003-12-01 00:00:00.003 6 5.5 3 2003-12-01 00:00:00.001 6 3.1 """ columns = by if isinstance(columns, list): objs = columns else: objs = [columns] if isinstance(ascending, list): asc_objs = ascending else: asc_objs = [ascending] items = [] # ------------------------------- # Columns processing # ------------------------------- # convert to strings # TODO: it seems as a common code, need to move to a separate function for obj in objs: if isinstance(obj, _Column): items.append(obj.name) elif isinstance(obj, str): items.append(obj) else: # TODO: cover with tests raise TypeError(f"It is not supported to order by '{obj}' of type '{type(obj)}'") # validate for item in items: if item in self.__dict__: if not isinstance(self.__dict__[item], _Column): # TODO: cover with tests raise AttributeError(f"There is no '{item}' column") # if else: # TODO: covert with tests raise AttributeError(f"There is no '{item}' column") # ------------------------------- # Asc processing # ------------------------------- asc_items = [True] * len(items) def asc_convert(v): return "ASC" if v else "DESC" for inx in range(len(items)): if inx >= len(asc_objs): asc_obj = asc_items[inx] else: asc_obj = asc_objs[inx] if isinstance(asc_obj, bool): asc_items[inx] = asc_convert(asc_obj) elif isinstance(asc_obj, int): asc_items[inx] = asc_convert(asc_obj) else: raise TypeError(f"asc can not be '{asc_obj}' of type '{type(asc_obj)}'") # --------------- # combine together order_by = [column_name + " " + asc for column_name, asc in zip(items, asc_items)] self.sink(otq.OrderByEp(order_by=",".join(order_by))) return self
def _hash(self): return self.__hash def _merge_tmp_otq(self, source): self._tmp_otq.merge(source._tmp_otq) def __prepare_graph(self, symbols=None, start=None, end=None, has_output=False): # We copy object here, because we will change it according to passed # symbols and date ranges. For example, we can add modify_query_times EP # if it is neccessary obj = self.copy() if has_output: obj.sink(otq.Passthrough()) start, end, symbols = obj._get_date_range(symbols, start, end) if start is adaptive: start = None if end is adaptive: end = None if symbols is not None and isinstance(symbols, pd.DataFrame): symbols = utils.get_symbol_list_from_df(symbols) if symbols is not None and not isinstance(symbols, list): symbols = [symbols] elif symbols is None: symbols = [] _symbols = [] for sym in symbols: _symbols.append(self._convert_symbol_to_string(sym, tmp_otq=obj._tmp_otq, start=start, end=end)) return obj, start, end, _symbols
[docs] def to_otq(self, file_name=None, file_suffix=None, query_name=None, symbols=None, start=None, end=None, timezone=None, raw=None, add_passthrough=True): """ Save data source to .otq file and return path to the saved file. Parameters ---------- file_name: str Absolute or relative path to the saved file. If ``None``, create temporary file and name it randomly. file_suffix: str Suffix to add to the saved file name (including extension). Can be specified if ``file_name`` is ``None`` to distinguish between different temporary files. Default: ".to_otq.otq" query_name: str Name of the main query in the created file. If ``None``, take name from this Source object. If that name is empty, set name to "query". symbols: str, list, :pandas:`DataFrame <pandas.DataFrame>`, :class:`Source` symbols to save query with start: datetime start time to save query with end: datetime end time to save query with timezone: str timezone to save query with raw .. deprecated:: 1.4.17 add_passthrough: bool will add Passthrough ep at the end of resulting graph Returns ------- result: str Relative (if ``file_name`` is relative) or absolute path to the created query in the format ``file_name::query_name`` """ if raw is not None: warnings.warn('The "raw" flag is deprecated and makes no effect') if timezone is None: timezone = configuration.config.tz file_path = str(file_name) if file_name is not None else None if file_suffix is None: file_suffix = self._name_suffix('to_otq.otq') if query_name is None: query_name = self.get_name(remove_invalid_symbols=True) if query_name is None: query_name = 'query' obj, start, end, symbols = self.__prepare_graph(symbols, start, end) graph = obj._to_graph(add_passthrough=add_passthrough) graph.set_symbols(symbols) return obj._tmp_otq.save_to_file(query=graph, query_name=query_name, file_path=file_path, file_suffix=file_suffix, start=start, end=end, timezone=timezone)
def _store_in_tmp_otq(self, tmp_otq, operation_suffix="tmp_query", symbols=None, start=None, end=None, raw=None, add_passthrough=True, name=None): """ Adds this source to the tmp_otq storage Parameters: tmp_otq: TmpOtq Storage object operation_suffix: str Suffix string to be added to the autogenerated graph name in the otq file name: str, optional If specified, this ``name`` will be used to save query and ``suffix`` parameter will be ignored. Returns: result: str String with the name of the saved graph (starting with THIS::) """ # We copy object here, because we will change it according to passed # symbols and date ranges. For example, we can add modify_query_times EP # if it is neccessary if raw is not None: warnings.warn('The "raw" flag is deprecated and makes no effect') obj = self.copy() start, end, symbols = obj._get_date_range(symbols, start, end) tmp_otq.merge(obj._tmp_otq) if symbols and not isinstance(symbols, list): symbols = [symbols] elif symbols is None: symbols = [] _symbols = [] for sym in symbols: _symbols.append(self._convert_symbol_to_string(sym, tmp_otq)) if isinstance(start, ott.dt): # OT save_to_file checks for the datetime time start = datetime.fromtimestamp(start.timestamp()) if isinstance(end, ott.dt): end = datetime.fromtimestamp(end.timestamp()) graph = obj._to_graph(add_passthrough=add_passthrough) graph.set_start_time(start) graph.set_end_time(end) graph.set_symbols(_symbols) suffix = self._name_suffix(suffix=operation_suffix, separator='__', remove_invalid_symbols=True) return tmp_otq.add_query(graph, suffix=suffix, name=name) def _save_as_tmp_otq(self, start=None, end=None, symbols=None, suffix=""): tmp_otq = utils.TmpFile(f"{suffix}.otq") query_path = self.to_otq(tmp_otq, symbols=symbols, start=start, end=end) return query_path
[docs] def plot(self, y, x='Time', kind='line', **kwargs): """ Executes the query with known properties and builds a plot resulting dataframe. Uses the pandas dataframe plot method to plot data. Other parameters could be specified through the ``kwargs``. Parameters ---------- x: str Column name for the X axis y: str Column name for the Y axis kind: str The kind of plot Examples -------- >>> data = otp.Ticks(X=[1, 2, 3]) >>> data.plot(y='X', kind='bar') # doctest: +SKIP """ result = self.copy() return result[[y, x]]().plot(x=x, y=y, kind=kind, **kwargs)
[docs] def count(self, **kwargs): """ Returns the number of ticks in the query. Adds an aggregation that calculate total ticks count, and executes a query. Result is a single value -- number of ticks. Possible application is the jupyter when a developer wants to check data presences for example. Parameters ---------- kwargs parameters that will be passed to :meth:`Source.to_df` Returns ------- int See Also -------- Source.to_df Source.head, Source.tail, :py:func:`~onetick.py.agg.count` Examples -------- >>> data = otp.Ticks(X=[1, 2, 3]) >>> data.count() 3 >>> data = otp.Empty() >>> data.count() 0 """ result = self.copy() result = result.agg({'__num_rows': otp.agg.count()})(**kwargs) if result.empty: return 0 return int(result['__num_rows'][0])
[docs] def head(self, n=5, **kwargs) -> 'Union[pd.DataFrame, Source]': """ Executes the query and returns first ``n`` ticks as a pandas dataframe. It is useful in the jupyter case when you want to observe first ``n`` values. Parameters ---------- n: int, default=5 number of ticks to return kwargs: parameters will be passed to :meth:`Source.to_df` Returns ------- :pandas:`DataFrame <pandas.DataFrame>` See Also -------- Source.to_df Source.tail, Source.count Examples -------- >>> data = otp.Ticks(X=list('abcdefgik')) >>> data.head()[['X']] X 0 a 1 b 2 c 3 d 4 e """ result = self.copy() result = result.first(n=n) # pylint: disable=E1123 return result(**kwargs)
[docs] def tail(self, n=5, **kwargs) -> Union['pd.DataFrame', 'Source']: """ Executes the query and returns last ``n`` ticks as a pandas dataframe. It is useful in the jupyter case when you want to observe last ``n`` values. Parameters ---------- n: int number of ticks to return kwargs: parameters will be passed to :meth:`Source.to_df` Returns ------- :pandas:`DataFrame <pandas.DataFrame>` See Also -------- Source.to_df :meth:`Source.head`, :meth:`Source.count` Examples -------- >>> data = otp.Ticks(X=list('abcdefgik')) >>> data.tail()[['X']] X 0 e 1 f 2 g 3 i 4 k """ result = self.copy() result = result.last(n=n) # pylint: disable=E1123 return result(**kwargs)
def _rename_impl_(self, columns=None, **kwargs): if len(columns) != 0 and len(kwargs) != 0: raise Exception( "It is not allowed to rename using 'columns' parameter and **kwargs simultaniously. " "Use either 'columns' parameter or **kwargs" ) if len(columns) == 0 and len(kwargs) != 0: columns = kwargs # prepare items = {} out_names = set() for in_obj, out_obj in columns.items(): if isinstance(in_obj, _Column): items[in_obj.name.strip()] = out_obj.strip() elif isinstance(in_obj, str): items[in_obj.strip()] = out_obj.strip() else: raise Exception(f"It is not supported to rename item '{in_obj}' of type {type(in_obj)}'") if out_obj in out_names: raise AttributeError( f"You want to rename '{in_obj}' into '{out_obj}', " f"but also want to rename another column into '{out_obj}'" ) out_names.add(out_obj) # validate for in_key, out_key in items.items(): if " " in out_key: raise AttributeError(f"There is space in '{out_key}' column name") if in_key in self.__dict__ and isinstance(self.__dict__[in_key], _Column): pass else: raise AttributeError(f"There is no '{in_key}' column to rename") if out_key in self.__dict__ and isinstance(self.__dict__[out_key], _Column): raise AttributeError(f"Column '{out_key}' is already exist") # apply for in_key, out_key in items.items(): self.__dict__[in_key].rename(out_key, update_parent_object=False) self.__dict__[out_key] = self.__dict__[in_key] del self.__dict__[in_key] rename_rules = [key + "=" + value for key, value in items.items()] self.sink(otq.RenameFieldsEp(rename_fields=",".join(rename_rules)))
[docs] @inplace_operation def rename(self, columns=None, inplace=False) -> Optional['Source']: """ Rename columns Parameters ---------- columns : dict Rules how to rename in the following format: {<column> : <new-column-name>}, where <column> is either existing column name of str type or reference to a column, and <new-column-name> a new column name of str type. inplace : bool The flag controls whether operation should be applied inplace or not. If ``inplace=True``, then it returns nothing. Otherwise method returns a new modified object. Returns ------- :class:`Source` or ``None`` See also -------- **RENAME** OneTick event processor Examples -------- >>> data = otp.Ticks(X=[1], Y=[2]) >>> data = data.rename({'X': 'XX', ... data['Y']: 'YY'}) >>> otp.run(data) Time XX YY 0 2003-12-01 1 2 """ if columns is None: columns = {} self._rename_impl_(columns) return self
[docs] @inplace_operation def execute(self, *operations, inplace=False) -> Optional['Source']: """ Execute operations without returning their values. Some operations in onetick.py can be used to modify the state of some object (tick sequences mostly) and in that case user may not want to save the result of the operation to column. Parameters ---------- operations : list of :py:class:`~onetick.py.Operation` operations to execute. inplace : bool The flag controls whether operation should be applied inplace or not. If ``inplace=True``, then it returns nothing. Otherwise method returns a new modified object. Returns ------- :class:`Source` or ``None`` See Also -------- **EXECUTE_EXPRESSIONS** OneTick event processor Examples -------- >>> data = otp.Tick(A=1) >>> data.state_vars['SET'] = otp.state.tick_set('oldest', 'A') >>> data = data.execute(data.state_vars['SET'].erase(A=1)) """ if not operations: raise ValueError('At least one operation must be specified in execute() method') op_str = ';'.join(map(str, operations)) self.sink(otq.ExecuteExpressions(op_str)) return self
def __refresh_hash(self): """ This internal function refreshes hash for every graph modification. It is used only in _ProxyNode, because it tracks nodes changes """ self.__hash = uuid.uuid3(uuid.NAMESPACE_DNS, str(self.__hash)) def _prepare_for_execution(self, symbols=None, start=None, end=None, start_time_expression=None, end_time_expression=None, timezone=None, has_output=None, running_query_flag=None, require_dict=False, node_name=None): if has_output is None: has_output = self.__source_has_output if timezone is None: timezone = configuration.config.tz obj, start, end, symbols = self.__prepare_graph(symbols, start, end, has_output) require_dict = require_dict or _is_dict_required(symbols) if node_name is None: node_name = 'SOURCE_CALL_MAIN_OUT_NODE' obj.node().node_name(node_name) graph = obj._to_graph(add_passthrough=False) graph.set_symbols(symbols) tmp_file = utils.TmpFile(suffix=self._name_suffix('run.otq')) query_to_run = obj._tmp_otq.save_to_file(query=graph, query_name=self.get_name(remove_invalid_symbols=True) if self.get_name(remove_invalid_symbols=True) else "main_query", file_path=tmp_file.path, start=start, end=end, running_query_flag=running_query_flag, start_time_expression=start_time_expression, end_time_expression=end_time_expression, timezone=timezone) # symbols and start/end times should be already stored in the query and should not be passed again return dict( query=query_to_run, symbols=None, start=None, end=None, start_time_expression=None, end_time_expression=None, require_dict=require_dict, node_name=node_name, time_as_nsec=True, )
[docs] def __call__(self, *args, **kwargs): """ .. deprecated:: 1.48.3 Use :py:func:`otp.run <onetick.py.run>` instead. """ return otp.run(self, *args, **kwargs)
[docs] def to_df(self, symbols=None, **kwargs): """ .. deprecated:: 1.48.3 Use :py:func:`otp.run <onetick.py.run>` instead. """ # For backward compatibility: otp.run() does not accept "symbols" as a non-keyword argument if symbols is not None: kwargs['symbols'] = symbols return otp.run(self, **kwargs)
to_dataframe = to_df def print_api_graph(self): self.node().copy_graph(print_out=True) def _add_table(self, strict=False): table = otq.Table( fields=",".join( ott.type2str(dtype) + " " + name for name, dtype in self.columns(skip_meta_fields=True).items() ), keep_input_fields=not strict, ) self.sink(table) def _is_unbound_required(self): """ Check whether a graph needs unbound symbol or not """ for symbol in self.__sources_symbols.values(): if symbol is adaptive or symbol is adaptive_to_default: return True return False def _get_widest_time_range(self): """ Get minimum start time and maximum end time. If time is not found, None is returned. """ start_times = [] end_times = [] for start, end in self.__sources_keys_dates.values(): if start is not adaptive: start_times.append(start) if end is not adaptive: end_times.append(end) start = min(start_times) if start_times else None end = max(end_times) if end_times else None return start, end def _get_date_range(self, symbols=None, start=None, end=None, default_start=None, default_end=None): #noqa if default_start is None: default_start = configuration.config.get('default_start_time', adaptive) if default_end is None: default_end = configuration.config.get('default_end_time', adaptive) # ------------ # # Find symbols need_to_bind_symbol = False common_symbol = None if symbols is None: # let's try to understand whether we could use common symbol for all sources # or we need to bound symbols instead first_symbol = None for symbol in self.__sources_symbols.values(): if first_symbol is None: first_symbol = symbol if isinstance(first_symbol, _ManuallyBoundValue): # Mark that we need to bound, but keep common_symbol equal to None. # It is applicable for the bound symbols inside the merge with bound # symbols, for example. need_to_bind_symbol = True else: common_symbol = symbol continue if symbol and symbol != first_symbol: need_to_bind_symbol = True common_symbol = None break # symbol is specified nowhere - just set unbound to the default one if (first_symbol is adaptive or first_symbol is adaptive_to_default) and ( common_symbol is adaptive or common_symbol is adaptive_to_default ): common_symbol = configuration.config.default_symbol else: # when unbound symbols passed common_symbol = symbols need_to_bind_symbol = True # use to check all sources whether some has bound symbols # Find max and min for _source data ranges sources_start, sources_end = self._get_widest_time_range() sources_start = sources_start or default_start sources_end = sources_end or default_end common_format = "%Y-%m-%d %H:%M:%S" for key, date_range in self.__sources_keys_dates.items(): # find a function that builds _source func = self.__sources_base_ep_func[key] if func: src = func() # -------------------------- # determine whether we have to add modify_query_times to a src start_date, end_date = date_range if not self.__sources_modify_query_times[key]: if start_date is not adaptive or end_date is not adaptive: # if some of the end is specified, then it means # we need to check whether it is worth to wrap into the # modify_query_times if start_date is adaptive: if start is None: start_date = sources_start else: start_date = start if end_date is adaptive: if end is None: end_date = sources_end else: end_date = end if start_date is not adaptive and end_date is not adaptive: # it might happen when either sources_start/end are adaptive # or start/end are adaptive if ( (start is None and sources_start is not adaptive and start_date != sources_start) or (start is not None and start_date != start) or (end is None and sources_end is not adaptive and end_date != sources_end) or (end is not None and end_date != end) ): self.__sources_modify_query_times[key] = True mqt = otq.ModifyQueryTimes( start_time='parse_time(("' + common_format + '.%q"),"' + start_date.strftime(common_format + ".%f") + '", _TIMEZONE)', end_time='parse_time(("' + common_format + '.%q"),"' + end_date.strftime(common_format + ".%f") + '", _TIMEZONE)', output_timestamp="TIMESTAMP", ) src.sink(mqt) if need_to_bind_symbol: bound = None if key in self.__sources_symbols: # TODO: this is wrong, we need to know about symbols # it happens when we do not copy symbols when apply # merge with bound symbols. # Wrong, in that case merge with bound symbol is # non distinguishable from the manually passed None # for external queries bound = self.__sources_symbols[key] if isinstance(bound, _ManuallyBoundValue): bound = bound.value if bound and bound is not adaptive and bound is not adaptive_to_default: src.__node.symbol(bound) else: # if key is not in __sources_symbols, then # it means that symbol was not specified, and # therefor use unbound symbol if common_symbol is None: if bound is adaptive_to_default: src.__node.symbol(configuration.config.default_symbol) else: pass # # TODO: write test validated this # # # raise Exception("One of the branch does not have symbol specified") # -------------------------- # glue _source with the main graph self.node().add_rules(src.node().copy_rules()) self.source_by_key(src.node().copy_graph(), key) self._merge_tmp_otq(src) else: pass if start is None: start = sources_start if end is None: end = sources_end return start, end, common_symbol def _to_graph(self, add_passthrough=True): """ Construct the graph. Only for internal usage. It is private, because it constructs the raw graph assuming that a graph is already defined, and might confuse an end user, because by default Source is not fully defined; it becomes fully defined only when symbols, start and end datetime are specified. """ constructed_obj = self.copy() # we add it for case when the last EP has a pin output if add_passthrough: constructed_obj.sink(otq.Passthrough()) return otq.GraphQuery(constructed_obj.node().get())
[docs] def to_graph(self, raw=None, symbols=None, start=None, end=None, *, add_passthrough=True): """ Construct an otq.GraphQuery object. Parameters ---------- raw: .. deprecated:: 1.4.17 has no effect symbols: symbols query to add to otq.QraphQuery start: datetime start time of a query end: datetime end time of a query add_passthrough: bool add additional Passthrough ep to the end of a resulted graph Returns ------- otq.QraphQuery See Also -------- :meth:`render` """ if raw is not None: warnings.warn('The "raw" flag is deprecated and makes not effect') _obj, _start, _end, _symbols = self.__prepare_graph(symbols, start, end) if _obj._tmp_otq.queries: warnings.warn('Using .to_graph() for a Source object that uses sub-queries! ' 'This operation is deprecated and is not guaranteed to work as expected. ' 'Such a Source should be executed using otp.run() or saved to disk using to_otq()') _obj.sink(otq.Passthrough().output_pin_name('OUT_FOR_TO_GRAPH')) _graph = _obj._to_graph(add_passthrough=False) _graph.set_start_time(_start) _graph.set_end_time(_end) _graph.set_symbols(_symbols) query = _obj._tmp_otq.save_to_file( query=_graph, file_suffix='_to_graph.otq') query_path, query_name = query.split('::') query_params = get_query_parameter_list(query_path, query_name) source_with_nested_query = otp.Query(otp.query(query, **{param: f'${param}' for param in query_params}), out_pin='OUT_FOR_TO_GRAPH') return source_with_nested_query.to_graph( symbols=_symbols, start=_start, end=_end, add_passthrough=add_passthrough) else: return _obj._to_graph(add_passthrough=add_passthrough)
[docs] def render(self, **kwargs): """ Renders a calculation graph using the ``graphviz`` library. Every node is the onetick query language event processor. Nodes in nested queries, first stage queries and eval queries are not shown. Could be useful for debugging and in jupyter to learn the underlying graph. Note that it's required to have :graphviz:`graphviz <>` package installed. Examples -------- >>> data = otp.Ticks(X=[1, 2, 3]) >>> data1, data2 = data[(data['X'] > 2)] >>> data = otp.merge([data1, data2]) >>> data.render() # doctest: +SKIP .. graphviz:: ../../static/render_example.dot """ kwargs.setdefault('verbose', True) self._to_graph().render(**kwargs)
[docs] @inplace_operation def write(self, db: Union[str, 'otp.DB'], symbol: Optional[Union[str, 'otp.Column']] = None, tick_type: Optional[Union[str, 'otp.Column']] = None, date: Optional[date] = adaptive, append: bool = True, keep_symbol_and_tick_type: bool = adaptive, propagate: bool = True, out_of_range_tick_action: Literal['exception', 'ignore', 'load'] = 'exception', timestamp: Optional['otp.Column'] = None, keep_timestamp: bool = True, correction_type: Optional['otp.Column'] = None, replace_existing_time_series: bool = False, allow_concurrent_write: bool = False, context: str = adaptive, use_context_of_query: bool = False, inplace: bool = False, **kwargs) -> Optional['Source']: """ Saves data result to OneTick database. Note ---- This method does not save anything. It adds instruction in query to save. Data will be saved when query will be executed. Parameters ---------- db: str or :py:class:`otp.DB <onetick.py.DB>` database name or object. symbol: str or Column resulting symbol name string or column to get symbol name from. If this parameter is not set, then ticks _SYMBOL_NAME pseudo-field is used. If it is empty, an attempt is made to retrieve the symbol name from the field named SYMBOL_NAME. tick_type: str or Column resulting tick type string or column to get tick type from. If this parameter is not set, the _TICK_TYPE pseudo-field is used. If it is empty, an attempt is made to retrieve the tick type from the field named TICK_TYPE. date: datetime or None date where to save data. Should be set to `None` if writing to accelerator or memory database. By default, it is set to `otp.config.default_date`. append: bool If False - data will be rewritten for this ``date``, otherwise data will be appended (new symbols are added, existing symbols can be modified (append new ticks, modify existing ticks)). This option is not valid for accelerator databases. keep_symbol_and_tick_type: bool keep fields containing symbol name and tick type when writing ticks to the database or propagating them. By default, this parameter is adaptive. If ``symbol`` or ``tick_type`` are column objects, then it's set to True. Otherwise, it's set to False. propagate: bool Propagate ticks after that event processor or not. out_of_range_tick_action: str Action to be executed if tick's timestamp's date is not ``date``: * 'ignore': tick will not be written to the database * 'exception': runtime exception will be raised timestamp: Column Field that contains the timestamp with which the ticks will be written to the database. By default, the TIMESTAMP pseudo-column is used. keep_timestamp: bool If ``timestamp`` parameter is set and this parameter is set to True, then timestamp column is removed. correction_type: Column The name of the column that contains the correction type. This column will be removed. If this parameter is not set, no corrections will be submitted. replace_existing_time_series: bool If ``append`` is set to True, setting this option to True instructs the loader to replace existing time series, instead of appending to them. Other time series will remain unchanged. allow_concurrent_write: bool Allows different queries running on the same server to load concurrently into the same database. context: str The server context used to look up the database. By default, `otp.config.context` is used if ``use_context_of_query`` is not set. use_context_of_query: bool If this parameter is set to True and the ``context`` parameter is not set, the context of the query is used instead of the default value of the ``context`` parameter. inplace: bool A flag controls whether operation should be applied inplace. If ``inplace=True``, then it returns nothing. Otherwise, method returns a new modified object. kwargs: .. deprecated:: use named parameters instead. Returns ------- :class:`Source` or None See also -------- **WRITE_TO_ONETICK_DB** OneTick event processor Examples -------- >>> data = otp.Ticks(X=[1, 2, 3]) >>> data = data.write('SOME_DB', symbol='S_WRITE', tick_type='T_WRITE') >>> otp.run(data) Time X 0 2003-12-01 00:00:00.000 1 1 2003-12-01 00:00:00.001 2 2 2003-12-01 00:00:00.002 3 >>> data = otp.DataSource('SOME_DB', symbol='S_WRITE', tick_type='T_WRITE') >>> otp.run(data) Time X 0 2003-12-01 00:00:00.000 1 1 2003-12-01 00:00:00.001 2 2 2003-12-01 00:00:00.002 3 """ if 'append_mode' in kwargs: warnings.warn("Parameter 'append_mode' is deprecated, use 'append'", DeprecationWarning) append = kwargs.pop('append_mode') if 'timestamp_field' in kwargs: warnings.warn("Parameter 'timestamp_field' is deprecated, use 'timestamp'", DeprecationWarning) timestamp = kwargs.pop('timestamp_field') if 'keep_timestamp_field' in kwargs: warnings.warn("Parameter 'keep_timestamp_field' is deprecated, use 'keep_timestamp'", DeprecationWarning) keep_timestamp = kwargs.pop('keep_timestamp_field') if kwargs: raise TypeError(f'write() got unexpected arguments: {list(kwargs)}') kwargs = {} if date is adaptive: date = configuration.config.default_date if symbol is not None: if isinstance(symbol, _Column): kwargs['symbol_name_field'] = str(symbol) if keep_symbol_and_tick_type is adaptive: keep_symbol_and_tick_type = True else: kwargs.setdefault('symbol_name_field', '_SYMBOL_NAME_FIELD_') self[kwargs['symbol_name_field']] = symbol if tick_type is not None: if isinstance(tick_type, _Column): kwargs['tick_type_field'] = str(tick_type) if keep_symbol_and_tick_type is adaptive: keep_symbol_and_tick_type = True else: kwargs.setdefault('tick_type_field', '_TICK_TYPE_FIELD_') self[kwargs['tick_type_field']] = tick_type if keep_symbol_and_tick_type is adaptive: keep_symbol_and_tick_type = False if timestamp is not None: kwargs['timestamp_field'] = str(timestamp) if correction_type is not None: kwargs['correction_type_field'] = str(correction_type) if context is not adaptive: kwargs['context'] = context elif not use_context_of_query: kwargs['context'] = otp.config.context self.sink( otq.WriteToOnetickDb( database=str(db), date=date.strftime('%Y%m%d') if date else '', append_mode=append, keep_symbol_name_and_tick_type=keep_symbol_and_tick_type, propagate_ticks=propagate, out_of_range_tick_action=out_of_range_tick_action.upper(), keep_timestamp_field=keep_timestamp, replace_existing_time_series=replace_existing_time_series, allow_concurrent_write=allow_concurrent_write, use_context_of_query=use_context_of_query, **kwargs ) ) for col in ('_SYMBOL_NAME_FIELD_', '_TICK_TYPE_FIELD_'): if col in self.schema: self.drop(col, inplace=True) to_drop = set() if not keep_symbol_and_tick_type: to_drop.update((kwargs['symbol_name_field'], kwargs['tick_type_field'])) if not keep_timestamp and timestamp is not None and str(timestamp) not in {'Time', 'TIMESTAMP'}: to_drop.add(str(timestamp)) if correction_type is not None: to_drop.add(str(correction_type)) self.schema.set(**{ k: v for k, v in self.schema.items() if k not in to_drop }) return self
[docs] def copy(self, ep=None, columns=None, deep=False) -> 'Source': """ Build an object with copied calculation graph. Every node of the resulting graph has the same id as in the original. It means that if the original and copied graphs are merged or joined together further then all common nodes (all that created beforethe .copy() method) will be glued. For example, let's imagine that you have the following calculation graph ``G`` .. graphviz:: digraph { rankdir="LR"; A -> B; } where ``A`` is a source and ``B`` is some operation on it. Then we copy it to the ``G'`` and assign a new operation there .. graphviz:: digraph { rankdir="LR"; A -> B -> C; } After that we decied to merge ``G`` and ``G'``. The resulting calculation graph will be: .. graphviz:: digraph { rankdir="LR"; A -> B -> C -> MERGE; B -> MERGE; } Please use the :meth:`Source.deepcopy` if you want to get the following calculation graph after merges and joins .. graphviz:: digraph { rankdir="LR"; A -> B -> C -> MERGE; "A'" -> "B'" -> "C'" -> MERGE; } Returns ------- Source See Also -------- Source.deepcopy """ if columns is None: columns = self.columns() if ep: result = self.__class__(node=ep, **columns) result.source(self.node().copy_graph()) # we need to clean it, because ep is not a _source result._clean_sources_dates() else: result = self.__class__(node=self.node(), **columns) result.node().add_rules(self.node().copy_rules(deep=deep)) result._set_sources_dates(self) if deep: # generating all new uuids for node history and for sources # after they were initialized keys = defaultdict(uuid.uuid4) # type: ignore result.node().rebuild_graph(keys) result._change_sources_keys(keys) # add state result._copy_state_vars_from(self) result._tmp_otq = self._tmp_otq.copy() result.__name = self.__name #noqa result._copy_properties_from(self) return result
[docs] def deepcopy(self, ep=None, columns=None) -> 'onetick.py.Source': """ Copy all graph and change ids for every node. More details could be found in :meth:`Source.copy` See Also -------- Source.copy """ return self.copy(ep, columns, deep=True)
def _copy_properties_from(self, obj): # needed if we are doing copy of a child with custom properties for attr in set(self.__class__._PROPERTIES) - set(Source._PROPERTIES): setattr(self, attr, getattr(obj, attr)) def _copy_state_vars_from(self, objs): self.__dict__["_state_vars"] = StateVars(self, objs)
[docs] def split(self, expr, cases, default=False) -> Tuple['Source', ...]: """ The method splits data using passed expression ``expr`` for several outputs by passed ``cases``. The method is the alias for the :meth:`Source.switch` Parameters ---------- expr : Operation column or column based expression cases : list list of values or :py:class:`onetick.py.range` objects to split by default : bool ``True`` adds the default output Returns ------- Outputs according to passed cases, number of outputs is len(cases) plus one if ``default=True`` See also -------- | :meth:`Source.switch` | **SWITCH** OneTick event processor Examples -------- >>> # OTdirective: snippet-name: Source operations.split; >>> data = otp.Ticks(X=[0.33, -5.1, otp.nan, 9.4]) >>> r1, r2, r3 = data.split(data['X'], [otp.nan, otp.range(0, 100)], default=True) >>> r1() Time X 0 2003-12-01 00:00:00.002 NaN >>> r2() Time X 0 2003-12-01 00:00:00.000 0.33 1 2003-12-01 00:00:00.003 9.40 >>> r3() Time X 0 2003-12-01 00:00:00.001 -5.1 See Also -------- Source.switch :py:class:`onetick.py.range` """ output_num = len(cases) # format cases def to_str(v): if isinstance(v, onetick.py.utils.range): return "[" + str(v.start) + "," + str(v.stop) + "]" elif isinstance(v, str): return '"' + v + '"' elif isinstance(v, tuple): return ",".join(map(to_str, list(v))) return str(v) cases = [f"{to_str(cases[inx])}:OUT{inx}" for inx in range(output_num)] # create ep params = dict(switch=str(expr), cases=";".join(cases)) if default: params["default_output"] = "DEF_OUT" switch = self.copy(ep=otq.SwitchEp(**params)) # construct results result = [] for inx in range(output_num): res = switch.copy() res.node().out_pin(f"OUT{inx}") res.sink(otq.Passthrough()) result.append(res) if default: res = switch.copy() res.node().out_pin("DEF_OUT") res.sink(otq.Passthrough()) result.append(res) return tuple(result)
[docs] def switch(self, expr, cases, default=False) -> Tuple['Source', ...]: """ The method splits data using passed expression for several outputs by passed cases. This method is an alias for :meth:`Source.split` method. Parameters ---------- expr : Operation column or column based expression cases : list list of values or :py:class:`onetick.py.range` objects to split by default : bool ``True`` adds the default output Returns ------- Outputs according to passed cases, number of outputs is len(cases) plus one if ``default=True`` See also -------- | :meth:`Source.split` | **SWITCH** OneTick event processor Examples -------- >>> data = otp.Ticks(X=[0.33, -5.1, otp.nan, 9.4]) >>> r1, r2, r3 = data.switch(data['X'], [otp.nan, otp.range(0, 100)], default=True) >>> r1() Time X 0 2003-12-01 00:00:00.002 NaN >>> r2() Time X 0 2003-12-01 00:00:00.000 0.33 1 2003-12-01 00:00:00.003 9.40 >>> r3() Time X 0 2003-12-01 00:00:00.001 -5.1 See Also -------- Source.split :py:class:`onetick.py.range` """ return self.split(expr, cases, default)
def columns(self, skip_meta_fields=False): """ Return columns in data source Parameters ---------- skip_meta_fields: bool, default=False do not add meta fields Returns ------- dict """ result = {} for key, value in self.__dict__.items(): if skip_meta_fields and key in self.__class__.meta_fields: continue if key in self.__class__._PROPERTIES: continue if isinstance(value, _Column): result[value.name] = value.dtype return result def drop_columns(self): """ Method removes all columns in the python representation, but don't drop columns on the data. It is used when external query is applied, because we don't know how data schema has changed. """ items = [] for key, value in self.__dict__.items(): if key in self.__class__.meta_fields or key in self.__class__._PROPERTIES: continue if isinstance(value, _Column): items.append(key) for item in items: del self.__dict__[item] def node(self): return self.__node def tick_type(self, tt): self.__node.tick_type(tt) return self def symbol(self, symbol): """ Apply symbol to graph .. deprecated:: 1.3.31 """ warnings.warn("symbol method is deprecated, please specify symbol during creation", DeprecationWarning) self.__node.symbol(symbol) return self def node_name(self, name=None, key=None): return self.__node.node_name(name, key) def __add__(self, other) -> 'Source': return onetick.py.functions.merge([self, other])
[docs] @inplace_operation def table(self, inplace=False, strict: bool = True, **schema) -> Optional['Source']: """ Set the OneTick and python schemas levels according to the ``schema`` parameter. The ``schema`` should contain either (field_name -> type) pairs or (field_name -> default value) pairs; ``None`` means no specified type, and OneTick considers it's as a double type. Resulting ticks have the same order as in the ``schema``. If only partial fields are specified (i.e. when the ``strict=False``) then fields from the ``schema`` have the most left position. Parameters ---------- inplace: bool The flag controls whether operations should be applied inplace strict: bool If set to ``False``, all fields present in an input tick will be present in the output tick. If ``True``, then only fields specified in the ``schema``. schema: field_name -> type or field_name -> default value pairs that should be applied on the source. Returns ------- :class:`Source` or ``None`` See Also -------- | :attr:`Source.schema` | :meth:`__getitem__`: the table shortcut | **TABLE** OneTick event processor Examples -------- Selection case >>> data = otp.Ticks(X1=[1, 2, 3], ... X2=[3, 2, 1], ... A1=["A", "A", "A"]) >>> data = data.table(X2=int, A1=str) # OTdirective: snippet-name: Arrange.set schema; >>> otp.run(data) Time X2 A1 0 2003-12-01 00:00:00.000 3 A 1 2003-12-01 00:00:00.001 2 A 2 2003-12-01 00:00:00.002 1 A Defining default values case (note the order) >>> data = otp.Ticks(X=[1, 2, 3]) >>> data = data.table(Y=0.5, strict=False) >>> otp.run(data) Time Y X 0 2003-12-01 00:00:00.000 0.5 1 1 2003-12-01 00:00:00.001 0.5 2 2 2003-12-01 00:00:00.002 0.5 3 """ def is_time_type_or_nsectime(obj): return ott.is_time_type(obj) or isinstance(obj, ott.nsectime) def transformer(name, obj): if obj is None: return name res = f'{ott.type2str(ott.get_object_type(obj))} {name}' if not isinstance(obj, Type) and not is_time_type_or_nsectime(obj): res += f' ({ott.value2str(obj)})' return res def get_type(value): if value is None: return float return ott.get_object_type(value) for c_name in list(schema.keys()): if c_name in self.__class__.meta_fields: # meta fields should not be propagated to Table ep # otherwise new user-defined field with the same name will appear in schema # and using this field will raise an "ambiguous use" error in OneTick warnings.warn(f"Meta field '{c_name}' should not be used in .table() method.") schema.pop(c_name) if not schema: return self schema_to_set = {c_name: get_type(c_value) for c_name, c_value in schema.items()} if strict: self.schema.set(**schema_to_set) else: self.schema.update(**schema_to_set) fields = ','.join([transformer(c_name, c_value) for c_name, c_value in schema.items()]) self.sink(otq.Table(fields=fields, keep_input_fields=not strict)) for c_name, c_value in schema.items(): # datetime and nsetime values require onetick built-in functions to be initialized # but built-in functions can't be used in table ep so updating columns after the table if is_time_type_or_nsectime(c_value): self.update({c_name: c_value}, where=self[c_name] == 0, inplace=True) self._fix_varstrings() return self
def _fix_varstrings(self): """ PY-556: converting to varstring results in string with null-characters """ varstring_columns = { name: self[name] for name, dtype in self.schema.items() if dtype is ott.varstring } # just updating the column removes null-characters if varstring_columns: self.update(varstring_columns, inplace=True)
[docs] @inplace_operation def drop(self, columns: List[Any], inplace=False) -> Optional['Source']: """ Remove a list of columns from the Source. If column with such name wasn't found the error will be raised, if the regex was specified and there aren't any matched columns, do nothing. Regex is any string containing any of characters ***+?\:[]{}()**, dot is a valid symbol for OneTick identificator, so **a.b** will be passed to OneTick as an identifier, if you want specify such regex use parenthesis - **(a.b)** Parameters ---------- columns : str, Column or list of them Column(s) to remove. You could specify a regex or collection of regexes, in such case columns with match names will be deleted. inplace: bool A flag controls whether operation should be applied inplace. If inplace=True, then it returns nothing. Otherwise method returns a new modified object. Returns ---------- :class:`Source` or ``None`` See Also -------- **PASSTHROUGH** OneTick event processor Examples -------- >>> data = otp.Ticks(X1=[1, 2, 3], ... X2=[3, 2, 1], ... A1=["A", "A", "A"]) >>> data = data.drop("X1") # OTdirective: snippet-name: Arrange.drop.one field; >>> otp.run(data) Time X2 A1 0 2003-12-01 00:00:00.000 3 A 1 2003-12-01 00:00:00.001 2 A 2 2003-12-01 00:00:00.002 1 A Regexes also could be specified in such case all matched columns will be deleted >>> data = otp.Ticks(X1=[1, 2, 3], ... X2=[3, 2, 1], ... A1=["A", "A", "A"]) >>> data = data.drop(r"X\d+") # OTdirective: snippet-name: Arrange.drop.regex; >>> otp.run(data) Time A1 0 2003-12-01 00:00:00.000 A 1 2003-12-01 00:00:00.001 A 2 2003-12-01 00:00:00.002 A Several parameters can be specified >>> # OTdirective: snippet-name: Arrange.drop.multiple; >>> data = otp.Ticks(X1=[1, 2, 3], ... X2=[3, 2, 1], ... Y1=[1, 2, 3], ... Y2=[3, 2, 1], ... YA=["a", "b", "c"], ... A1=["A", "A", "A"]) >>> data = data.drop([r"X\d+", "Y1", data["Y2"]]) >>> otp.run(data) Time YA A1 0 2003-12-01 00:00:00.000 a A 1 2003-12-01 00:00:00.001 b A 2 2003-12-01 00:00:00.002 c A **a.b** will be passed to OneTick as an identifier, if you want specify such regex use parenthesis - **(a.b)** >>> data = otp.Ticks({"COLUMN.A": [1, 2, 3], "COLUMN1A": [3, 2, 1], ... "COLUMN1B": ["a", "b", "c"], "COLUMN2A": ["c", "b", "a"]}) >>> data = data.drop("COLUMN.A") # OTdirective: skip-snippet:; >>> otp.run(data) Time COLUMN1A COLUMN1B COLUMN2A 0 2003-12-01 00:00:00.000 3 a c 1 2003-12-01 00:00:00.001 2 b b 2 2003-12-01 00:00:00.002 1 c a >>> data = otp.Ticks({"COLUMN.A": [1, 2, 3], "COLUMN1A": [3, 2, 1], ... "COLUMN1B": ["a", "b", "c"], "COLUMN2A": ["c", "b", "a"]}) >>> data = data.drop("(COLUMN.A)") # OTdirective: skip-snippet:; >>> otp.run(data) Time COLUMN1B 0 2003-12-01 00:00:00.000 a 1 2003-12-01 00:00:00.001 b 2 2003-12-01 00:00:00.002 c """ self.__delitem__(columns) return self
[docs] @inplace_operation def dropna(self, how: Literal["any", "all"] = "any", inplace=False) -> Optional['Source']: """ Drops ticks that contain NaN values according to the policy in the ``how`` parameter Parameters ---------- how: "any" or "all" ``any`` - filters out ticks if at least one field has NaN value ``all`` - filters out ticks if all fields have NaN values. inplace: bool the flag controls whether operation should be applied inplace. Returns ------- :class:`Source` or ``None`` Examples -------- Drop ticks where **at least one** field has ``nan`` value. >>> data = otp.Ticks([[ 'X', 'Y'], ... [ 0.0, 1.0], ... [ otp.nan, 2.0], ... [ 4.0, otp.nan], ... [ otp.nan, otp.nan], ... [ 6.0, 7.0]]) >>> data = data.dropna() >>> otp.run(data)[['X', 'Y']] X Y 0 0.0 1.0 1 6.0 7.0 Drop ticks where **all** fields have ``nan`` values. >>> data = otp.Ticks([[ 'X', 'Y'], ... [ 0.0, 1.0], ... [ otp.nan, 2.0], ... [ 4.0, otp.nan], ... [ otp.nan, otp.nan], ... [ 6.0, 7.0]]) >>> data = data.dropna(how='all') >>> otp.run(data)[['X', 'Y']] X Y 0 0.0 1.0 1 NaN 2.0 2 4.0 NaN 3 6.0 7.0 """ if how not in ["any", "all"]: raise ValueError(f"It is expected to see 'any' or 'all' values for 'how' parameter, but got '{how}'") condition = None for column_name, dtype in self.columns(skip_meta_fields=True).items(): if dtype is float: if condition is None: condition = self[column_name] != ott.nan else: if how == "any": condition &= self[column_name] != ott.nan elif how == "all": condition |= self[column_name] != ott.nan self.sink(otq.WhereClause(where=str(condition))) return self
def __delitem__(self, obj): if isinstance(obj, list) or isinstance(obj, tuple): objs = obj else: objs = (obj,) # we can't be sure python Source has consistent columns cache, because sinking complex event processors # can change columns unpredictable, so if user will specify regex as a param, we will pass regex # as an onetick's param, but delete all matched columns from python Source cache. # items_to_passthrough - items to pass to Passthrough as parameters # names_of_columns - names of onetick.py Source columns to delete from Source instance items_to_passthrough, names_of_columns, regex = self._get_columns_names(objs) self._validate_columns_names(names_of_columns) for item in names_of_columns: del self.__dict__[item] self.sink(otq.Passthrough(drop_fields=True, fields=",".join(items_to_passthrough), use_regex=regex)) def _validate_columns_names(self, names_of_columns): for item in names_of_columns: if item in self.__dict__: if not isinstance(self.__dict__[item], _Column): raise AttributeError(f"There is no '{item}' column") else: raise AttributeError(f"There is no '{item}' column") def _get_columns_names(self, objs): items_to_passthrough = [] names_of_columns = [] regex = False for obj in objs: if isinstance(obj, _Column): items_to_passthrough.append(obj.name) names_of_columns.append(obj.name) elif isinstance(obj, str): items_to_passthrough.append(obj) if any(c in "*+?\\:[]{}()" for c in obj): # it is a possible regex regex = True names_of_columns.extend(col for col in self.columns() if re.match(obj, col)) else: names_of_columns.append(obj) else: raise TypeError(f"It is not supported to delete item '{obj}' of type '{type(obj)}'") # remove duplications and meta_fields names_of_columns = set(names_of_columns) - set(self.__class__.meta_fields) return items_to_passthrough, list(names_of_columns), regex def __from_ep_to_proxy(self, ep): in_pin, out_pin = None, None if isinstance(ep, otq.graph_components.EpBase.PinnedEp): if hasattr(ep, "_output_name"): out_pin = getattr(ep, "_output_name") else: in_pin = getattr(ep, "_input_name") ep = getattr(ep, "_ep") return ep, uuid.uuid4(), in_pin, out_pin
[docs] def sink(self, ep, out_pin=None, move_node=True): """ Appends node inplace to the source. Connect `out_pin` of this source to `ep`. Can be used to connect onetick.query objects to onetick.py source. Data schema changes (added or deleted columns) will not be detected automatically after applying `sink` function, so the user must change the schema himself by updating `schema` property. Parameters ---------- ep: otq.graph_components.EpBase,\ otq.graph_components.EpBase.PinnedEp,\ Tuple[otq.graph_components.EpBase, uuid.uuid4, Optional[str], Optional[str]] onetick.query EP object to append to source. out_pin: Optional[str], default=None name of the out pin to connect to `ep` move_node: bool, default=True Returns ---------- result: otq.graph_components.EpBase, otq.graph_components.EpBase.PinnedEp Last node of the source See Also -------- onetick.py.Source.schema onetick.py.core._source.schema.Schema Examples -------- Adding column 'B' directly with onetick.query EP. >>> data = otp.Tick(A=1) >>> data.sink(otq.AddField(field='B', value=2)) # OTdirective: skip-snippet:; AddField(field='B',value=2) >>> data.to_df() # OTdirective: skip-snippet:; Time A B 0 2003-12-01 1 2 But we can't use this column with onetick.py methods yet. >>> data['C'] = data['B'] # OTdirective: skip-snippet:; # doctest: +ELLIPSIS Traceback (most recent call last): ... KeyError: 'Column name B is not in the schema... We should manually change source's schema >>> data.schema.update(B=int) # OTdirective: skip-snippet:; >>> data['C'] = data['B'] >>> data.to_df() Time A B C 0 2003-12-01 1 2 2 """ if not ( issubclass(type(ep), otq.graph_components.EpBase) or issubclass(type(ep), otq.graph_components.EpBase.PinnedEp) or type(ep) is tuple ): raise Exception("sinking is allowed only for EpBase instances") if type(ep) is tuple: # for already existed EP fetched from _ProxyNode return self.__node.sink(out_pin, *ep, move_node) else: return self.__node.sink(out_pin, *self.__from_ep_to_proxy(ep), move_node)
def __rshift__(self, ep): """ duplicates sink() method, but returns new object """ new_source = self.copy() new_source.sink(ep) return new_source def __irshift__(self, ep): """ duplicates sink() method, but assigns source new object """ new_source = self.copy() new_source.sink(ep) return new_source def source(self, ep, in_pin=None): """ Add node as source to root node """ if not ( issubclass(type(ep), otq.graph_components.EpBase) or issubclass(type(ep), otq.graph_components.EpBase.PinnedEp) or type(ep) is tuple ): raise Exception("sourcing is allowed only for EpBase instances") if type(ep) is tuple: # for already existed EP fetched from _ProxyNode return self.__node.source(in_pin, *ep) else: return self.__node.source(in_pin, *self.__from_ep_to_proxy(ep)) def source_by_key(self, ep, to_key): """ Add node as source to graph node by key""" if not ( issubclass(type(ep), otq.graph_components.EpBase) or issubclass(type(ep), otq.graph_components.EpBase.PinnedEp) or type(ep) is tuple ): raise Exception("sourcing is allowed only for EpBase instances") if type(ep) is tuple: # for already existed EP fetched from _ProxyNode return self.__node.source_by_key(to_key, *ep) else: return self.__node.source_by_key(to_key, *self.__from_ep_to_proxy(ep)) def apply_query(self, query, in_pin="IN", out_pin="OUT", **params): """ Apply data source to query .. deprecated:: 1.3.77 See Also -------- Source.apply """ warnings.warn( 'The "apply_qery" method is deprecated. Please, use the .apply method instead or ' "call a reference queries directly." ) res = onetick.py.functions.apply_query(query, {in_pin: self}, [out_pin], **params) res.node().out_pin(out_pin) return res
[docs] def apply(self, obj) -> Union['otp.Column', 'Source']: """ Apply object to data source. Parameters ---------- obj: onetick.py.query, Callable, type, onetick.query.GraphQuery - `onetick.py.query` allows to apply external nested query - python `Callable` allows to translate python code to similar OneTick's CASE expression. There are some limitations to which python operators can be used in this callable. See :ref:`Python callables parsing guide <python callable parser>` article for details. In :ref:`Remote OTP with Ray<ray-remote>` any `Callable` must be decorated with `@otp.remote` decorator, see :ref:`Ray usage examples<apply-remote-context>` for details. - `type` allows to apply default type convertation - `onetick.query.GraphQuery` allows to apply a build onetick.query.Graph Returns ------- Column, Source Examples -------- Apply external query to a tick flow. In this case it assumes that query has only one input and one output. Check the :class:`query` examples if you want to use a query with multiple inputs or outputs. >>> data = otp.Ticks(X=[1, 2, 3]) >>> external_query = otp.query('update.otq') >>> data = data.apply(external_query) >>> otp.run(data) Time X 0 2003-12-01 00:00:00.000 2 1 2003-12-01 00:00:00.001 4 2 2003-12-01 00:00:00.002 6 Apply a predicate to a column / operation. In this case value passed to a predicate is column values. Result is a column. >>> data = otp.Ticks(X=[1, 2, 3]) >>> data['Y'] = data['X'].apply(lambda x: x * 2) >>> otp.run(data) Time X Y 0 2003-12-01 00:00:00.000 1 2 1 2003-12-01 00:00:00.001 2 4 2 2003-12-01 00:00:00.002 3 6 Another example of applying more sophisticated operation >>> data = otp.Ticks(X=[1, 2, 3]) >>> data['Y'] = data['X'].apply(lambda x: 1 if x > 2 else 0) >>> otp.run(data) Time X Y 0 2003-12-01 00:00:00.000 1 0 1 2003-12-01 00:00:00.001 2 0 2 2003-12-01 00:00:00.002 3 1 Example of applying a predicate to a Source. In this case value passed to a predicate is a whole tick. Result is a column. >>> data = otp.Ticks(X=[1, 2, 3], Y=[.5, -0.4, .2]) >>> data['Z'] = data.apply(lambda tick: 1 if abs(tick['X'] * tick['Y']) > 0.5 else 0) >>> otp.run(data) Time X Y Z 0 2003-12-01 00:00:00.000 1 0.5 0 1 2003-12-01 00:00:00.001 2 -0.4 1 2 2003-12-01 00:00:00.002 3 0.2 1 See Also -------- :py:class:`onetick.py.query` :py:meth:`onetick.py.Source.script` :ref:`Python callables parsing guide <python callable parser>` """ if isinstance(obj, onetick.py.sources.query): graph = obj.graph_info if len(graph.nested_inputs) != 1: raise Exception( f'It is expected the query "{obj.query_name}" to have one input, but it ' f"has {len(graph.nested_inputs)}" ) if len(graph.nested_outputs) > 1: raise Exception( f'It is expected the query "{obj.query_name}" to have one or no output, but it ' f"has {len(graph.nested_outputs)}" ) in_pin = graph.nested_inputs[0].NESTED_INPUT if len(graph.nested_outputs) == 0: out_pin = None # means no output else: out_pin = graph.nested_outputs[0].NESTED_OUTPUT return obj(**{in_pin: self})[out_pin] elif isinstance(obj, otq.GraphQuery): tmp_file = utils.TmpFile(suffix=".otq") obj.save_to_file( tmp_file.path, "graph_query_to_apply", # start and end times don't matter for this query, use some constants start=db_constants.DEFAULT_START_DATE, end=db_constants.DEFAULT_END_DATE, ) query_info = get_query_info(tmp_file.path) if len(query_info.sources) != 1: raise Exception( "It is expected the query to have one input, but it " f"has {len(query_info.sources)}" ) if len(query_info.sinks) != 1: raise Exception("It is expected the query to have one output, but it " f"has {len(query_info.sinks)}") add_pins( tmp_file.path, "graph_query_to_apply", [(query_info.sources[0], 1, "IN"), (query_info.sinks[0], 0, "OUT")], ) return onetick.py.sources.query(tmp_file.path + "::graph_query_to_apply")(IN=self)["OUT"] return apply_lambda(obj, _EmulateObject(self))
[docs] def dump(self, label: str = None, where: 'otp.Operation' = None, columns: Union[str, Tuple[str], List[str]] = None, callback: Callable = None): """ Dumps the ``columns`` from ticks into std::out in runtime if they fit the ``where`` condition. Every dump has a corresponding header that always includes the TIMESTAMP field. Other fields could be configured using the ``columns`` parameter. A header could be augmented with a ``label`` parameter; this label is an addition column that helps to distinguish ticks from multiple dumps with the same schema, because ticks from different dumps could be mixed. It might happen bacause of the OneTick multithreading, and there is the operating system buffer between the OneTick and the actual output. This method is helpful for debugging. Parameters ---------- label: str A label for a dump. It adds a special column **_OUT_LABEL_** for all ticks and set to the specified value. It helps to distinguish ticks from multiple dumps, because actual output could contain mixed ticks due the concurrency. ``None`` means no label. where: Operation A condition that allows to filter ticks to dump. ``None`` means no filtering. columns: str, tupel or list List of columns that shoud be in the output. ``None`` means dump all columns. callback : callable Callable, which preprocess source before printing. See also -------- **WRITE_TEXT** OneTick event processor Examples -------- >>> # OTdirective: skip-snippet:; >>> data.dump(label='Debug point', where=data['PRICE'] > 99.3, columns=['PRICE', 'QTY']) # doctest: +SKIP >>> data.dump(columns="X", callback=lambda x: x.first(), label="first") # doctest: +SKIP """ self_c = self.copy() if callback: self_c = callback(self_c) if where is not None: # can't be simplified because the _Operation overrides __bool__ self_c, _ = self_c[(where)] if columns: self_c = self_c[columns if isinstance(columns, (list, tuple)) else [columns]] if label: self_c['_OUT_LABEL_'] = label self_c.sink(otq.WriteText(formats_of_fields='TIMESTAMP=%|' + configuration.config.tz + '|%d-%m-%Y %H:%M:%S.%J', prepend_timestamp=False, prepend_symbol_name=False, propagate_ticks=True)) # print <no data> in case there are 0 ticks self_c = self_c.agg({'COUNT': otp.agg.count()}) self_c, _ = self_c[self_c['COUNT'] == 0] self_c['NO_DATA'] = '<no data>' self_c = self_c[['NO_DATA']] self_c.sink(otq.WriteText(output_headers=False, prepend_timestamp=False, prepend_symbol_name=False, propagate_ticks=True)) # Do not propagate ticks then, because we want just to write them into # the std::out. We have to do that, because otherwise these ticks would # go to a query output, that would mess real output. self_c, _ = self_c[self_c['Time'] != self_c['Time']] # We have to merge the branch back to the main branch even that these # branch does not generate ticks, because we do not introduce one more # output point, because the OneTick would add it to the final output # datastructure. self.sink(otq.Merge(identify_input_ts=False)) self.source(self_c.node().copy_graph()) self.node().add_rules(self_c.node().copy_rules()) self._merge_tmp_otq(self_c)
[docs] def script(self, func: Union[Callable[['Source'], Optional[bool]], str], inplace=False) -> 'Source': # TODO: need to narrow Source object to get rid of undesired methods like aggregations """ Implements a script for every tick. Allows to pass a ``func`` that will be applied per every tick. A ``func`` can be python callable in this case it will be translated to per tick script. In order to use it in Remote OTP with Ray, the function should be decorated with ``@otp.remote``, see :ref:`Ray usage examples<apply-remote-context>` for details. See :ref:`Per Tick Script Guide <python callable parser>` for more detailed description of python to OneTick code translation and per-tick script features. The script written in per tick script language can be passed itself as a string or path to a file with the code. onetick-py doesn't validate the script, but configure the schema accordingly. Parameters ---------- func: callable, str or path - a callable that takes only one parameter - actual tick that behaves like a `Source` instance - or the script on per-tick script language - or a path to file with onetick script Returns ------- :class:`Source` See also -------- **PER_TICK_SCRIPT** OneTick event processor Examples -------- >>> t = otp.Ticks({'X': [1, 2, 3], 'Y': [4, 5, 6]}) >>> def fun(tick): ... tick['Z'] = 0 ... if tick['X'] + tick['Y'] == 5: ... tick['Z'] = 1 ... elif tick['X'] + tick['Y'] * 2 == 15: ... tick['Z'] = 2 >>> t = t.script(fun) >>> t() Time X Y Z 0 2003-12-01 00:00:00.000 1 4 1 1 2003-12-01 00:00:00.001 2 5 0 2 2003-12-01 00:00:00.002 3 6 2 See also -------- :py:meth:`onetick.py.Source.apply` :ref:`Per-Tick Script Guide <python callable parser>` """ res = self.copy() res = self if inplace else self.copy() if callable(func): _new_columns, _script = apply_script(func, _EmulateObject(self)) elif isinstance(func, str): if os.path.isfile(func): # path to the file with script with open(func) as file: _script = file.read() else: _script = func _new_columns = self._extract_columns_from_script(_script) else: raise ValueError("Wrong argument was specify, please use callable or string with either script on per tick " "language or path to it") res.sink(otq.PerTickScript(script=_script)) res.schema.update(**_new_columns) return res
def _extract_columns_from_script(self, script): result = {} type2type = dict(byte=int, short=int, uint=int, long=int, ulong=int, int=int, float=float, double=float, string=str, time32=ott.nsectime, nsectime=ott.nsectime, msectime=ott.msectime, varstring=ott.varstring) type2type[int] = int types = r"(?P<type>varstring|byte|short|uint|int|ulong|long|" \ r"float|double|decimal|string|time32|msectime|nsectime|matrix)" length = r"(\[(?P<length>\d+)\])?" name = r"\s+(?P<name>\w+)\s*=\s*" pattern = re.compile(types + length + name) for p in re.finditer(pattern, script): groupdict = p.groupdict() type = type2type.get(groupdict["type"]) if type: length = groupdict["length"] if length: length = int(length) if type is str and length != ott.string.DEFAULT_LENGTH: type = ott.string[length] result[groupdict["name"]] = type else: warnings.warn(f"{groupdict['type']} isn't supported for now, so field {groupdict['name']} won't " f"be added to schema.") return result
[docs] def to_symbol_param(self): """ Creates a read-only instance with the same columns except Time. It is used as a result of a first stage query with symbol params. See also -------- :ref:`Symbol Parameters Objects` :ref:`Symbol parameters` Examples -------- >>> symbols = otp.Ticks({'SYMBOL_NAME': ['S1', 'S2'], 'PARAM': ['A', 'B']}) >>> symbol_params = symbols.to_symbol_param() >>> t = otp.DataSource('SOME_DB', tick_type='TT') >>> t['S_PARAM'] = symbol_params['PARAM'] >>> result = otp.run(t, symbols=symbols) >>> result['S1'] Time X S_PARAM 0 2003-12-01 00:00:00.000 1 A 1 2003-12-01 00:00:00.001 2 A 2 2003-12-01 00:00:00.002 3 A """ return _SymbolParamSource(**self.columns())
@staticmethod def _convert_symbol_to_string(symbol, tmp_otq=None, start=None, end=None, timezone=None): if start is adaptive: start = None if end is adaptive: end = None if isinstance(symbol, Source): symbol = otp.eval(symbol).to_eval_string(tmp_otq=tmp_otq, start=start, end=end, timezone=timezone, operation_suffix='symbol', query_name=None, file_suffix=symbol._name_suffix('symbol.otq')) if isinstance(symbol, onetick.py.sources.query): return symbol.to_eval_string() else: return symbol @staticmethod def _construct_multi_branch_graph(branches): # TODO: add various checks, e.g. that branches have common parts main = branches[0].copy() for branch in branches[1:]: main.node().add_rules(branch.node().copy_rules()) main._merge_tmp_otq(branch) return main def _apply_side_branches(self, side_branches): for side_branch in side_branches: self.node().add_rules(side_branch.node().copy_rules()) self._merge_tmp_otq(side_branch) self.__sources_keys_dates.update(side_branch.__sources_keys_dates) self.__sources_modify_query_times.update(side_branch.__sources_modify_query_times) self.__sources_base_ep_func.update(side_branch.__sources_base_ep_func) self.__sources_symbols.update(side_branch.__sources_symbols) def symbols_for(self, func, *, symbol="SYMBOL_NAME", start="_PARAM_START_TIME_NANOS", end="_PARAM_END_TIME_NANOS"): """ Apply ticks from the current _source as symbols for calculations in the 'func'. Parameters ---------- func: callable A callable object that takes only one parameter, that references to the current symbol. symbol: str The column name that contains symbols. Default is 'SYMBOL_NAME'. The mandatory column. start: str The column name that contains start time for a symbol. Default is '_PARAM_START_TIME_NANOS'. The ptional column. end: str The column name that contains end time for a symbol. Default is '_PARAM_END_TIME_NANOS'. The optional column. Returns ------- A multi symbol ticks _source. """ symbols = self.copy() if str(symbol) != "SYMBOL_NAME": symbols["SYMBOL_NAME"] = symbols[str(symbol)] if str(start) != "_PARAM_START_TIME_NANOS": symbols["_PARAM_START_TIME_NANOS"] = symbols[str(start)] if str(end) != "_PARAM_END_TIME_NANOS": symbols["_PARAM_END_TIME_NANOS"] = symbols[str(end)] if "SYMBOL_NAME" not in symbols.columns(): raise Exception("Ticks do not have the SYMBOL_NAME column, but this is mandatory column.") # -------------- # num_params = len(inspect.signature(func).parameters) if num_params == 0: logic = func() elif num_params == 1: logic = func(symbols.to_symbol_param()) else: raise ValueError( f"It is expected only one parameter from the callback, but {num_params} passed" ) # TODO: test this case # -------------- # # Find date range for the logic query # If date range is not specified for at least one _source, then # try to deduce date range - set to adaptive # TODO: we don't need to get common symbol from _get_date_range, # refactor this function, so we don't get an error if default symbol is not set logic_start, logic_end, _ = logic.copy()._get_date_range(default_start=adaptive, default_end=adaptive) if logic_start is adaptive: logic_start = onetick.py.utils.INF_TIME if logic_end is adaptive: logic_end = onetick.py.ZERO_TIME # The same as previous, but for the symbols symbols_start, symbols_end, _ = symbols.copy()._get_date_range(default_start=adaptive, default_end=adaptive) if symbols_start is adaptive: symbols_start = onetick.py.utils.INF_TIME if symbols_end is adaptive: symbols_end = onetick.py.ZERO_TIME # Query interval should be as wider as possible start = min(logic_start, symbols_start) end = max(logic_end, symbols_end) # If nothing is specified, then just use default if start == onetick.py.utils.INF_TIME: start = None if end == onetick.py.ZERO_TIME: end = None return _MultiSymbolsSource(symbols.copy(), logic, start=start, end=end)
[docs] def join_with_query( self, query, how="outer", symbol=None, params=None, start=None, end=None, timezone=None, prefix=None, caching=None, keep_time=None, where=None, **kwargs, ) -> 'Source': """ For each tick executes ``query``. Parameters ---------- query: callable, Source Callable ``query`` should return :class:`Source`. This object will be evaluated by OneTick (not python) for every tick. Note python code will be executed only once, so all python's conditional expressions will be evaluated only once too. Callable should have ``symbol`` parameter and the parameters with names from ``params`` if they are specified in this method. If ``query`` is a :class:`Source` object then it will be propagated as a query to OneTick. how: 'inner', 'outer' Type of join. If **inner**, then each tick is propagated only if its ``query`` execution has a non-empty result. params: dict Mapping of the parameters' names and their values for the ``query``. :py:class:`Columns <onetick.py.Column>` can be used as a value. symbol: str, Operation or Tuple[Union[str, Operation], dict] Symbol to use in ``query``, in addition dictionary of symbol params can be passed along with symbol. If ``symbol`` is `None` then symbol from the main source is used. start: datetime, Operation Start time of ``query``. By default, start time of the main source is used. end: datetime, Operation End time of ``query``. By default, end time of the main source is used. start_time: .. deprecated:: 1.48.4 The same as ``start``. end_time: .. deprecated:: 1.48.4 The same as ``end``. timezone : Optional, str Timezone of ``query``. By default, timezone of the main source is used. prefix : str Prefix for the names of joined tick fields. caching : str If `None` caching is disabled. You can specify caching by using values: * 'cross_symbol': cache is the same for all symbols * 'per_symbol': cache is different for each symbol. keep_time : str Name for the joined timestamp column. `None` means no timestamp column will be joined. where : Operation Condition to filter ticks for which the result of the ``query`` will be joined. Returns ------- :class:`Source` Source with joined ticks from ``query`` See also -------- **JOIN_WITH_QUERY** OneTick event processor Examples -------- >>> # OTdirective: snippet-name: Special functions.join with query.with an otp data source; >>> d = otp.Ticks(Y=[-1]) >>> d = d.update(dict(Y=1), where=(d.Symbol.name == "a")) >>> data = otp.Ticks(X=[1, 2], ... S=["a", "b"]) >>> res = data.join_with_query(d, how='inner', symbol=data['S']) >>> otp.run(res)[["X", "Y", "S"]] X Y S 0 1 1 a 1 2 -1 b >>> d = otp.Ticks(ADDED=[-1]) >>> d = d.update(dict(ADDED=1), where=(d.Symbol.name == "3")) # symbol name is always string >>> data = otp.Ticks(A=[1, 2], B=[2, 4]) >>> res = data.join_with_query(d, how='inner', symbol=(data['A'] + data['B'])) # OTdirective: skip-snippet:; >>> df = otp.run(res) >>> df[["A", "B", "ADDED"]] A B ADDED 0 1 2 1 1 2 4 -1 Constants as symbols are also supported >>> d = otp.Ticks(ADDED=[d.Symbol.name]) >>> data = otp.Ticks(A=[1, 2], B=[2, 4]) >>> res = data.join_with_query(d, how='inner', symbol=1) # OTdirective: skip-snippet:; >>> df = otp.run(res) >>> df[["A", "B", "ADDED"]] A B ADDED 0 1 2 1 1 2 4 1 Function object as query is also supported (Note it will be executed only once in python's code) >>> def func(symbol): ... d = otp.Ticks(TYPE=["six"]) ... d = d.update(dict(TYPE="three"), where=(symbol.name == "3")) # symbol is always converted to string ... d["TYPE"] = symbol['PREF'] + d["TYPE"] + symbol['POST'] ... return d >>> # OTdirective: snippet-name: Special functions.join with query.with a function >>> data = otp.Ticks(A=[1, 2], B=[2, 4]) >>> res = data.join_with_query(func, how='inner', symbol=(data['A'] + data['B'], dict(PREF="_", POST="$"))) >>> df = otp.run(res) >>> df[["A", "B", "TYPE"]] A B TYPE 0 1 2 _three$ 1 2 4 _six$ It's possible to pass the source itself as a list of symbol parameters, which will make all of its fields accessible through the "symbol" object >>> def func(symbol): ... d = otp.Ticks(TYPE=["six"]) ... d["TYPE"] = symbol['PREF'] + d["TYPE"] + symbol['POST'] ... return d >>> # OTdirective: snippet-name: Source operations.join with query.source as symbol; >>> data = otp.Ticks(A=[1, 2], B=[2, 4], PREF=["_", "$"], POST=["$", "_"]) >>> res = data.join_with_query(func, how='inner', symbol=data) >>> df = otp.run(res) >>> df[["A", "B", "TYPE"]] A B TYPE 0 1 2 _six$ 1 2 4 $six_ The examples above can be rewritten by using otq params instead of symbol params. OTQ parameters are global for query, while symbol parameters can be redefined by bound symbols. >>> def func(symbol, pref, post): ... d = otp.Ticks(TYPE=["six"]) ... d = d.update(dict(TYPE="three"), where=(symbol.name == "3")) # symbol is always converted to string ... d["TYPE"] = pref + d["TYPE"] + post ... return d >>> # OTdirective: snippet-name: Special functions.join with query.with a function that takes params; >>> data = otp.Ticks(A=[1, 2], B=[2, 4]) >>> res = data.join_with_query(func, how='inner', symbol=(data['A'] + data['B']), ... params=dict(pref="_", post="$")) >>> df = otp.run(res) >>> df[["A", "B", "TYPE"]] A B TYPE 0 1 2 _three$ 1 2 4 _six$ Some or all otq params can be column or expression also >>> def func(symbol, pref, post): ... d = otp.Ticks(TYPE=["six"]) ... d = d.update(dict(TYPE="three"), where=(symbol.name == "3")) # symbol is always converted to string ... d["TYPE"] = pref + d["TYPE"] + post ... return d >>> # OTdirective: snippet-name: Special functions.join with query.with a function that takes params from fields; # noqa >>> data = otp.Ticks(A=[1, 2], B=[2, 4], PREF=["^", "_"], POST=["!", "$"]) >>> res = data.join_with_query(func, how='inner', symbol=(data['A'] + data['B']), ... params=dict(pref=data["PREF"] + ".", post=data["POST"])) >>> df = otp.run(res) >>> df[["A", "B", "TYPE"]] A B TYPE 0 1 2 ^.three! 1 2 4 _.six$ You can specify start and end time of the query to select specific ticks from db >>> # OTdirective: snippet-name: Special functions.join with query.passing start/end times; >>> d = otp.Ticks(Y=[1, 2]) >>> data = otp.Ticks(X=[1, 2]) >>> start = datetime(2003, 12, 1, 0, 0, 0, 1000, tzinfo=pytz.timezone("EST5EDT")) >>> end = datetime(2003, 12, 1, 0, 0, 0, 3000, tzinfo=pytz.timezone("EST5EDT")) >>> res = data.join_with_query(d, how='inner', start=start, end=end) >>> otp.run(res) Time Y X 0 2003-12-01 00:00:00.000 1 1 1 2003-12-01 00:00:00.000 2 1 2 2003-12-01 00:00:00.001 1 2 3 2003-12-01 00:00:00.001 2 2 Use keep_time param to keep or rename original timestamp column >>> # OTdirective: snippet-name: Special functions.join with query.keep the timestamps of the joined ticks; >>> d = otp.Ticks(Y=[1, 2]) >>> data = otp.Ticks(X=[1, 2]) >>> res = data.join_with_query(d, how='inner', keep_time="ORIG_TIME") >>> otp.run(res) Time Y ORIG_TIME X 0 2003-12-01 00:00:00.000 1 2003-12-01 00:00:00.000 1 1 2003-12-01 00:00:00.000 2 2003-12-01 00:00:00.001 1 2 2003-12-01 00:00:00.001 1 2003-12-01 00:00:00.000 2 3 2003-12-01 00:00:00.001 2 2003-12-01 00:00:00.001 2 """ # TODO: check if join_with_query checks schema of joined source against primary source, # by itself or with process_by_group def _columns_to_params(columns, fix_symbol_time=False): params_list = [] for key, value in columns.items(): dtype = ott.get_object_type(value) convert_rule = "'" + key + "=' + " if dtype is str: if are_strings(getattr(value, "dtype", None)): convert_rule += str(value) else: convert_rule += '"' + value + '"' elif dtype is otp.msectime: convert_rule += "tostring(GET_MSECS(" + str(value) + "))" elif dtype is otp.nsectime: if key == '_SYMBOL_TIME' and fix_symbol_time: # hack to support passing _SYMBOL_TIME to called query as a parameter # TODO: create a dedicated interface for _SYMBOL_TIME convert_rule += "tostring(GET_MSECS(" + str(value) + "))" else: # this matches the common way onetick converts nanoseconds to symbol parameters # yes, this is the simplest way to write this expression # TODO: think if we want this conversion to be applied to query parameters also convert_rule += "tostring(GET_MSECS(" + str(value) + \ "))+'.'+SUBSTR(NSECTIME_FORMAT('%J'," + str(value) + ",_TIMEZONE),3,6)" else: convert_rule += "tostring(" + str(value) + ")" params_list.append(convert_rule) return "+','+".join(params_list) if params is None: params = {} # "symbol" parameter can contain a symbol name (string, field, operation etc), # a symbol parameter list (dict, Source, _SymbolParamSource), # or both together as a tuple def _check_and_convert_symbol(symbol): if isinstance(symbol, _Operation): # TODO: PY-35 return True, f"tostring({symbol})" elif isinstance(symbol, str): return True, f"'{symbol}'" elif type(symbol) in {int, float}: # constant return True, f"tostring({symbol})" elif symbol is None: # this is necessary to distinguish None (which is valid value for symbol) from invalid values return True, None else: return False, None def _convert_symbol_param_and_columns(symbol_param): """ We need to create two objects from a symbol param (a dict, a Source or a _SymbolParamSource): 1. Dictionary of columns to generate list of symbol parameters for the JOIN_WITH_QUERY EP 2. _SymbolParamSource object to pass to the source function if necessary """ if isinstance(symbol_param, dict): converted_symbol_param_columns = symbol_param converted_symbol_param = _SymbolParamSource(**{key: ott.get_object_type(column) for key, column in symbol_param.items()}) elif isinstance(symbol_param, _Source): converted_symbol_param_columns = {field_name: symbol_param[field_name] for field_name in symbol_param.columns(skip_meta_fields=True).keys()} converted_symbol_param = symbol_param.to_symbol_param() elif isinstance(symbol_param, _SymbolParamSource): converted_symbol_param_columns = {field_name: symbol_param[field_name] for field_name in symbol_param.schema.keys()} converted_symbol_param = symbol_param else: converted_symbol_param_columns = None converted_symbol_param = None return converted_symbol_param_columns, converted_symbol_param # if "symbol" is tuple, we unpack it if isinstance(symbol, tuple) and len(symbol) == 2: symbol_name, symbol_param = symbol else: # see if "symbol" contains symbol name or symbol params is_symbol, _ = _check_and_convert_symbol(symbol) if is_symbol: symbol_name = symbol symbol_param = {} else: symbol_name = None symbol_param = symbol _, converted_symbol_name = _check_and_convert_symbol(symbol_name) # default symbol name should be this: _SYMBOL_NAME if it is not empty else _NON_EXISTING_SYMBOL_ # this way we will force JWQ to substitute symbol with any symbol parameters we may have passed # otherwise (if an empty symbol name is passed to JWQ), it will not substitute either symbol name # or symbol parameters, and so symbol parameters may get lost # see BDS-263 if converted_symbol_name is None: converted_symbol_name = "CASE(_SYMBOL_NAME,'','_NON_EXISTING_SYMBOL',_SYMBOL_NAME)" converted_symbol_param_columns, converted_symbol_param = _convert_symbol_param_and_columns(symbol_param) if converted_symbol_param is None: # we couldn't interpret "symbols" as either symbol name or symbol parameters raise ValueError('"symbol" parameter has a wrong format! It should be a symbol name, a symbol parameter ' 'object (dict or Source), or a tuple containing both') # prepare temporary file # ------------------------------------ # converted_params = prepare_params(**params) if isinstance(query, Source): sub_source = query else: # inspect function # ------- sig = inspect.signature(query) if "symbol" in sig.parameters: if "symbol" in converted_params.keys(): raise AttributeError('"params" contains key "symbol", which is reserved for symbol parameters. ' 'Please, rename this parameter to another name') converted_params["symbol"] = converted_symbol_param # type: ignore sub_source = query(**converted_params) sub_source = self._process_keep_time_param(keep_time, sub_source) if not sub_source._is_unbound_required(): sub_source += onetick.py.sources.Empty() params_str = _columns_to_params(params, fix_symbol_time=True) symbol_params_str = _columns_to_params(converted_symbol_param_columns) columns = {} columns.update(self._get_columns_with_prefix(sub_source, prefix)) columns.update(self.columns(skip_meta_fields=True)) res = self.copy(columns=columns) res._merge_tmp_otq(sub_source) query_name = sub_source._store_in_tmp_otq( res._tmp_otq, symbols='_NON_EXISTING_SYMBOL_', operation_suffix="join_with_query" ) # TODO: combine with _convert_symbol_to_string # ------------------------------------ # if where is not None and how != 'outer': raise ValueError('The `where` parameter can be used only for outer join') join_params = dict( otq_query=f'"THIS::{query_name}"', join_type=how.upper(), otq_query_params=params_str, symbol_params=symbol_params_str, where=str(where._make_python_way_bool_expression()) if where is not None else '', ) start_time = kwargs.get('start_time', start) end_time = kwargs.get('end_time', end) self._fill_aux_params_for_jwq( join_params, caching, end_time, prefix, start_time, converted_symbol_name, timezone ) res.sink(otq.JoinWithQuery(**join_params)) res.sink( otq.Table( fields=",".join([ott.type2str(dtype) + " " + name for name, dtype in columns.items()]), keep_input_fields=True, ) ) res.sink(otq.Passthrough(fields="TIMESTAMP", drop_fields=True)) return res
@property def state_vars(self) -> StateVars: """ Provides access to state variables Returns ------- State Variables: Dict[str, state variable] State variables, you can access one with its name. See Also -------- | `State Variables \ <../../static/getting_started/variables_and_data_structures.html#variables-and-data-structures>`_ | **DECLARE_STATE_VARIABLES** OneTick event processor """ return self.__dict__['_state_vars'] __invalid_query_name_symbols_regex = re.compile('[^a-zA-Z0-9_]') def __remove_invalid_symbols(self, s): """ Replaces symbols that cannot be put in query names with '_' """ return self.__invalid_query_name_symbols_regex.sub('_', s) def get_name(self, remove_invalid_symbols=False): """ Returns source name. If remove_invalid_symbols == True, returned name only contains symbols that can be put in query names. """ if remove_invalid_symbols and self.__name: return self.__remove_invalid_symbols(self.__name) else: return self.__name def set_name(self, new_name): """ Sets source name. Source name cannot be an empty string but can be None. """ assert isinstance(new_name, str) or new_name is None, "Source name must be a string or None." if new_name is not None: assert new_name != '', "Source name must be a non-empty string." self.__name = new_name def _name_suffix(self, suffix, separator='.', remove_invalid_symbols=False): if remove_invalid_symbols: suffix = self.__remove_invalid_symbols(suffix) separator = self.__remove_invalid_symbols(separator) name = self.get_name(remove_invalid_symbols=True) else: name = self.__name return f'{separator}{name}{separator}{suffix}' if name else f'{separator}{suffix}' @property def schema(self) -> Schema: """ Represents actual python data schema in the column-name -> type format. For example, could be used after the :meth:`Source.sink` to adjust the schema. Returns ------- Schema See Also -------- Source.sink Examples -------- >>> data = otp.Ticks([['X', 'Y', 'Z'], ... [ 1, 0.5, 'abc']]) >>> data['T'] = data['Time'] >>> data.schema {'X': <class 'int'>, 'Y': <class 'float'>, 'Z': <class 'str'>, 'T': <class 'onetick.py.types.nsectime'>} >>> data.schema['X'] <class 'int'> >>> data.schema['X'] = float >>> data.schema['X'] <class 'float'> >>> 'W' in data.schema False >>> data.schema['W'] = otp.nsectime >>> 'W' in data.schema True >>> data.schema['W'] <class 'onetick.py.types.nsectime'> """ schema = self.columns(skip_meta_fields=True) hidden_columns = {'Time': ott.nsectime, 'TIMESTAMP': ott.nsectime} return Schema(_base_source=self, _hidden_columns=hidden_columns, **schema) def set_schema(self, **kwargs): """ Set schema of the _source Note: this method affect python part only and won't make any db queries. It used to set schema after db reading/ complex query. .. deprecated:: please use the :property:`Source.schema` to access and adjust the schema. Parameters ---------- kwargs schema in the column_name=type format Returns ------- None Examples -------- Python can't follow low level change of column, e.g. complex query or pertick script can be sink. >>> data = otp.Ticks(dict(A=[1, 2], B=["a", "b"])) # OTdirective: skip-snippet:; >>> data.sink(otq.AddField(field='Z', value='5')) # doctest: +SKIP >>> data.columns(skip_meta_fields=True) {'A': <class 'int'>, 'B': <class 'str'>} >>> # OTdirective: snippet-name: Arrange.schema.set; >>> data.set_schema(A=int, B=str, Z=int) # OTdirective: skip-snippet:; >>> data.columns(skip_meta_fields=True) {'A': <class 'int'>, 'B': <class 'str'>, 'Z': <class 'int'>} """ self.drop_columns() for name, type in kwargs.items(): self[(name, type)] def _process_keep_time_param(self, keep_time, sub_source): if keep_time == "TIMESTAMP": raise ValueError("TIMESTAMP is reserved OneTick name, please, specify another one.") if keep_time in self.columns(): raise ValueError(f"{keep_time} column is already presented.") sub_source = sub_source.copy() if keep_time: sub_source[keep_time] = sub_source["Time"] return sub_source def _get_columns_with_prefix(self, sub_source, prefix) -> dict: sub_source_columns = sub_source.schema if prefix is None: prefix = "" if not isinstance(prefix, str): raise ValueError("Only string constants are supported for now.") new_columns = {prefix + name: dtype for name, dtype in sub_source_columns.items()} same_names = set(new_columns) & set(self.schema) if same_names: raise ValueError(f"After applying prefix some columns aren't unique: {', '.join(same_names)}.") return new_columns def _fill_aux_params_for_jwq( self, join_params, caching, end_time, prefix, start_time, symbol_name, timezone ): if symbol_name: join_params["symbol_name"] = symbol_name if prefix is not None: join_params["prefix_for_output_ticks"] = str(prefix) if caching: supported = "cross_symbol", "per_symbol" if caching in supported: join_params["caching_scope"] = caching else: raise ValueError(f"Unknown value for caching param, please use None or any of {supported}.") self._fill_time_param_for_jwq(join_params, start_time, end_time, timezone) def _fill_time_param_for_jwq(self, join_params, start_time, end_time, timezone): self._process_start_or_end_of_jwq(join_params, start_time, "start_timestamp") self._process_start_or_end_of_jwq(join_params, end_time, "end_timestamp") if timezone: join_params["timezone"] = f"'{timezone}'" # This needs to be done, but this may break some of the existing code # Need to enable later! # else: # join_params["timezone"] = f"_TIMEZONE" # this may break something, need to test def _process_start_or_end_of_jwq(self, join_params, time, param_name): if time is not None: if isinstance(time, (datetime, otp.dt)): join_params[f"{param_name}"] = time.timestamp() * 1000 elif isinstance(time, _Operation): join_params[f"{param_name}"] = str(time) else: raise ValueError(f"{param_name} should be datetime.datetime instance or OneTick expression")
[docs] @inplace_operation def transpose( self, inplace: bool = False, direction: Literal['rows', 'columns'] = 'rows', n: Optional[int] = None, ) -> Optional['Source']: """ Data transposing. The main idea is joining many ticks into one or splitting one tick to many. Parameters ---------- inplace: bool, default=False if `True` method will modify current object, otherwise it will return modified copy of the object. direction: 'rows', 'columns', default='rows' - `rows`: join certain input ticks (depending on other parameters) with preceding ones. Fields of each tick will be added to the output tick and their names will be suffixed with **_K** where **K** is the positional number of tick (starting from 1) in reverse order. So fields of current tick will be suffixed with **_1**, fields of previous tick will be suffixed with **_2** and so on. - `columns`: the operation is opposite to `rows`. It splits each input tick to several output ticks. Each input tick must have fields with names suffixed with **_K** where **K** is the positional number of tick (starting from 1) in reverse order. n: Optional[int], default=None must be specified only if ``direction`` is 'rows'. Joins every **n**-th ticks with **n-1** preceding ticks. Returns ------- If ``inplace`` parameter is `True` method will return `None`, otherwise it will return modified copy of the object. See also -------- **TRANSPOSE** OneTick event processor Examples -------- Merging two ticks into one. >>> data = otp.Ticks(dict(A=[1, 2], ... B=[3, 4])) >>> data = data.transpose(direction='rows', n=2) # OTdirective: skip-snippet:; >>> data.to_df() Time TIMESTAMP_1 A_1 B_1 TIMESTAMP_2 A_2 B_2 0 2003-12-01 00:00:00.001 2003-12-01 00:00:00.001 2 4 2003-12-01 1 3 And splitting them back into two. >>> data = data.transpose(direction='columns') # OTdirective: skip-snippet:; >>> data.to_df() Time A B 0 2003-12-01 00:00:00.000 1 3 1 2003-12-01 00:00:00.001 2 4 """ direction_map = {'rows': 'ROWS_TO_COLUMNS', 'columns': 'COLUMNS_TO_ROWS'} n = '' if n is None else n self.sink(otq.Transpose(direction=direction_map[direction], key_constraint_values=n)) # TODO: we should change source's schema after transposing return self
[docs] @inplace_operation def process_by_group(self, process_source_func, group_by=None, source_name=None, inplace=False) -> Union['Source', Tuple['Source', ...], None]: """ Groups data by ``group_by`` and run ``process_source_func`` for each group and merge outputs for every group. Note ``process_source_func`` will be converted to Onetick object and passed to query, that means that python callable will be called only once. Parameters ---------- process_source_func: callable ``process_source_func`` should take :class:`Source` apply necessary logic and return it or tuple of :class:`Source` in this case all of them should have a common root that is the input :class:`Source`. group_by: list A list of field names to group input ticks by. If group_by is None then no group_by fields are defined and logic of ``process_source_func`` is applied to all input ticks at once source_name: str A name for the source that represents all of group_by sources. Can be passed here or as a name of the inner sources; if passed by both ways, should be consistent inplace: bool If True - nothing will be returned and changes will be applied to current query otherwise chanegs query will be returned Returns ------- :class:`Source`, Tuple[:class:`Source`] or None: See also -------- **GROUP_BY** OneTick event processor Examples -------- >>> # OTdirective: snippet-name: Arrange.group.single output; >>> d = otp.Ticks(X=[1, 1, 2, 2], ... Y=[1, 2, 3, 4]) >>> >>> def func(source): ... return source.first() >>> >>> res = d.process_by_group(func, group_by=['X']) >>> otp.run(res)[["X", "Y"]] X Y 0 1 1 1 2 3 >>> d = otp.Ticks(X=[1, 1, 2, 2], ... Y=[1, 2, 1, 3]) >>> >>> def func(source): ... source['Z'] = source['X'] ... source2 = source.copy() ... source = source.first() ... source2 = source2.last() ... return source, source2 >>> # OTdirective: snippet-name: Arrange.group.multiple output; >>> res1, res2 = d.process_by_group(func, group_by=['Y']) >>> df1 = otp.run(res1) >>> df2 = otp.run(res2) >>> df1[['X', 'Y', 'Z']] X Y Z 0 1 1 1 1 1 2 1 2 2 3 2 >>> df2[['X', 'Y', 'Z']] # OTdirective: skip-snippet:; X Y Z 0 1 2 1 1 2 1 2 2 2 3 2 """ if group_by is None: group_by = [] if inplace: main_source = self else: main_source = self.copy() input_schema = main_source.columns(skip_meta_fields=True) for field in group_by: if field not in input_schema: raise ValueError(f"Group by field name {field} not present in input source schema") process_source_root = onetick.py.sources.Custom(tick_type="ANY", schema_policy="manual", **input_schema) if source_name: process_source_root.set_name(source_name) process_sources = process_source_func(process_source_root) if isinstance(process_sources, Source): # returned one source process_sources = [process_sources] elif len(process_sources) == 1: # returned one source as an iterable pass else: # returned multiple sources if inplace: raise ValueError("Cannot use inplace=True with multi-source processing function!") num_source = 0 for process_source in process_sources: output_schema = process_source.columns(skip_meta_fields=True) if process_source.get_name(): if not process_source_root.get_name(): process_source_root.set_name(process_source.get_name()) if process_source_root.get_name() != process_source.get_name(): warnings.warn("Different strings passed as names for the root source used in " f"process_by_group: '{process_source.get_name()}' " f"and '{process_source_root.get_name()}'") # removing key fields from output schema since they will be # added by the GROUP_BY EP process_source.drop([field for field in group_by if field in output_schema], inplace=True) process_source.sink(otq.Passthrough().node_name(f"OUT_{num_source}")) process_source_root.node().add_rules(process_source.node().copy_rules()) main_source._merge_tmp_otq(process_source) num_source += 1 query_name = process_source_root._store_in_tmp_otq( main_source._tmp_otq, operation_suffix="group_by", add_passthrough=False ) process_path = f'THIS::{query_name}' num_outputs = len(process_sources) # we shouldn't set named outputs if GROUP_BY EP has only one output due to onetick behaviour if num_outputs == 1: outputs = "" else: outputs = ",".join([f"OUT_{i}" for i in range(0, num_outputs)]) main_source.sink(otq.GroupBy(key_fields=",".join(group_by), query_name=process_path, outputs=outputs)) output_sources = [] for num_output in range(0, num_outputs): if num_outputs == 1 and inplace: output_source = main_source else: output_source = main_source.copy() if num_outputs > 1: output_source.node().out_pin(f"OUT_{num_output}") # setting schema after processing output_schema = process_sources[num_output].columns(skip_meta_fields=True) for field in group_by: output_schema[field] = input_schema[field] for field, field_type in output_schema.items(): output_source.schema[field] = field_type output_source = output_source[[field for field in output_schema]] output_source._merge_tmp_otq(main_source) output_sources.append(output_source) if num_outputs == 1: return output_sources[0] else: return tuple(output_sources)
def unite_columns(self, sep="", *, apply_str=False): """ Join values of all columns into one string The method unite all fields to one string, just like python ``join`` method. All fields should be strings, otherwise the error will be generated. To change this behavior, ``apply_str=True`` argument should be specified, in this case all fields will be converted to string type before joining. Parameters ---------- sep: str Separator between values, empty string be dafault. apply_str: bool If set every column will be converted to string during operation. False be default. Returns ------- result: column Column with str type Examples -------- >>> # OTdirective: snippet-name: Arrange.join columns as strings; >>> data = otp.Ticks(X=[1, 2, 3], A=["A", "A", "A"], B=["A", "B", "C"]) >>> data["S_ALL"] = data.unite_columns(sep=",", apply_str=True) >>> data["S"] = data[["A", "B"]].unite_columns() >>> otp.run(data)[["S", "S_ALL"]] S S_ALL 0 AA 1,A,A 1 AB 2,A,B 2 AC 3,A,C """ if apply_str: cols = (self[col].apply(str) for col in self.schema) else: not_str = [name for name, t in self.schema.items() if not are_strings(t)] if not_str: raise ValueError(f"All joining columns should be strings, while {', '.join(not_str)} " f"are not. Specify `apply_str=True` for automatic type conversion") else: cols = (self[col] for col in self.schema) return functools.reduce(lambda x, y: x + sep + y, cols) # Aggregations copy # we need this functions to store and collect documentation # copy_method decorator will # set docstring (will compare docstring of donor function and method docstring) # apply same signature from donor function + self # for mimic=True will apply agg function as is
[docs] @copy_method(high_tick) def high(self): """ Examples -------- >>> data = otp.Ticks(X=[1, 2, 3, 4], offset=[0, 1000, 1500, 3000]) >>> data = data.high(['X'], 2) # OTdirective: snippet-name: Aggregations.high tick; >>> data() Time X 0 2003-12-01 00:00:01.500 3 1 2003-12-01 00:00:03.000 4 """ pass
[docs] @copy_method(low_tick) def low(self): """ Examples -------- >>> data = otp.Ticks(X=[1, 2, 3, 4], offset=[0, 1000, 1500, 3000]) >>> data = data.low(['X'],2) # OTdirective: snippet-name: Aggregations.low tick; >>> data() Time X 0 2003-12-01 00:00:00 1 1 2003-12-01 00:00:01 2 """ pass
[docs] @copy_method(first_tick) def first(self): """ Examples -------- >>> data = otp.Ticks(X=[1, 2, 3, 4]) >>> data = data.first() # OTdirective: snippet-name: Aggregations.first; >>> data() Time X 0 2003-12-01 1 """ pass
[docs] @copy_method(last_tick) def last(self): """ Examples -------- >>> data = otp.Ticks(X=[1, 2, 3, 4], offset=[0, 1000, 1500, 3000]) >>> data = data.last() # OTdirective: snippet-name: Aggregations.last; >>> data() Time X 0 2003-12-01 00:00:03 4 """ pass
[docs] @copy_method(distinct, mimic=False) def distinct(self, *args, **kwargs): """ Examples -------- >>> data = otp.Ticks(dict(x=[1, 3, 1, 5, 3])) >>> data = data.distinct('x') # OTdirective: snippet-name: Aggregations.distinct; >>> data.to_df() Time x 0 2003-12-04 1 1 2003-12-04 3 2 2003-12-04 5 """ if 'bucket_interval_units' in kwargs: kwargs['bucket_units'] = kwargs.pop('bucket_interval_units') agg = distinct(*args, **kwargs) return agg.apply(self)
[docs] @copy_method(high_time, mimic=False) # mimic=False for backward compatibility def high_time(self, *args, **kwargs): """ Returns timestamp of tick with the highest value of input field .. deprecated:: 1.14.5 Use :py:func:`.high_time` instead See Also -------- :py:func:`.high_time` """ warnings.warn(f"{self.__class__.__name__}.{inspect.currentframe().f_code.co_name} deprecated. " f"Use otp.agg.{inspect.currentframe().f_code.co_name} instead", DeprecationWarning, stacklevel=2) agg = high_time(*args, **kwargs) return agg.apply(self, 'VALUE')
[docs] @copy_method(low_time, mimic=False) # mimic=False for backward compatibility def low_time(self, *args, **kwargs): """ Returns timestamp of tick with the lowest value of input field .. deprecated:: 1.14.5 Use :py:func:`.low_time` instead See Also -------- :py:func:`.low_time` """ warnings.warn(f"{self.__class__.__name__}.{inspect.currentframe().f_code.co_name} deprecated. " f"Use otp.agg.{inspect.currentframe().f_code.co_name} instead", DeprecationWarning, stacklevel=2) agg = low_time(*args, **kwargs) return agg.apply(self, 'VALUE')
[docs] @copy_method(ob_snapshot) def ob_snapshot(self): """ Examples -------- >>> data = otp.DataSource(db='SOME_DB', tick_type='PRL', symbols='AA') # doctest: +SKIP >>> data = data.ob_snapshot(max_levels=1) # doctest: +SKIP >>> data.to_df() # doctest: +SKIP Time PRICE UPDATE_TIME SIZE LEVEL BUY_SELL_FLAG 0 2003-12-04 2.0 2003-12-01 00:00:00.003 6 1 1 1 2003-12-04 5.0 2003-12-01 00:00:00.004 7 1 0 """ pass
[docs] @copy_method(ob_snapshot_wide) def ob_snapshot_wide(self): """ Examples -------- >>> data = otp.DataSource(db='SOME_DB', tick_type='PRL', symbols='AA') # doctest: +SKIP >>> data = data.ob_snapshot_wide(max_levels=1) # doctest: +SKIP >>> data.to_df() # doctest: +SKIP Time BID_PRICE BID_UPDATE_TIME BID_SIZE ASK_PRICE ASK_UPDATE_TIME ASK_SIZE LEVEL 0 2003-12-03 5.0 2003-12-01 00:00:00.004 7 2.0 2003-12-01 00:00:00.003 6 1 """ pass
[docs] @copy_method(ob_snapshot_flat) def ob_snapshot_flat(self): """ Examples -------- >>> data = otp.DataSource(db='SOME_DB', tick_type='PRL', symbols='AA') # doctest: +SKIP >>> data = data.ob_snapshot_flat(max_levels=1) # doctest: +SKIP >>> data.to_df() # doctest: +SKIP Time BID_PRICE1 BID_UPDATE_TIME1 BID_SIZE1 ASK_PRICE1 ASK_UPDATE_TIME1 ASK_SIZE1 0 2003-12-03 5.0 2003-12-01 00:00:00.004 7 2.0 2003-12-01 00:00:00.003 6 """ pass
[docs] @inplace_operation def add_prefix(self, prefix, inplace=False) -> Optional['Source']: """ Addes prefix to all column names. Parameters ---------- prefix : str String prefix to add to all columns. inplace : bool The flag controls whether operation should be applied inplace or not. If ``inplace=True``, then it returns nothing. Otherwise method returns a new modified object. Returns ------- :class:`Source` or ``None`` Examples -------- >>> data = otp.DataSource(db='SOME_DB', tick_type='TT', symbols='S1') >>> data = data.add_prefix('test_') >>> data.to_df() Time test_X 0 2003-12-01 00:00:00.000 1 1 2003-12-01 00:00:00.001 2 2 2003-12-01 00:00:00.002 3 >>> data.schema {'test_X': <class 'int'>} >>> data = otp.DataSource(db='NYSE_TAQ', tick_type='TRD', symbols='AAPL') >>> data = data.add_prefix('test_') >>> data.to_df(start=otp.dt(2022, 3, 1), end=otp.dt(2022, 3, 2)) Time test_PRICE test_SIZE 0 2022-03-01 00:00:00.000 1.3 100 1 2022-03-01 00:00:00.001 1.4 10 2 2022-03-01 00:00:00.002 1.4 50 >>> data.schema {'test_PRICE': <class 'float'>} >>> data = otp.Tick(X=1, XX=2) >>> data.add_prefix('X') Traceback (most recent call last): ... AttributeError: Column XX already exists, please, use another prefix """ if ' ' in prefix: raise AttributeError(f'There is space in prefix: {prefix}') schema = self.schema for column_name in schema: new_column_name = f'{prefix}{column_name}' if new_column_name in self.__dict__: raise AttributeError(f'Column {new_column_name} already exists, please, use another prefix') for column_name in schema: new_column_name = f'{prefix}{column_name}' self.__dict__[column_name].rename(new_column_name, update_parent_object=False) self.__dict__[new_column_name] = self.__dict__[column_name] del self.__dict__[column_name] self.sink(otq.RenameFieldsEp(rename_fields=f'(.*)={prefix}\\1', use_regex=True)) return self
[docs] @inplace_operation def add_suffix(self, suffix, inplace=False) -> Optional['Source']: """ Addes suffix to all column names. Parameters ---------- suffix : str String suffix to add to all columns. inplace : bool The flag controls whether operation should be applied inplace or not. If ``inplace=True``, then it returns nothing. Otherwise method returns a new modified object. Returns ------- :class:`Source` or ``None`` Examples -------- >>> data = otp.DataSource(db='SOME_DB', tick_type='TT', symbols='S1') >>> data = data.add_suffix('_test') >>> data.to_df() Time X_test 0 2003-12-01 00:00:00.000 1 1 2003-12-01 00:00:00.001 2 2 2003-12-01 00:00:00.002 3 >>> data.schema {'X_test': <class 'int'>} >>> data = otp.DataSource(db='NYSE_TAQ', tick_type='TRD', symbols='AAPL') >>> data = data.add_suffix('_test') >>> data.to_df(start=otp.dt(2022, 3, 1), end=otp.dt(2022, 3, 2)) Time PRICE_test SIZE_test 0 2022-03-01 00:00:00.000 1.3 100 1 2022-03-01 00:00:00.001 1.4 10 2 2022-03-01 00:00:00.002 1.4 50 >>> data.schema {'PRICE_test': <class 'float'>} >>> data = otp.Tick(X=1, XX=2) >>> data.add_suffix('X') Traceback (most recent call last): ... AttributeError: Column XX already exists, please, use another suffix """ if ' ' in suffix: raise AttributeError(f'There is space in suffix: {suffix}') schema = self.schema for column_name in schema: new_column_name = f'{column_name}{suffix}' if new_column_name in self.__dict__: raise AttributeError(f'Column {new_column_name} already exists, please, use another suffix') for column_name in schema: new_column_name = f'{column_name}{suffix}' self.__dict__[column_name].rename(new_column_name, update_parent_object=False) self.__dict__[new_column_name] = self.__dict__[column_name] del self.__dict__[column_name] self.sink(otq.RenameFieldsEp(rename_fields=f'(.*)=\\1{suffix}', use_regex=True)) return self
[docs] @inplace_operation def time_filter(self, discard_on_match: bool = False, start_time: Union[str, int, time] = 0, end_time: Union[str, int, time] = 0, day_patterns: str = "", timezone=utils.default, # type: ignore end_time_tick_matches: bool = False, inplace=False) -> Optional['Source']: """ Filters ticks by time. Parameters ---------- discard_on_match : bool, optional If ``True``, then ticks that match the filter will be discarded. Otherwise, only ticks that match the filter will be passed. start_time : str or int or datetime.time, optional Start time of the filter, string must be in the format ``HHMMSSmmm``. Default value is 0. end_time : str or int or datetime.time, optional End time of the filter, string must be in the format ``HHMMSSmmm``. To filter ticks for an entire day, this parameter should be set to 240000000. Default value is 0. day_patterns : list or str Pattern or list of patterns that determines days for which the ticks can be propagated. A tick can be propagated if its date matches one or more of the patterns. Three supported pattern formats are: 1. ``month.week.weekdays``, 0 month means any month, 0 week means any week, 6 week means the last week of the month for a given weekday(s), weekdays are digits for each day, 0 being Sunday. 2. ``month/day``, 0 month means any month. 3. ``year/month/day``, 0 year means any year, 0 month means any month. timezone : str, optional Timezone of the filter. Default value is ``configuration.config.tz`` or timezone set in the parameter of :py:func:`onetick.py.run`. end_time_tick_matches : bool, optional If ``True``, then the end time is inclusive. Otherwise, the end time is exclusive. inplace : bool, optional The flag controls whether operation should be applied inplace or not. If ``inplace=True``, then it returns nothing. Otherwise method returns a new modified object. Default value is ``False``. Returns ------- :class:`Source` or ``None`` Returns ``None`` if ``inplace=True``. See also -------- **TIME_FILTER** OneTick event processor Examples -------- >>> data = otp.DataSource(db='NYSE_TAQ', tick_type='TRD', symbols='AAPL') >>> data = data.time_filter(start_time='000000001', end_time='000000003') >>> otp.run(data, start=otp.dt(2022, 3, 1), end=otp.dt(2022, 3, 2)) Time PRICE SIZE 0 2022-03-01 00:00:00.001 1.4 10 1 2022-03-01 00:00:00.002 1.4 50 """ if timezone is utils.default: # doesn't work without expr for some reason timezone = 'expr(_TIMEZONE)' if day_patterns: if isinstance(day_patterns, str): day_patterns = [day_patterns] for day_pattern in day_patterns: if not re.match(r"(^\d\d?\.[0-6].\d\d?$)|(^\d\d?\/\d\d?$)|(^\d{1,4}\/\d\d?\/\d\d?$)", day_pattern): raise ValueError(f"Invalid day pattern: {day_pattern}") if isinstance(start_time, time): start_time = start_time.strftime('%H%M%S%f')[:-3] if isinstance(end_time, time): end_time = end_time.strftime('%H%M%S%f')[:-3] day_patterns = ",".join(day_patterns) self.sink(otq.TimeFilter(discard_on_match=discard_on_match, start_time=start_time, end_time=end_time, timezone=timezone, day_patterns=day_patterns, end_time_tick_matches=end_time_tick_matches, )) return self
[docs] @copy_method(ranking) def ranking(self, *args, **kwargs): pass
[docs] @inplace_operation def insert_tick(self, fields=None, where=None, preserve_input_ticks=True, num_ticks_to_insert=1, insert_before=True, inplace=False) -> Optional['Source']: """ Insert tick. Parameters ---------- fields: dict of str to :py:class:`onetick.py.Operation` Mapping of field names to some expressions or values. These fields in inserted ticks will be set to corresponding values or results of expressions. If field is presented in input tick, but not set in ``fields`` dict, then the value of the field will be copied from input tick to inserted tick. If parameter ``fields`` is not set at all, then values for inserted ticks' fields will be default values for fields' types from input ticks (0 for integers etc.). where: :py:class:`onetick.py.Operation` Expression to select ticks near which the new ticks will be inserted. By default, all ticks are selected. preserve_input_ticks: bool A switch controlling whether input ticks have to be preserved in output time series or not. While the former case results in fields of input ticks to be present in the output time series together with those defined by the ``fields`` parameter, the latter case results in only defined fields to be present. If a field of the input time series is defined in the ``fields`` parameter, the defined value takes precedence. num_ticks_to_insert: int Number of ticks to insert. insert_before: bool Insert tick before each input tick or after. inplace: bool The flag controls whether operation should be applied inplace or not. If ``inplace=True``, then it returns nothing. Otherwise method returns a new modified object. See also -------- **INSERT_TICK** OneTick event processor Returns ------- :class:`Source` or ``None`` Examples -------- Insert tick before each tick with default type values. >>> data = otp.Tick(A=1) >>> data = data.insert_tick() >>> data.to_df() Time A 0 2003-12-01 0 1 2003-12-01 1 Insert tick before each tick with field `A` copied from input tick and field `B` set to specified value. >>> data = otp.Tick(A=1) >>> data = data.insert_tick(fields={'B': 'b'}) >>> data.to_df() Time A B 0 2003-12-01 1 b 1 2003-12-01 1 Insert two ticks only after first tick. >>> data = otp.Ticks(A=[1, 2, 3]) >>> data = data.insert_tick(where=data['A'] == 1, ... insert_before=False, ... num_ticks_to_insert=2) >>> data.to_df() Time A 0 2003-12-01 00:00:00.000 1 1 2003-12-01 00:00:00.000 0 2 2003-12-01 00:00:00.000 0 3 2003-12-01 00:00:00.001 2 4 2003-12-01 00:00:00.002 3 """ if not isinstance(num_ticks_to_insert, int) or num_ticks_to_insert <= 0: raise ValueError("Parameter 'num_ticks_to_insert' must be a positive integer") if not preserve_input_ticks and not fields: raise ValueError("Parameter 'fields' must be set if 'preserve_input_ticks' is False") where = '' if where is None else str(where) def _default_by_type(dtype): if issubclass(dtype, int): return 0 if issubclass(dtype, float): return 0.0 if issubclass(dtype, str): return '' if issubclass(dtype, ott.nsectime) or issubclass(dtype, ott.msectime): return 0 fields = fields or {} update_schema = {} for field, value in fields.items(): dtype = ott.get_object_type(value) if field not in self.schema: update_schema[field] = dtype elif dtype is not self.schema[field]: raise ValueError(f"Incompatible types for field '{field}': {self.schema[field]} --> {dtype}") dtype = ott.type2str(dtype) if isinstance(value, Type): value = _default_by_type(value) value = ott.value2str(value) fields[field] = (dtype, value) fields = ','.join( f'{field} {dtype}={value}' if value else f'{field} {dtype}' for field, (dtype, value) in fields.items() ) self.sink( otq.InsertTick( fields=fields, where=where, preserve_input_ticks=preserve_input_ticks, num_ticks_to_insert=num_ticks_to_insert, insert_before=insert_before, ) ) if preserve_input_ticks: self.schema.update(**update_schema) else: self.schema.set(**update_schema) return self
[docs] @inplace_operation def modify_query_times(self, start=None, end=None, output_timestamp=None, propagate_heartbeats=True, inplace=False): """ Modify ``start`` and ``end`` time of the query. * query times are changed for all operations only **before** this method up to the source of the graph. * all ticks' timestamps produced by this method **must** fall into original time range of the query. It is possible to change ticks' timestamps with parameter ``output_timestamp``, so they will stay inside the original time range. Parameters ---------- start: :py:class:`onetick.py.datetime` or \ :py:class:`~onetick.py.core.source.MetaFields` or :py:class:`~onetick.py.Operation` Expression to replace query start time. By default, start time is not changed. Note that expression in this parameter can't depend on ticks, thus only :py:class:`~onetick.py.core.source.MetaFields` and constants can be used. end: :py:class:`onetick.py.datetime` or \ :py:class:`~onetick.py.core.source.MetaFields` or :py:class:`~onetick.py.Operation` Expression to replace query end time. By default, end time is not changed. Note that expression in this parameter can't depend on ticks, thus only :py:class:`~onetick.py.core.source.MetaFields` and constants can be used. output_timestamp: :py:class:`onetick.py.Operation` Expression that produces timestamp for each tick. By default, the following expression is used: ``orig_start + orig_timestamp - start`` This expression covers cases when start time of the query is changed and keeps timestamp inside original time range. Note that it doesn't cover cases, for example, if end time was increased, you have to handle such cases yourself. propagate_heartbeats: bool Controls heartbeat propagation. inplace: bool The flag controls whether operation should be applied inplace or not. If ``inplace=True``, then it returns nothing. Otherwise method returns a new modified object. See also -------- | **MODIFY_QUERY_TIMES** OneTick event processor | :py:meth:`onetick.py.Source.time_interval_shift` Returns ------- :class:`Source` or ``None`` Examples -------- >>> start = otp.dt(2022, 3, 2) >>> end = otp.dt(2022, 3, 2) + otp.Milli(3) >>> data = otp.DataSource('NYSE_TAQ', symbols='AAPL', tick_type='TRD') By default, method does nothing: >>> t = data.modify_query_times() >>> otp.run(t, start=start, end=end) Time PRICE SIZE 0 2022-03-02 00:00:00.000 1.0 100 1 2022-03-02 00:00:00.001 1.1 101 2 2022-03-02 00:00:00.002 1.2 102 See how ``_START_TIME`` and ``_END_TIME`` meta fields are changed. They are changed *before* ``modify_query_times``: >>> t = data.copy() >>> t['S_BEFORE'] = t['_START_TIME'] >>> t['E_BEFORE'] = t['_END_TIME'] >>> t = t.modify_query_times(start=t['_START_TIME'] + otp.Milli(1), ... end=t['_END_TIME'] - otp.Milli(1)) >>> t['S_AFTER'] = t['_START_TIME'] >>> t['E_AFTER'] = t['_END_TIME'] >>> otp.run(t, start=start, end=end) Time PRICE SIZE S_BEFORE E_BEFORE S_AFTER E_AFTER 0 2022-03-02 1.1 101 2022-03-02 00:00:00.001 2022-03-02 00:00:00.002 2022-03-02 2022-03-02 00:00:00.003 You can decrease time interval without problems: >>> t = data.modify_query_times(start=data['_START_TIME'] + otp.Milli(1), ... end=data['_END_TIME'] - otp.Milli(1)) >>> otp.run(t, start=start, end=end) Time PRICE SIZE 0 2022-03-02 1.1 101 Note that the timestamp of the tick was changed with default expression. In this case we can output original timestamps, because they fall into original time range: >>> t = data.modify_query_times(start=data['_START_TIME'] + otp.Milli(1), ... end=data['_END_TIME'] - otp.Milli(1), ... output_timestamp=data['TIMESTAMP']) >>> otp.run(t, start=start, end=end) Time PRICE SIZE 0 2022-03-02 00:00:00.001 1.1 101 But it will not work if new time range is wider than original: >>> t = data.modify_query_times(start=data['_START_TIME'] - otp.Milli(1), ... output_timestamp=data['TIMESTAMP']) >>> otp.run(t, start=start + otp.Milli(1), end=end + otp.Milli(1)) # doctest: +ELLIPSIS Traceback (most recent call last): Exception...timestamp is falling out of initial start/end time range... In this case default ``output_timestamp`` expression would work just fine: >>> t = data.modify_query_times(start=data['_START_TIME'] - otp.Milli(1)) >>> otp.run(t, start=start + otp.Milli(1), end=end + otp.Milli(1)) Time PRICE SIZE 0 2022-03-02 00:00:00.001 1.0 100 1 2022-03-02 00:00:00.002 1.1 101 2 2022-03-02 00:00:00.003 1.2 102 But it doesn't work, for example, if end time has crossed the borders of original time range. In this case other ``output_timestamp`` expression must be specified: >>> t = data.modify_query_times( ... start=data['_START_TIME'] - otp.Milli(2), ... output_timestamp=otp.math.min(data['TIMESTAMP'] + otp.Milli(2), data['_END_TIME']) ... ) >>> otp.run(t, start=start + otp.Milli(2), end=end) Time PRICE SIZE 0 2022-03-02 00:00:00.002 1.0 100 1 2022-03-02 00:00:00.003 1.1 101 2 2022-03-02 00:00:00.003 1.2 102 Remember that ``start`` and ``end`` parameters can't depend on ticks: >>> t = data.copy() >>> t['X'] = 12345 >>> t = t.modify_query_times(start=t['_START_TIME'] + t['X'] - t['X'], ... end=t['_END_TIME'] - otp.Milli(1)) >>> otp.run(t, start=start, end=end) # doctest: +ELLIPSIS Traceback (most recent call last): Exception...parameter must not depend on ticks... Constant datetime values can be used as parameters too: >>> t = data.modify_query_times(start=start + otp.Milli(1), ... end=end - otp.Milli(1)) >>> otp.run(t, start=start, end=end) Time PRICE SIZE 0 2022-03-02 1.1 101 Note that some graph patterns are not allowed when using this method. For example, modifying query times for a branch that will be merged later: >>> t1, t2 = data[data['PRICE'] > 1.3] >>> t2 = t2.modify_query_times(start=start + otp.Milli(1)) >>> t = otp.merge([t1, t2]) >>> otp.run(t, start=start, end=end) # doctest: +ELLIPSIS Traceback (most recent call last): Exception...Invalid graph...time bound to a node...an intermediate node in one of the cycles in graph... """ start = ott.value2str(start) if start is not None else '' end = ott.value2str(end) if end is not None else '' output_timestamp = ott.value2str(output_timestamp) if output_timestamp is not None else '' self.sink( otq.ModifyQueryTimes( start_time=start, end_time=end, output_timestamp=output_timestamp, propagate_heartbeats=propagate_heartbeats, ) ) return self
[docs] def time_interval_shift(self, shift, inplace=False): """ Shifting time interval for a source. The whole data flow is shifted all the way up to the source of the graph. The ticks' timestamps are changed accordingly so they fit into original time range. Parameters ---------- shift: int or :ref:`datetime offset<offsets>` Offset to shift the whole time interval. Can be positive or negative. Positive value moves time interval into the future, negative -- to the past. int values are interpreted as milliseconds. inplace: bool The flag controls whether operation should be applied inplace or not. If ``inplace=True``, then it returns nothing. Otherwise method returns a new modified object. Returns ------- :class:`Source` or ``None`` See also -------- | :py:meth:`onetick.py.Source.modify_query_times` | :py:meth:`onetick.py.Source.time_interval_change` Examples -------- --> Also see use-case using :py:meth:`time_interval_shift` for calculating `Markouts <../../static/getting_started/time_based_joins.html#use-case-computing-markouts>`_ >>> start = otp.dt(2022, 3, 2) >>> end = otp.dt(2022, 3, 2) + otp.Milli(3) >>> data = otp.DataSource('NYSE_TAQ', symbols='AAPL', tick_type='TRD') Default data: >>> otp.run(data, start=start, end=end) Time PRICE SIZE 0 2022-03-02 00:00:00.000 1.0 100 1 2022-03-02 00:00:00.001 1.1 101 2 2022-03-02 00:00:00.002 1.2 102 Get window for a third tick: >>> otp.run(data, start=start + otp.Milli(2), end=start + otp.Milli(3)) Time PRICE SIZE 0 2022-03-02 00:00:00.002 1.2 102 Shifting time window will result in different set of ticks, but the ticks will have their timestamps changed to fit into original time range. Let's shift time 2 milliseconds back and thus get the first tick: >>> t = data.time_interval_shift(shift=-otp.Milli(2)) >>> otp.run(t, start=start + otp.Milli(2), end=start + otp.Milli(3)) Time PRICE SIZE 0 2022-03-02 00:00:00.002 1.0 100 Here we are querying empty time interval, but shifting one second back to get ticks. >>> t = data.time_interval_shift(shift=-otp.Second(1)) >>> otp.run(t, start=start + otp.Second(1), end=end + otp.Second(1)) Time PRICE SIZE 0 2022-03-02 00:00:01.000 1.0 100 1 2022-03-02 00:00:01.001 1.1 101 2 2022-03-02 00:00:01.002 1.2 102 """ start = self['_START_TIME'] + shift end = self['_END_TIME'] + shift # change timestamps so they fit into original time range output_timestamp = self['TIMESTAMP'] - shift return self.modify_query_times(start=start, end=end, output_timestamp=output_timestamp, inplace=inplace)
[docs] def time_interval_change(self, start_change=0, end_change=0, inplace=False): """ Changing time interval by making it bigger or smaller. All timestamps of ticks that are crossing the border of original time range will be set to original start time or end time depending on their original time. Parameters ---------- start_change: int or :ref:`datetime offset<offsets>` Offset to shift start time. Can be positive or negative. Positive value moves start time into the future, negative -- to the past. int values are interpreted as milliseconds. end_change: int or :ref:`datetime offset<offsets>` Offset to shift end time. Can be positive or negative. Positive value moves end time into the future, negative -- to the past. int values are interpreted as milliseconds. inplace: bool The flag controls whether operation should be applied inplace or not. If ``inplace=True``, then it returns nothing. Otherwise method returns a new modified object. Returns ------- :class:`Source` or ``None`` See also -------- | :py:meth:`onetick.py.Source.modify_query_times` | :py:meth:`onetick.py.Source.time_interval_shift` Examples -------- >>> start = otp.dt(2022, 3, 2) >>> end = otp.dt(2022, 3, 2) + otp.Milli(3) >>> data = otp.DataSource('NYSE_TAQ', symbols='AAPL', tick_type='TRD') By default, ``time_interval_change()`` does nothing: >>> t = data.time_interval_change() >>> otp.run(t, start=start, end=end) Time PRICE SIZE 0 2022-03-02 00:00:00.000 1.0 100 1 2022-03-02 00:00:00.001 1.1 101 2 2022-03-02 00:00:00.002 1.2 102 Decreasing time range will not change ticks' timestamps: >>> t = data.time_interval_change(start_change=otp.Milli(1), end_change=-otp.Milli(1)) >>> otp.run(t, start=start, end=end) Time PRICE SIZE 0 2022-03-02 00:00:00.001 1.1 101 Increasing time range will change timestamps of the ticks that crossed the border. In this case first tick's timestamp will be set to original start time, and third tick's to original end time. >>> t = data.time_interval_change(start_change=-otp.Milli(1), end_change=otp.Milli(1)) >>> otp.run(t, start=start + otp.Milli(1), end=start + otp.Milli(2)) Time PRICE SIZE 0 2022-03-02 00:00:00.001 1.0 100 1 2022-03-02 00:00:00.001 1.1 101 2 2022-03-02 00:00:00.002 1.2 102 Here we are querying empty time interval, but changing start time one second back to get ticks. >>> t = data.time_interval_change(start_change=-otp.Second(1)) >>> otp.run(t, start=start + otp.Second(1), end=end + otp.Second(1)) Time PRICE SIZE 0 2022-03-02 00:00:01 1.0 100 1 2022-03-02 00:00:01 1.1 101 2 2022-03-02 00:00:01 1.2 102 """ start = self['_START_TIME'] + start_change end = self['_END_TIME'] + end_change # change ticks' timestamps only if they are out of bounds output_timestamp = self['TIMESTAMP'] output_timestamp = otp.math.min(output_timestamp, self['_END_TIME']) output_timestamp = otp.math.max(output_timestamp, self['_START_TIME']) return self.modify_query_times(start=start, end=end, output_timestamp=output_timestamp, inplace=inplace)
_Source = Source # Backward compatibility