Source code for onetick.py.core.source

import copy
import functools
import inspect
import os
import re
import uuid
import warnings
from collections import defaultdict
from datetime import datetime, date, time
import pandas as pd
import numpy as np
from typing import Callable, Optional, List, Any, Tuple, Union, Type, Dict, Collection, Set
from onetick.py.backports import Literal

import onetick.query as otq
from onetick.lib.instance import OneTickLib

import onetick.py.functions
import onetick.py.sources
from onetick import py as otp
from onetick.py import aggregations
from onetick.py import types as ott
from onetick.py import utils, configuration
from onetick.py.core._internal._manually_bound_value import _ManuallyBoundValue
from onetick.py.core._internal._multi_symbols_source import _MultiSymbolsSource
from onetick.py.core._internal._proxy_node import _ProxyNode
from onetick.py.core._internal._state_objects import _StateColumn, _TickSequence
from onetick.py.core._internal._state_vars import StateVars
from onetick.py.core._source._symbol_param import _SymbolParamSource
from onetick.py.core._source.schema import Schema
from onetick.py.core._source.symbol import Symbol
from onetick.py.core._source.tmp_otq import TmpOtq
from onetick.py.core.column import _Column, _LagOperator, _ColumnAggregation
from onetick.py.core.column_operations._methods.methods import is_arithmetical, is_compare
from onetick.py.core.column_operations._methods.op_types import are_strings, _replace_parameters, are_numerics, are_time
from onetick.py.core.column_operations.base import _Operation
from onetick.py.core.lambda_object import _LambdaIfElse, apply_lambda, apply_script, _EmulateObject
from onetick.py.core.query_inspector import get_query_info, add_pins, get_query_parameter_list
from onetick.py.core.eval_query import _QueryEvalWrapper, prepare_params
from onetick.py.core.cut_builder import _BaseCutBuilder
from onetick.py.core import db_constants
from onetick.py.utils import adaptive, adaptive_to_default
from onetick.py.aggregations._docs import (copy_method,
                                           _running_doc, _all_fields_with_policy_doc, _bucket_interval_doc,
                                           _bucket_time_doc, _bucket_units_doc, _bucket_end_condition_doc,
                                           _end_condition_per_group_doc, _boundary_tick_bucket_doc, _group_by_doc)
from onetick.py.docs.utils import docstring, param_doc
from onetick.py.aggregations.functions import (high_tick, low_tick,
                                               high_time, low_time,
                                               first_tick, last_tick,
                                               distinct, ranking,
                                               ob_snapshot, ob_snapshot_wide, ob_snapshot_flat)


def inplace_operation(method):
    """ Decorator that adds the `inplace` parameter and logic according to this
    flag. inplace=True means that method modifies an object, otherwise it copies
    the object firstly, modifies copy and returns the copy.
    """

    @functools.wraps(method)
    def _inner(self, *args, inplace=False, **kwargs):
        kwargs['inplace'] = inplace
        if inplace:
            method(self, *args, **kwargs)
        else:
            obj = self.copy()
            return method(obj, *args, **kwargs)

    return _inner


def _is_dict_required(symbols):
    """
    Depending on symbols, determine if output of otp.run() or Source.__call__() should always be a dictionary
    of {symbol: dataframe} even if only one symbol is present in the results
    """
    if isinstance(symbols, (list, tuple)):
        if len(symbols) == 0:
            return False
        elif len(symbols) > 1:
            return True
        else:
            symbols = symbols[0]

    if isinstance(symbols, otp.Source):
        return True
    if isinstance(symbols, otq.Symbol):
        symbols = symbols.name
    if isinstance(symbols, str) and 'eval' in symbols:
        return True
    return False


_agg_doc = param_doc(name='aggs',
                     annotation=Dict,
                     str_annotation='dict of aggregations',
                     desc="""
                      aggregation dict: key - output column name; value - aggregation""")


[docs]class MetaFields: """ http://solutions.pages.soltest.onetick.com/iac/onetick-server/ep_guide/EP/Pseudo-fields.htm OneTick defines several pseudo-columns that can be treated as if they were columns of every tick. These columns can be accessed directly via :py:meth:`onetick.py.Source.__getitem__` method. But in case they are used in :py:class:`~onetick.py.core.column_operations.base.Expr` they can be accessed via ``onetick.py.Source.meta_fields``. Examples -------- Accessing pseudo-fields as columns or as class properties >>> data = otp.Tick(A=1) >>> data['X'] = data['_START_TIME'] >>> data['Y'] = otp.Source.meta_fields['_TIMEZONE'] >>> otp.run(data, start=otp.dt(2003, 12, 2), timezone='GMT') Time A X Y 0 2003-12-02 1 2003-12-02 GMT """ def __init__(self): self.timestamp = _Column('TIMESTAMP', dtype=ott.nsectime) self.time = self.timestamp self.start_time = _Column('_START_TIME', dtype=ott.nsectime) self.start = self.start_time self.end_time = _Column('_END_TIME', dtype=ott.nsectime) self.end = self.end_time self.timezone = _Column('_TIMEZONE', dtype=str) self.db_name = _Column('_DBNAME', dtype=str) self.symbol_name = _Column('_SYMBOL_NAME', dtype=str) self.tick_type = _Column('_TICK_TYPE', dtype=str) self.__fields = set(map(str, self.__dict__.values())) | {'Time'} def __iter__(self): yield from self.__fields def __contains__(self, item): return item in self.__fields def __len__(self): return len(self.__fields)
[docs] def __getitem__(self, item): """ These fields are available: * ``TIMESTAMP`` (or ``Time``) * ``START_TIME`` (or ``_START_TIME``) * ``END_TIME`` (or ``_END_TIME``) * ``TIMEZONE`` (or ``_TIMEZONE``) * ``DBNAME`` (or ``_DBNAME``) * ``SYMBOL_NAME`` (or ``_SYMBOL_NAME``) * ``TICK_TYPE`` (or ``_TICK_TYPE``) """ return { 'TIMESTAMP': self.timestamp, 'Time': self.time, 'START_TIME': self.start_time, '_START_TIME': self.start_time, 'END_TIME': self.end_time, '_END_TIME': self.end_time, 'TIMEZONE': self.timezone, '_TIMEZONE': self.timezone, 'DB_NAME': self.db_name, 'DBNAME': self.db_name, '_DBNAME': self.db_name, 'SYMBOL_NAME': self.symbol_name, '_SYMBOL_NAME': self.symbol_name, 'TICK_TYPE': self.tick_type, '_TICK_TYPE': self.tick_type, }[item]
[docs]class Source: """ Base class for representing Onetick execution graph. All :ref:`onetick-py sources <sources>` are derived from this class and have access to all its methods. Examples -------- >>> data = otp.Tick(A=1) >>> isinstance(data, otp.Source) True Also this class can be used to initialize raw source with the help of ``onetick.query`` classes, but it should be done with caution as the user is required to set such properties as symbol name and tick type manually. >>> data = otp.Source(otq.TickGenerator(bucket_interval=0, fields='long A = 123').tick_type('TT')) >>> otp.run(data, symbols='LOCAL::') Time A 0 2003-12-04 123 """ # TODO: need to support transactions for every _source # transaction is set of calls between _source creation and call or between two calls # if transaction have the same operations, then it seems we should add only one set of operations _PROPERTIES = [ "__node", "__hash", "__use_name_for_column_prefix", "__sources_keys_dates", "__sources_modify_query_times", "__sources_base_ep_func", "__sources_symbols", "__source_has_output", "__name", "_tmp_otq" ] _OT_META_FIELDS = ["_START_TIME", "_END_TIME", "_SYMBOL_NAME", "_DBNAME", "_TICK_TYPE", '_TIMEZONE'] meta_fields = MetaFields() Symbol = Symbol def __init__( self, node=None, _symbols=None, _start=adaptive, _end=adaptive, _base_ep_func=None, _has_output=True, **kwargs ): self._tmp_otq = TmpOtq() self.__name = None if "Time" in kwargs: # TODO: add tests raise ValueError( "It is not allowed to have 'Time' or 'TIMESTAMP' columns, because they are the key columns" ) if "TIMESTAMP" not in kwargs: kwargs["TIMESTAMP"] = ott.nsectime kwargs["Time"] = ott.nsectime kwargs["_START_TIME"] = ott.nsectime kwargs["_END_TIME"] = ott.nsectime kwargs["_SYMBOL_NAME"] = str kwargs["_DBNAME"] = str kwargs["_TICK_TYPE"] = str kwargs["_TIMEZONE"] = str for key, value in kwargs.items(): # calculate value type value_type = ott.get_source_base_type(value) self.__dict__[key] = _Column(name=key, dtype=value_type, obj_ref=self) # just an alias to Timestamp self.__dict__['Time'] = self.__dict__['TIMESTAMP'] self.__dict__['_state_vars'] = StateVars(self) if node is None: node = otq.Passthrough() self.__hash = uuid.uuid4() self.__sources_keys_dates = {} self.__sources_modify_query_times = {} self.__sources_base_ep_func = {} self.__sources_symbols = {} self.__source_has_output = _has_output if isinstance(node, _ProxyNode): self.__node = _ProxyNode(*node.copy_graph(), refresh_func=self.__refresh_hash) else: self.__node = _ProxyNode(*self.__from_ep_to_proxy(node), refresh_func=self.__refresh_hash) self.__sources_keys_dates[self.__node.key()] = (_start, _end) self.__sources_modify_query_times[self.__node.key()] = False self.__sources_base_ep_func[self.__node.key()] = _base_ep_func self.__sources_symbols[self.__node.key()] = _symbols # flag controls whether we have to add node_name prefix when convert # columns into string self.__use_name_for_column_prefix = False def _try_default_constructor(self, *args, node=None, **kwargs): if node is not None: # Source.copy() method will use this way # all info from original source will be copied by copy() method after Source.__init__(self, *args, node=node, **kwargs) return True return False def base_ep(self, **kwargs): # default implementation # real implementation should return a Source object return None def _clean_sources_dates(self): self.__sources_keys_dates = {} self.__sources_modify_query_times = {} self.__sources_base_ep_func = {} self.__sources_symbols = {} def _set_sources_dates(self, other, copy_symbols=True): self.__sources_keys_dates.update(other._get_sources_dates()) self.__sources_modify_query_times.update(other._get_sources_modify_query_times()) self.__sources_base_ep_func.update(other._get_sources_base_ep_func()) if copy_symbols: self.__sources_symbols.update(other._get_sources_symbols()) else: # this branch is applicable for the bound symbols with callbacks, # where we drop all adaptive symbols and keep only manually specified # symbols manually_bound = { key: _ManuallyBoundValue(value) for key, value in other._get_sources_symbols().items() if value is not adaptive and value is not adaptive_to_default } self.__sources_symbols.update(manually_bound) self.__source_has_output = other._get_source_has_output() def _change_sources_keys(self, keys: dict): """ Change keys in sources dictionaries. Need to do it, for example, after rebuilding the node history with new keys. Parameters ---------- keys: dict Mapping from old key to new key """ sources = (self.__sources_keys_dates, self.__sources_modify_query_times, self.__sources_base_ep_func, self.__sources_symbols) for dictionary in sources: for key in list(dictionary): dictionary[keys[key]] = dictionary.pop(key) def _get_source_has_output(self): return self.__source_has_output def _get_sources_dates(self): return self.__sources_keys_dates def _get_sources_modify_query_times(self): return self.__sources_modify_query_times def _get_sources_base_ep_func(self): return self.__sources_base_ep_func def _get_sources_symbols(self): return self.__sources_symbols def use_name_for_column_prefix(self, flag=None): if flag is not None: self.__use_name_for_column_prefix = flag return self.__use_name_for_column_prefix def _check_key_in_properties(self, key: str) -> bool: if key in self.__class__._PROPERTIES: return True if key.replace('_' + Source.__name__.lstrip('_'), "") in self.__class__._PROPERTIES: return True if key.replace(self.__class__.__name__, "") in self.__class__._PROPERTIES: return True return False def __setattr__(self, key, value): if self._check_key_in_properties(key): self.__dict__[key] = value return if isinstance(value, _BaseCutBuilder): value(key) return value = self.__validate_before_setting(key, value) if key in self.__dict__: field = self.__dict__[key] if issubclass(type(field), _Column): self.__update_field(field, value) else: raise AttributeError(f'Column "{key}" not found') else: assert not ( isinstance(value, _StateColumn) and value.obj_ref is None ), "State variables should be in `state` field" self.__add_field(key, value) def __validate_before_setting(self, key, value): if key in ["Symbol", "_SYMBOL_NAME"]: raise ValueError("Symbol setting is supported during creation only") if key == "_state_vars": raise ValueError("state field is necessary for keeping state variables and can't be rewritten") if isinstance(value, ott.ExpressionDefinedTimeOffset): value = value.n if isinstance(value, np.generic): value = value.item() if not (ott.is_type_supported(ott.get_object_type(value)) or isinstance(value, _Operation) or type(value) is tuple or isinstance(value, _ColumnAggregation)): raise TypeError(f'It is not allowed to set objects of "{type(value)}" type') return value def _update_field(self, field, value): self.__update_field(field, value) def __update_field(self, field, value): if isinstance(value, _ColumnAggregation): value.apply(self, str(field)) return def _replace_positive_lag_operator_with_tmp_column(operation): if isinstance(operation, _LagOperator) and operation.index > 0: column = operation._op_params[0] name = column.name if name.startswith("__"): raise ValueError("Column name started with two underscores should be used by system only, " "please do not use such names.") name = f"__{name}_{operation.index}_NEW__" return _Column(name, column.dtype, column.obj_ref, precision=getattr(column, "_precision", None)) new_names, old_names = self.__get_old_and_new_names(_replace_positive_lag_operator_with_tmp_column, value) if old_names: self.sink(otq.AddFields(", ".join(f"{new} = {arg}" for arg, new in zip(old_names, new_names)))) if type(value) is tuple: # support to be compatible with adding fields to get rid of some strange problems # but really we do not use passed type, because update field does not support it value, _ = value convert_to_type = None str_value = ott.value2str(value) value_dtype = ott.get_object_type(value) base_type = ott.get_base_type(value_dtype) if base_type is bool: # according OneTick base_type = float type_changes = False # because mantis 0021194 if base_type is str: # update_field non-string field to string field (of any length) or value # changes type to default string if not issubclass(field.dtype, str): field._dtype = str type_changes = True else: if ( (issubclass(field.dtype, int) or issubclass(field.dtype, float) or issubclass(field.dtype, str)) and (issubclass(value_dtype, ott.msectime) or issubclass(value_dtype, ott.nsectime)) and (str(field) != 'TIMESTAMP') and (not isinstance(field, _StateColumn)) ): # in OneTick after updating fields with functions that return datetime values # the type of column will not change for long and double columns # and will change to long (or double in older versions) when updating string column # (see BDS-267) # That's why we are explicitly setting type for returned value convert_to_type = value_dtype if issubclass(field.dtype, str): # using update_field only for string because update_fields preserves type # by default and raises exception if it can't be done type_changes = True elif issubclass(field.dtype, float) and base_type is int and not isinstance(field, _StateColumn): # PY-574 if field was float and int, then # it is still float in onetick, so no need to change type on otp level convert_to_type = int elif (issubclass(field.dtype, ott.msectime) or issubclass(field.dtype, ott.nsectime)) and base_type is int: # if field was time type and we add something to it, then # no need to change type pass elif issubclass(field.dtype, str) and base_type is int: type_changes = True convert_to_type = int else: if issubclass(value_dtype, bool): value_dtype = float if isinstance(field, _StateColumn): pass # do nothing else: field._dtype = value_dtype type_changes = True # for aliases, TIMESTAMP ~ Time as an example key = str(field) if key == "TIMESTAMP": self.__update_timestamp(key, value, str_value) elif type_changes: if (value_dtype in [otp.nsectime, int] and issubclass(field.dtype, str)): # PY-416 string field changing the type from str to datetime leads to losing nanoseconds # BE-142 similar issue with int from string: OneTick convert str to float, and then to int # so we lose some precision for big integers # work around is: make a new column first, delete accessor column # and then recreate it with value from temp column self.sink(otq.AddField(field=f"_TMP_{key}", value=str_value)) self.sink(otq.Passthrough(fields=key, drop_fields=True)) self.sink(otq.AddField(field=f"{key}", value=f"_TMP_{key}")) self.sink(otq.Passthrough(fields=f"_TMP_{key}", drop_fields=True)) else: self.sink(otq.UpdateField(field=key, value=str_value)) else: self.sink(otq.UpdateFields(set=key + "=" + str_value)) if new_names: self.sink(otq.Passthrough(drop_fields=True, fields=",".join(new_names))) if convert_to_type: # manual type conversion after update fields for some cases self.table(**{key: convert_to_type}, inplace=True, strict=False) def __update_timestamp(self, key, value, str_value): if not hasattr(value, "dtype"): # A constant value: no need to pre- or post-sort self.sink(otq.UpdateField(field=key, value=str_value)) elif ( isinstance(value, _Column) and not isinstance(value, _StateColumn) and hasattr(value, "name") and value.name in self.__dict__ ): # An existing, present column: no need to create a temporary one. See PY-253 need_to_sort = str_value not in ("_START_TIME", "_END_TIME") if need_to_sort: self.sort(value, inplace=True) self.sink(otq.UpdateField(field=key, value=str_value)) if need_to_sort: self.sort(self.Time, inplace=True) elif (not is_compare(value) and isinstance(value, (_Operation, _LambdaIfElse)) or is_arithmetical(value)): # An expression or a statevar: create a temp column with its value, pre- and post- sort. self.sink(otq.AddField(field="__TEMP_TIMESTAMP__", value=str_value)) self.sink(otq.OrderByEp(order_by="__TEMP_TIMESTAMP__ ASC")) self.sink(otq.UpdateField(field=key, value="__TEMP_TIMESTAMP__")) self.sink(otq.Passthrough(fields="__TEMP_TIMESTAMP__", drop_fields=True)) self.sort(self.Time, inplace=True) else: raise Exception(f"Illegal type for timestamp assignment: {value.__class__}") def __get_old_and_new_names(self, _replace_positive_lag_operator_with_tmp_column, value): old_args, new_args = _replace_parameters(value, _replace_positive_lag_operator_with_tmp_column) old_args = {str(old): (old, new) for old, new in zip(old_args, new_args)} old_names = [] new_names = [] for op_str, (_, new) in old_args.items(): old_names.append(op_str) new_names.append(str(new)) return new_names, old_names def append(self, other) -> 'Source': """ Merge data source with `other` Parameters ---------- other: List, Source data source to merge Returns ------- Source """ if isinstance(other, list): return onetick.py.functions.merge(other + [self]) else: return onetick.py.functions.merge([self, other]) def __add_field(self, key, value): # ------------- # ADD_FIELDS # ------------- # TODO: merge together several add fields in one add_fields if isinstance(value, _ColumnAggregation): value.apply(self, key) return if type(value) is tuple: value, dtype = value else: dtype = ott.get_object_type(value) if type(value) is str: if len(value) > ott.string.DEFAULT_LENGTH: dtype = ott.string[len(value)] if issubclass(dtype, bool): # according to OneTick transformations dtype = float if issubclass(dtype, (ott.datetime, ott.date)): # according to OneTick transformations dtype = ott.nsectime # TODO: shouldn't all such logic be in ott.type2str? if np.issubdtype(dtype, np.integer): dtype = int if np.issubdtype(dtype, np.floating): dtype = float type_str = ott.type2str(dtype) # format value str_value = ott.value2str(value) self.sink(otq.AddField(field=f'{type_str} {key}', value=str_value)) self.__dict__[key] = _Column(key, dtype, self)
[docs] def __getitem__(self, item): """ Allows to express multiple things: - access a field by name - filter ticks by condition - select subset of fields - set order of fields Parameters ---------- item: str, :class:`Operation`, :func:`eval`, list of str - ``str`` is to access column by name or columns specified by regex. - ``Operation`` to express filter condition. - ``otp.eval`` to express filter condition based on external query - ``List[str]`` select subset of specified columns or columns specified in regexes. - ``slice[List[str]::]`` set order of columns - ``slice[Tuple[str, Type]::]`` type defaulting - ``slice[:]`` alias to :meth:`Source.copy()` - ``slice[int:int:int]`` select ticks the same way as elements in python lists Returns ------- Column, Source or tuple of Sources - Column if column name was specified. - Two sources if filtering expression or eval was provided: the first one is for ticks that pass condition and the second one that do not. Examples -------- Access to the `X` column: add `Y` based on `X` >>> data = otp.Ticks(X=[1, 2, 3]) >>> data['Y'] = data['X'] * 2 >>> otp.run(data) Time X Y 0 2003-12-01 00:00:00.000 1 2 1 2003-12-01 00:00:00.001 2 4 2 2003-12-01 00:00:00.002 3 6 Filtering based on expression >>> data = otp.Ticks(X=[1, 2, 3]) >>> data_more, data_less = data[(data['X'] > 2)] >>> otp.run(data_more) Time X 0 2003-12-01 00:00:00.002 3 >>> otp.run(data_less) Time X 0 2003-12-01 00:00:00.000 1 1 2003-12-01 00:00:00.001 2 Filtering based on the result of another query. Another query should have only one tick as a result with only one field (whatever it names). >>> exp_to_select = otp.Ticks(WHERE=['X > 2']) >>> data = otp.Ticks(X=[1, 2, 3], Y=['a', 'b', 'c'], Z=[.4, .3, .1]) >>> data, _ = data[otp.eval(exp_to_select)] >>> otp.run(data) Time X Y Z 0 2003-12-01 00:00:00.002 3 c 0.1 Select subset of specified columns >>> data = otp.Ticks(X=[1, 2, 3], Y=['a', 'b', 'c'], Z=[.4, .3, .1]) >>> data = data[['X', 'Z']] >>> otp.run(data) Time X Z 0 2003-12-01 00:00:00.000 1 0.4 1 2003-12-01 00:00:00.001 2 0.3 2 2003-12-01 00:00:00.002 3 0.1 Slice with list will keep all columns, but change order: >>> data=otp.Tick(Y=1, X=2, Z=3) >>> otp.run(data) Time Y X Z 0 2003-12-01 1 2 3 >>> data = data[['X', 'Y']:] >>> otp.run(data) Time X Y Z 0 2003-12-01 2 1 3 Slice can be used as short-cut for :meth:`Source.copy`: >>> data[:] # doctest: +ELLIPSIS <onetick.py.sources.Tick object at ...> Slices can use integers. In this case ticks are selected the same way as elements in python lists. >>> data = otp.Ticks({'A': [1, 2, 3, 4, 5]}) Select first 3 ticks: >>> otp.run(data[:3]) Time A 0 2003-12-01 00:00:00.000 1 1 2003-12-01 00:00:00.001 2 2 2003-12-01 00:00:00.002 3 Skip first 3 ticks: >>> otp.run(data[3:]) Time A 0 2003-12-01 00:00:00.003 4 1 2003-12-01 00:00:00.004 5 Select last 3 ticks: >>> otp.run(data[-3:]) Time A 0 2003-12-01 00:00:00.002 3 1 2003-12-01 00:00:00.003 4 2 2003-12-01 00:00:00.004 5 Skip last 3 ticks: >>> otp.run(data[:-3]) Time A 0 2003-12-01 00:00:00.000 1 1 2003-12-01 00:00:00.001 2 Skip first and last tick: >>> otp.run(data[1:-1]) Time A 0 2003-12-01 00:00:00.001 2 1 2003-12-01 00:00:00.002 3 2 2003-12-01 00:00:00.003 4 Select every second tick: >>> otp.run(data[::2]) Time A 0 2003-12-01 00:00:00.000 1 1 2003-12-01 00:00:00.002 3 2 2003-12-01 00:00:00.004 5 Select every second tick, not including first and last tick: >>> otp.run(data[1:-1:2]) Time A 0 2003-12-01 00:00:00.001 2 1 2003-12-01 00:00:00.003 4 Regular expressions can be used to select fields too: >>> data = otp.Tick(A=1, AA=2, AB=3, B=4, BB=5, BA=6) >>> otp.run(data['A.*']) Time A AA AB BA 0 2003-12-01 1 2 3 6 Note that by default pattern is matched in any position of the string. Use characters ^ and $ to specify start and end of the string: >>> otp.run(data['^A']) Time A AA AB 0 2003-12-01 1 2 3 Several regular expressions can be specified too: >>> otp.run(data[['^A+$', '^B+$']]) Time A AA B BB 0 2003-12-01 1 2 4 5 See Also -------- | :meth:`Source.table`: another and more generic way to select subset of specified columns | **PASSTHROUGH** OneTick event processor | **WHERE_CLAUSE** OneTick event processor """ strict = True if isinstance(item, (_Operation, _QueryEvalWrapper)): if isinstance(item, _Operation): item = item._make_python_way_bool_expression() where = self.copy(ep=otq.WhereClause(where=str(item))) if_source = where.copy() if_source.node().out_pin("IF") else_source = where.copy() else_source.node().out_pin("ELSE") # TODO: add ability to remove then this ep, because it is required only for right output else_source.sink(otq.Passthrough()) return if_source, else_source if isinstance(item, slice): result = self._get_integer_slice(item) if result: return result if item.step: raise AttributeError("Source columns slice with step set makes no sense") if item.start and item.stop: raise AttributeError("Source columns slice with both start and stop set is not available now") if not item.start and item.stop: raise AttributeError("Source columns slice with only stop set is not implemented yet") if item.start is None and item.stop is None: return self.copy() item = item.start strict = False if isinstance(item, tuple): item = dict([item]) elif isinstance(item, list): if not item: return self.copy() item_type = list(set([type(x) for x in item])) if len(item_type) > 1: raise AttributeError(f"Different types {item_type} in slice list is not supported") if item_type[0] == tuple: item = dict(item) if isinstance(item, (list, str)): # check if item has regex characters item_list = [item] if isinstance(item, str) else item try: items_to_passthrough, use_regex = self._columns_names_regex(item_list) except TypeError: use_regex = False if use_regex: src = self.copy() src.sink(otq.Passthrough(fields=','.join(items_to_passthrough), use_regex=True)) return src if isinstance(item, list): # --------- # TABLE # --------- items = [] for it in item: if isinstance(it, _Column): items.append(it.name) elif isinstance(it, str): items.append(it) else: raise ValueError(f"It is not supported to filter '{it}' object of '{type(it)}' type") # validation for item in items: if item not in self.schema: existing_columns = ", ".join(self.schema.keys()) raise AttributeError( f"There is no '{item}' column. There are existing columns: {existing_columns}" ) columns = {} dtypes = self.columns(skip_meta_fields=True) for column_name in items: if column_name not in ['Time', 'Timestamp', 'TIMESTAMP']: columns[column_name] = dtypes[column_name] return self.table(strict=strict, **columns) if isinstance(item, dict): return self.table(strict=strict, **item) # way to set type if isinstance(item, tuple): name, dtype = item warnings.warn('Using tuple with name and type in otp.Source.__getitem__() is not supported anymore,' ' change your code to use otp.Source.schema object instead.', DeprecationWarning) return self._set_field_by_tuple(name, dtype) name = item if name not in self.__dict__: raise KeyError(f'Column name {name} is not in the schema. Please, check that this column ' 'is in the schema or add it using the .schema property') if not isinstance(self.__dict__[name], _Column): raise AttributeError(f"There is no '{name}' column") return self.__dict__[name]
def _set_field_by_tuple(self, name, dtype): # TODO: move this logic to self.set_schema() warnings.warn('Using _set_field_by_tuple() is not recommended,' ' change your code to use otp.Source.schema object.', DeprecationWarning) if name not in self.__dict__: if dtype is None: raise KeyError(f'Column name {name} is not in the schema. Please, check that this column ' 'is in the schema or add it using the .schema property') if name == 0 or name == 1: raise ValueError(f"constant {name} are not supported for indexing for now, please use otp.Empty") if type(name) in (int, float): raise ValueError("integer indexes are not supported") self.__dict__[name] = _Column(name, dtype, self) else: if not isinstance(self.__dict__[name], _Column): raise AttributeError(f"There is no '{name}' column") if dtype: type1, type2 = self.__dict__[name].dtype, dtype b_type1, b_type2 = ott.get_base_type(type1), ott.get_base_type(type2) if b_type1 != b_type2: if {type1, type2} == {int, float}: self.__dict__[name]._dtype = float else: raise Warning( f"Column '{name}' was declared as '{type1}', but you want to change it to '{type2}', " "that is not possible without setting type directly via assigning value" ) else: if issubclass(b_type1, str): t1_length = ott.string.DEFAULT_LENGTH if type1 is str else type1.length t2_length = ott.string.DEFAULT_LENGTH if type2 is str else type2.length self.__dict__[name]._dtype = type2 if t1_length < t2_length else type1 if {type1, type2} == {ott.nsectime, ott.msectime}: self.__dict__[name]._dtype = ott.nsectime return self.__dict__[name] def _get_integer_slice(self, item: slice) -> Optional['Source']: """ Treat otp.Source object as a sequence of ticks and apply common python integer slicing logic to it. """ start, stop, step = item.start, item.stop, item.step for v in (start, stop, step): if v is not None and not isinstance(v, int): return None # let's filter out cases that we don't want to support if step is not None and step <= 0: raise ValueError("step value can't be negative or zero") if stop is not None and stop == 0: raise ValueError("stop value can't be zero") if start and stop and start > 0 and stop > 0 and start >= stop: raise ValueError("stop value can't be less than start") if start and start < 0 and stop and stop > 0: raise ValueError("start value can't be negative when start value is positive") def add_counter(src, force=False): if '__NUM__' not in src.schema or force: if '__NUM__' in src.schema: src = src.drop('__NUM__') src = src.agg({'__NUM__': otp.agg.count()}, running=True, all_fields=True) return src result = self.copy() if start: if start > 0: result = add_counter(result) result, _ = result[result['__NUM__'] > start] if start < 0: result = result.last(-start) if stop: if stop > 0: result = add_counter(result) result, _ = result[result['__NUM__'] <= stop] if stop < 0: result = add_counter(result) last_ticks = result.last(-stop) last_ticks['__FLAG__'] = 1 last_ticks = last_ticks[['__FLAG__', '__NUM__']] result = otp.join(result, last_ticks, on=result['__NUM__'] == last_ticks['__NUM__'], how='outer', rprefix='RIGHT') result, _ = result[result['__FLAG__'] == 0] result = result.drop(['__FLAG__', 'RIGHT___NUM__']) if step: if step > 0: # resetting counter result = add_counter(result, force=True) result, _ = result[(result['__NUM__'] - 1) % step == 0] if '__NUM__' in result.schema: result = result.drop('__NUM__') return result
[docs] def __setitem__(self, key, value): """ Add new column to the source or update existing one. Parameters ---------- key: str The name of the new or existing column. value: int, str, float, datetime, date, \ :py:class:`~onetick.py.Column`, :py:class:`~onetick.py.Operation`, :py:class:`~onetick.py.string`, \ :py:class:`otp.date <onetick.py.date>`, :py:class:`otp.datetime <onetick.py.datetime>`, \ :py:class:`~onetick.py.nsectime`, :py:class:`~onetick.py.msectime` The new value of the column. See also -------- | **ADD_FIELD** OneTick event processor | **UPDATE_FIELD** OneTick event processor Examples -------- >>> data = otp.Tick(A='A') >>> data['D'] = otp.datetime(2022, 2, 2) >>> data['X'] = 1 >>> data['Y'] = data['X'] >>> data['X'] = 12345 >>> data['Z'] = data['Y'].astype(str) + 'abc' >>> otp.run(data) Time A D X Y Z 0 2003-12-01 A 2022-02-02 12345 1 1abc """ return self.__setattr__(key, value)
[docs] @inplace_operation def update(self, if_set, else_set=None, where=1, inplace=False) -> 'Source': """ Update field of the Source or state variable. Parameters ---------- if_set: dict Dictionary <field name>: <expression>. else_set: dict, optional Dictionary <field name>: <expression> where: expression, optional Condition of updating. If ``where`` is True the fields from ``if_set`` will be updated with corresponding expression. If ``where`` is False, the fields from ``else_set`` will be updated with corresponding expression. inplace: bool A flag controls whether operation should be applied inplace. If ``inplace=True``, then it returns nothing. Otherwise method returns a new modified object. Returns ------- :class:`Source` or ``None``. See also -------- **UPDATE_FIELD** and **UPDATE_FIELDS** OneTick event processors Examples -------- Columns can be updated with this method: >>> # OTdirective: snippet-name: Arrange.conditional update; >>> t = otp.Ticks({'X': [1, 2, 3], ... 'Y': [4, 5, 6], ... 'Z': [1, 0, 1]}) >>> t = t.update(if_set={'X': t['X'] + t['Y']}, ... else_set={'X': t['X'] - t['Y']}, ... where=t['Z'] == 1) >>> otp.run(t) # OTdirective: snippet-example; Time X Y Z 0 2003-12-01 00:00:00.000 5 4 1 1 2003-12-01 00:00:00.001 -3 5 0 2 2003-12-01 00:00:00.002 9 6 1 State variables can be updated too: >>> t = otp.Ticks({'X': [1, 2, 3], ... 'Y': [4, 5, 6], ... 'Z': [1, 0, 1]}) >>> t.state_vars['X'] = 0 >>> t = t.update(if_set={t.state_vars['X']: t['X'] + t['Y']}, ... else_set={t.state_vars['X']: t['X'] - t['Y']}, ... where=t['Z'] == 1) >>> t['UX'] = t.state_vars['X'] >>> otp.run(t) Time X Y Z UX 0 2003-12-01 00:00:00.000 1 4 1 5 1 2003-12-01 00:00:00.001 2 5 0 -3 2 2003-12-01 00:00:00.002 3 6 1 9 """ if else_set is None: else_set = {} if len(if_set) == 0 or not isinstance(if_set, dict): raise ValueError( f"'if_set' parameter should be non empty dict, but got '{if_set}' of type '{type(if_set)}'" ) def _prepare(to_prepare): result = {} for in_obj, out_obj in to_prepare.items(): if isinstance(in_obj, _StateColumn): result[in_obj] = out_obj elif isinstance(in_obj, _Column): result[in_obj.name] = out_obj elif isinstance(in_obj, str): result[in_obj.strip()] = out_obj else: raise AttributeError( f"It is not supported to update item '{in_obj}' of type '{type(in_obj)}'" ) return result def _validate(to_validate): result = {} for in_key, out_obj in to_validate.items(): if isinstance(in_key, _StateColumn): in_dtype = in_key.dtype else: if not (in_key in self.__dict__ and isinstance(self.__dict__[in_key], _Column)): raise AttributeError(f"There is no '{in_key}' column to update") if in_key == "Time" or in_key == "TIMESTAMP": raise ValueError("It is not allowed to modify 'Time' column using .update method") in_dtype = self.schema[in_key] dtype = ott.get_object_type(out_obj) if not ott.is_type_supported(dtype): raise TypeError(f"Deduced type of object {out_obj} is not supported: {dtype}") # will raise exception if types are incompatible _ = ott.get_type_by_objects([dtype, in_dtype]) if isinstance(in_key, _StateColumn): in_key = str(in_key) if isinstance(out_obj, bool): out_obj = int(out_obj) result[in_key] = out_obj return result # prepare and validate items = _validate(_prepare(if_set)) else_items = _validate(_prepare(else_set)) if isinstance(where, bool): where = int(where) if not (getattr(where, "dtype", None) is bool or isinstance(where, int)): raise ValueError(f"Where has not supported type '{type(where)}'") # apply set_rules = [f"{key}=({ott.value2str(value)})" for key, value in items.items()] else_set_rules = [f"{key}=({ott.value2str(value)})" for key, value in else_items.items()] self.sink(otq.UpdateFields(set=",".join(set_rules), else_set=",".join(else_set_rules), where=str(where))) return self
[docs] @docstring(parameters=[_agg_doc, _running_doc, _all_fields_with_policy_doc, _bucket_interval_doc, _bucket_time_doc, _bucket_units_doc, _bucket_end_condition_doc, _end_condition_per_group_doc, _boundary_tick_bucket_doc, _group_by_doc], add_self=True) def agg(self, aggs, *args, **kwargs) -> 'Source': """ Applies composition of :ref:`otp.agg <aggregations_funcs>` aggregations See Also -------- | :ref:`Aggregations <aggregations_funcs>` | **COMPUTE** OneTick event processor Examples -------- By default the whole data is aggregated: >>> data = otp.Ticks(X=[1, 2, 3, 4]) >>> data = data.agg({'X_SUM': otp.agg.sum('X')}) >>> otp.run(data) Time X_SUM 0 2003-12-04 10 Multiple aggregations can be applied at the same time: >>> data = otp.Ticks(X=[1, 2, 3, 4]) >>> data = data.agg({'X_SUM': otp.agg.sum('X'), ... 'X_MEAN': otp.agg.average('X')}) >>> otp.run(data) Time X_SUM X_MEAN 0 2003-12-04 10 2.5 Aggregation can be used in running mode: >>> data = otp.Ticks(X=[1, 2, 3, 4]) >>> data = data.agg({'CUM_SUM': otp.agg.sum('X')}, running=True) >>> otp.run(data) Time CUM_SUM 0 2003-12-01 00:00:00.000 1 1 2003-12-01 00:00:00.001 3 2 2003-12-01 00:00:00.002 6 3 2003-12-01 00:00:00.003 10 Aggregation can be split in buckets: >>> data = otp.Ticks(X=[1, 2, 3, 4]) >>> data = data.agg({'X_SUM': otp.agg.sum('X')}, bucket_interval=2, bucket_units='ticks') >>> otp.run(data) Time X_SUM 0 2003-12-01 00:00:00.001 3 1 2003-12-01 00:00:00.003 7 Running aggregation can be used with buckets too. In this case (all_fields=False and running=True) output ticks are created when a tick enters or leaves the sliding window (that's why for this example there are 8 output ticks for 4 input ticks): >>> data = otp.Ticks(X=[1, 2, 3, 4], offset=[0, 1000, 1500, 3600]) >>> data = data.agg(dict(X_MEAN=otp.agg.average("X"), ... X_STD=otp.agg.stddev("X")), ... running=True, bucket_interval=2) >>> otp.run(data) Time X_MEAN X_STD 0 2003-12-01 00:00:00.000 1.0 0.000000 1 2003-12-01 00:00:01.000 1.5 0.500000 2 2003-12-01 00:00:01.500 2.0 0.816497 3 2003-12-01 00:00:02.000 2.5 0.500000 4 2003-12-01 00:00:03.000 3.0 0.000000 5 2003-12-01 00:00:03.500 NaN NaN 6 2003-12-01 00:00:03.600 4.0 0.000000 7 2003-12-01 00:00:05.600 NaN NaN If all_fields=True an output tick is generated only for arrival events, but all attributes from the input tick causing an arrival event are copied over to the output tick and the aggregation is added as another attribute: >>> data = otp.Ticks(X=[1, 2, 3, 4], offset=[0, 1000, 1500, 3600]) >>> data = data.agg(dict(X_MEAN=otp.agg.average("X"), ... X_STD=otp.agg.stddev("X")), ... all_fields=True, running=True) >>> otp.run(data) Time X X_MEAN X_STD 0 2003-12-01 00:00:00.000 1 1.0 0.000000 1 2003-12-01 00:00:01.000 2 1.5 0.500000 2 2003-12-01 00:00:01.500 3 2.0 0.816497 3 2003-12-01 00:00:03.600 4 2.5 1.118034 ``all_fields`` parameter can be used when there is need to have all original fields in the output: >>> ticks = otp.Ticks(X=[3, 4, 1, 2]) >>> data = ticks.agg(dict(X_MEAN=otp.agg.average("X"), ... X_STD=otp.agg.stddev("X")), ... all_fields=True) >>> otp.run(data) Time X X_MEAN X_STD 0 2003-12-04 3 2.5 1.118034 There are different politics for ``all_fields`` parameter: >>> data = ticks.agg(dict(X_MEAN=otp.agg.average("X"), ... X_STD=otp.agg.stddev("X")), ... all_fields="last") >>> otp.run(data) Time X X_MEAN X_STD 0 2003-12-04 2 2.5 1.118034 For low/high policies the field selected as input is set this way: >>> data = ticks.agg(dict(X_MEAN=otp.agg.average("X"), ... X_STD=otp.agg.stddev("X")), ... all_fields=otp.agg.low_tick(data["X"])) >>> otp.run(data) Time X X_MEAN X_STD 0 2003-12-04 1 2.5 1.118034 """ aggs = aggs.copy() result = self.copy() what_to_aggregate = aggregations.compute( *args, **kwargs, ) for name, ag in aggs.items(): what_to_aggregate.add(name, ag) result = what_to_aggregate.apply(result) result._add_table() return result
def sort_values(self, *args, **kwargs): """ alias of sort See Also -------- :meth:`Source.sort` """ return self.sort(*args, **kwargs)
[docs] @inplace_operation def sort(self, by: Union[str, Collection[Union[str, 'onetick.py.Column']]], ascending=True, inplace=False) -> Optional['Source']: """ Sort ticks by columns. Parameters ---------- by: str, Column or list of them Column(s) to sort by. It is possible to pass a list of column, where is the order is important: from the left to the right. ascending: bool or list Order to sort by. If list of columns is specified, then list of ascending values per column is expected. (the :class:`nan` is the smallest for ``float`` type fields) inplace: bool A flag controls whether operation should be applied inplace. If ``inplace=True``, then it returns nothing. Otherwise method returns a new modified object Returns ------- :class:`Source` See also -------- **ORDER_BY** OneTick event processor Examples -------- Single column examples >>> data = otp.Ticks({'X':[ 94, 5, 34], ... 'Y':[otp.nan, 3.1, -0.3]}) >>> data = data.sort(data['X']) >>> otp.run(data) Time X Y 0 2003-12-01 00:00:00.001 5 3.1 1 2003-12-01 00:00:00.002 34 -0.3 2 2003-12-01 00:00:00.000 94 NaN >>> data = otp.Ticks({'X':[ 94, 5, 34], ... 'Y':[otp.nan, 3.1, -0.3]}) >>> data = data.sort(data['Y']) >>> otp.run(data) Time X Y 0 2003-12-01 00:00:00.000 94 NaN 1 2003-12-01 00:00:00.002 34 -0.3 2 2003-12-01 00:00:00.001 5 3.1 Inplace >>> data = otp.Ticks({'X':[ 94, 5, 34], ... 'Y':[otp.nan, 3.1, -0.3]}) >>> data.sort(data['Y'], inplace=True) # OTdirective: snippet-name: Arrange.sort.inplace; >>> otp.run(data) Time X Y 0 2003-12-01 00:00:00.000 94 NaN 1 2003-12-01 00:00:00.002 34 -0.3 2 2003-12-01 00:00:00.001 5 3.1 Multiple columns >>> data = otp.Ticks({'X':[ 5, 6, 3, 6], ... 'Y':[1.4, 3.1, 9.1, 5.5]}) >>> data = data.sort([data['X'], data['Y']]) >>> otp.run(data) Time X Y 0 2003-12-01 00:00:00.002 3 9.1 1 2003-12-01 00:00:00.000 5 1.4 2 2003-12-01 00:00:00.001 6 3.1 3 2003-12-01 00:00:00.003 6 5.5 Ascending/descending control >>> data = otp.Ticks({'X':[ 94, 5, 34], ... 'Y':[otp.nan, 3.1, -0.3]}) >>> data = data.sort(data['X'], ascending=False) >>> otp.run(data) Time X Y 0 2003-12-01 00:00:00.000 94 NaN 1 2003-12-01 00:00:00.002 34 -0.3 2 2003-12-01 00:00:00.001 5 3.1 >>> # OTdirective: snippet-name: Arrange.sort.sort; >>> data = otp.Ticks({'X':[ 5, 6, 3, 6], ... 'Y':[1.4, 3.1, 9.1, 5.5]}) >>> data = data.sort([data['X'], data['Y']], ascending=[True, False]) >>> otp.run(data) Time X Y 0 2003-12-01 00:00:00.002 3 9.1 1 2003-12-01 00:00:00.000 5 1.4 2 2003-12-01 00:00:00.003 6 5.5 3 2003-12-01 00:00:00.001 6 3.1 """ columns = by if isinstance(columns, list): objs = columns else: objs = [columns] if isinstance(ascending, list): asc_objs = ascending else: asc_objs = [ascending] items = [] # ------------------------------- # Columns processing # ------------------------------- # convert to strings # TODO: it seems as a common code, need to move to a separate function for obj in objs: if isinstance(obj, _Column): items.append(obj.name) elif isinstance(obj, str): items.append(obj) else: # TODO: cover with tests raise TypeError(f"It is not supported to order by '{obj}' of type '{type(obj)}'") # validate for item in items: if item in self.__dict__: if not isinstance(self.__dict__[item], _Column): # TODO: cover with tests raise AttributeError(f"There is no '{item}' column") # if else: # TODO: covert with tests raise AttributeError(f"There is no '{item}' column") # ------------------------------- # Asc processing # ------------------------------- asc_items = [True] * len(items) def asc_convert(v): return "ASC" if v else "DESC" for inx in range(len(items)): if inx >= len(asc_objs): asc_obj = asc_items[inx] else: asc_obj = asc_objs[inx] if isinstance(asc_obj, bool): asc_items[inx] = asc_convert(asc_obj) elif isinstance(asc_obj, int): asc_items[inx] = asc_convert(asc_obj) else: raise TypeError(f"asc can not be '{asc_obj}' of type '{type(asc_obj)}'") # --------------- # combine together order_by = [column_name + " " + asc for column_name, asc in zip(items, asc_items)] self.sink(otq.OrderByEp(order_by=",".join(order_by))) return self
def _hash(self): return self.__hash def _merge_tmp_otq(self, source): self._tmp_otq.merge(source._tmp_otq) def __prepare_graph(self, symbols=None, start=None, end=None, has_output=False): # We copy object here, because we will change it according to passed # symbols and date ranges. For example, we can add modify_query_times EP # if it is necessary obj = self.copy() if has_output: obj.sink(otq.Passthrough()) start, end, symbols = obj._get_date_range(symbols, start, end) if start is adaptive: start = None if end is adaptive: end = None if symbols is not None and isinstance(symbols, pd.DataFrame): symbols = utils.get_symbol_list_from_df(symbols) if symbols is not None and not isinstance(symbols, list): symbols = [symbols] elif symbols is None: symbols = [] _symbols = [] for sym in symbols: _symbols.append(self._convert_symbol_to_string(sym, tmp_otq=obj._tmp_otq, start=start, end=end)) return obj, start, end, _symbols
[docs] def to_otq(self, file_name=None, file_suffix=None, query_name=None, symbols=None, start=None, end=None, timezone=None, raw=None, add_passthrough=True): """ Save data source to .otq file and return path to the saved file. Parameters ---------- file_name: str Absolute or relative path to the saved file. If ``None``, create temporary file and name it randomly. file_suffix: str Suffix to add to the saved file name (including extension). Can be specified if ``file_name`` is ``None`` to distinguish between different temporary files. Default: ".to_otq.otq" query_name: str Name of the main query in the created file. If ``None``, take name from this Source object. If that name is empty, set name to "query". symbols: str, list, :pandas:`DataFrame <pandas.DataFrame>`, :class:`Source` symbols to save query with start: datetime start time to save query with end: datetime end time to save query with timezone: str timezone to save query with raw .. deprecated:: 1.4.17 add_passthrough: bool will add :py:class:`onetick.query.Passthrough` event processor at the end of resulting graph Returns ------- result: str Relative (if ``file_name`` is relative) or absolute path to the created query in the format ``file_name::query_name`` """ if raw is not None: warnings.warn('The "raw" flag is deprecated and makes no effect', DeprecationWarning) if timezone is None: timezone = configuration.config.tz file_path = str(file_name) if file_name is not None else None if file_suffix is None: file_suffix = self._name_suffix('to_otq.otq') if query_name is None: query_name = self.get_name(remove_invalid_symbols=True) if query_name is None: query_name = 'query' obj, start, end, symbols = self.__prepare_graph(symbols, start, end) graph = obj._to_graph(add_passthrough=add_passthrough) graph.set_symbols(symbols) return obj._tmp_otq.save_to_file(query=graph, query_name=query_name, file_path=file_path, file_suffix=file_suffix, start=start, end=end, timezone=timezone)
def _store_in_tmp_otq(self, tmp_otq, operation_suffix="tmp_query", symbols=None, start=None, end=None, raw=None, add_passthrough=True, name=None): """ Adds this source to the tmp_otq storage Parameters: tmp_otq: TmpOtq Storage object operation_suffix: str Suffix string to be added to the autogenerated graph name in the otq file name: str, optional If specified, this ``name`` will be used to save query and ``suffix`` parameter will be ignored. Returns: result: str String with the name of the saved graph (starting with THIS::) """ # We copy object here, because we will change it according to passed # symbols and date ranges. For example, we can add modify_query_times EP # if it is necessary if raw is not None: warnings.warn('The "raw" flag is deprecated and makes no effect', DeprecationWarning) obj = self.copy() start, end, symbols = obj._get_date_range(symbols, start, end) tmp_otq.merge(obj._tmp_otq) if symbols and not isinstance(symbols, list): symbols = [symbols] elif symbols is None: symbols = [] _symbols = [] for sym in symbols: _symbols.append(self._convert_symbol_to_string(sym, tmp_otq)) if isinstance(start, ott.dt): # OT save_to_file checks for the datetime time start = datetime.fromtimestamp(start.timestamp()) if isinstance(end, ott.dt): end = datetime.fromtimestamp(end.timestamp()) graph = obj._to_graph(add_passthrough=add_passthrough) graph.set_start_time(start) graph.set_end_time(end) graph.set_symbols(_symbols) suffix = self._name_suffix(suffix=operation_suffix, separator='__', remove_invalid_symbols=True) return tmp_otq.add_query(graph, suffix=suffix, name=name) def _save_as_tmp_otq(self, start=None, end=None, symbols=None, suffix=""): tmp_otq = utils.TmpFile(f"{suffix}.otq") query_path = self.to_otq(tmp_otq, symbols=symbols, start=start, end=end) return query_path
[docs] def plot(self, y, x='Time', kind='line', **kwargs): """ Executes the query with known properties and builds a plot resulting dataframe. Uses the pandas dataframe plot method to plot data. Other parameters could be specified through the ``kwargs``. Parameters ---------- x: str Column name for the X axis y: str Column name for the Y axis kind: str The kind of plot Examples -------- >>> data = otp.Ticks(X=[1, 2, 3]) >>> data.plot(y='X', kind='bar') # doctest: +SKIP """ result = self.copy() return result[[y, x]]().plot(x=x, y=y, kind=kind, **kwargs)
[docs] def count(self, **kwargs): """ Returns the number of ticks in the query. Adds an aggregation that calculate total ticks count, and executes a query. Result is a single value -- number of ticks. Possible application is the jupyter when a developer wants to check data presences for example. Parameters ---------- kwargs parameters that will be passed to :py:func:`otp.run <onetick.py.run>` Returns ------- int See Also -------- :py:func:`otp.run <onetick.py.run>` Source.head, Source.tail, :py:func:`onetick.py.agg.count` Examples -------- >>> data = otp.Ticks(X=[1, 2, 3]) >>> data.count() 3 >>> data = otp.Empty() >>> data.count() 0 """ result = self.copy() result = otp.run(result.agg({'__num_rows': otp.agg.count()}), **kwargs) if result.empty: return 0 return int(result['__num_rows'][0])
[docs] def head(self, n=5, **kwargs) -> 'Union[pd.DataFrame, Source]': """ Executes the query and returns first ``n`` ticks as a pandas dataframe. It is useful in the jupyter case when you want to observe first ``n`` values. Parameters ---------- n: int, default=5 number of ticks to return kwargs: parameters will be passed to :py:func:`otp.run <onetick.py.run>` Returns ------- :pandas:`DataFrame <pandas.DataFrame>` See Also -------- :py:func:`otp.run <onetick.py.run>` Source.tail, Source.count Examples -------- >>> data = otp.Ticks(X=list('abcdefgik')) >>> data.head()[['X']] X 0 a 1 b 2 c 3 d 4 e """ result = self.copy() result = result.first(n=n) # pylint: disable=E1123 return otp.run(result, **kwargs)
[docs] def tail(self, n=5, **kwargs) -> Union['pd.DataFrame', 'Source']: """ Executes the query and returns last ``n`` ticks as a pandas dataframe. It is useful in the jupyter case when you want to observe last ``n`` values. Parameters ---------- n: int number of ticks to return kwargs: parameters will be passed to :py:func:`otp.run <onetick.py.run>` Returns ------- :pandas:`DataFrame <pandas.DataFrame>` See Also -------- :py:func:`otp.run <onetick.py.run>` :meth:`Source.head`, :meth:`Source.count` Examples -------- >>> data = otp.Ticks(X=list('abcdefgik')) >>> data.tail()[['X']] X 0 e 1 f 2 g 3 i 4 k """ result = self.copy() result = result.last(n=n) # pylint: disable=E1123 return otp.run(result, **kwargs)
def _rename_impl_(self, columns=None, use_regex=False, fields_to_skip=None): # prepare items = {} out_names = set() fields_to_skip = fields_to_skip or [] for in_obj, out_obj in columns.items(): if isinstance(in_obj, _Column): items[in_obj.name.strip()] = out_obj.strip() elif isinstance(in_obj, str): items[in_obj.strip()] = out_obj.strip() else: raise Exception(f"It is not supported to rename item '{in_obj}' of type {type(in_obj)}'") if out_obj in out_names: raise AttributeError( f"You want to rename '{in_obj}' into '{out_obj}', " f"but also want to rename another column into '{out_obj}'" ) out_names.add(out_obj) schema_update_dict = items if use_regex: schema_update_dict = self._get_columns_names_renaming(items, fields_to_skip) # validate for in_key, out_key in schema_update_dict.items(): if " " in out_key: raise AttributeError(f"There is space in '{out_key}' column name") if in_key in self.__dict__ and isinstance(self.__dict__[in_key], _Column): pass else: raise AttributeError(f"There is no '{in_key}' column to rename") if out_key in self.__dict__ and isinstance(self.__dict__[out_key], _Column): raise AttributeError(f"Column '{out_key}' is already exist") # apply for in_key, out_key in schema_update_dict.items(): self.__dict__[in_key].rename(out_key, update_parent_object=False) self.__dict__[out_key] = self.__dict__[in_key] del self.__dict__[in_key] rename_rules = [key + "=" + value for key, value in items.items()] kwargs = dict(rename_fields=",".join(rename_rules)) if use_regex: kwargs['use_regex'] = True if fields_to_skip: kwargs['fields_to_skip'] = ",".join(fields_to_skip) self.sink(otq.RenameFieldsEp(**kwargs)) def _get_columns_names_renaming(self, rename: Dict[str, str], not_to_rename: List[str]) -> Dict[str, str]: """ We can't be sure python Source has consistent columns cache, because sinking complex event processors can change columns unpredictable, so if user will specify regex as a param, we will pass regex as an onetick's param, but rename all matched columns from python Source cache. Parameters ---------- rename: Dict old_name -> new_name. Some of the dictionary's items use regex. Returns ------- output_dict: Dict old_name -> new_name used for renaming columns in Source. None of this dictionary's items use regex. """ output_dict = {} for old_name, new_name in rename.items(): matching_columns = [col for col in self.schema if re.match(old_name, col) and not self._is_excluded(col, not_to_rename)] for col in matching_columns: output_dict[col] = re.sub(old_name, new_name, col) return output_dict @staticmethod def _is_excluded(s: str, not_to_rename: List[str]) -> bool: for excluded in not_to_rename: if re.match(excluded, s): return True return False
[docs] @inplace_operation def rename(self, columns=None, use_regex=False, fields_to_skip=None, inplace=False) -> Optional['Source']: r""" Rename columns Parameters ---------- columns : dict Rules how to rename in the following format: {<column> : <new-column-name>}, where <column> is either existing column name of str type or reference to a column, and <new-column-name> a new column name of str type. use_regex: bool If true, then old-name=new-name pairs in the `columns` parameter are treated as regular expressions. This allows bulk renaming for field names. Notice that regular expressions for old names are treated as if both their prefix and their suffix are .*, i.e. the prefix and suffix match any substring. As a result, old-name *XX* will match all of *aXX*, *aXXB*, and *XXb*, when `use_regex=true`. You can have old-name begin from ^ to indicate that .* prefix does not apply, and you can have old name end at $ to indicate that .* suffix does not apply. Default: false fields_to_skip: list of str A list of regular expressions for specifying fields that should be skipped (i.e., not be renamed). If a field is matched by one of the specified regular expressions, it won't be considered for renaming. Default: None inplace : bool The flag controls whether operation should be applied inplace or not. If ``inplace=True``, then it returns nothing. Otherwise, method returns a new modified object. Returns ------- :class:`Source` or ``None`` See also -------- **RENAME** OneTick event processor Examples -------- >>> data = otp.Ticks(X=[1], Y=[2]) >>> data = data.rename({'X': 'XX', ... data['Y']: 'YY'}) >>> otp.run(data) Time XX YY 0 2003-12-01 1 2 >>> data = otp.Tick(**{'X.X': 1, 'X.Y': 2}) >>> data = data.rename({r'X\.(.*)': r'\1'}, use_regex=True) >>> otp.run(data) Time X Y 0 2003-12-01 1 2 >>> data = otp.Tick(**{'X.X': 1, 'X.Y': 2}) >>> data = data.rename({r'X\.(.*)': r'\1'}, use_regex=True, fields_to_skip=['X.Y']) >>> otp.run(data) Time X X.Y 0 2003-12-01 1 2 """ if columns is None: columns = {} self._rename_impl_(columns, use_regex, fields_to_skip) return self
[docs] @inplace_operation def execute(self, *operations, inplace=False) -> Optional['Source']: """ Execute operations without returning their values. Some operations in onetick.py can be used to modify the state of some object (tick sequences mostly) and in that case user may not want to save the result of the operation to column. Parameters ---------- operations : list of :py:class:`~onetick.py.Operation` operations to execute. inplace : bool The flag controls whether operation should be applied inplace or not. If ``inplace=True``, then it returns nothing. Otherwise method returns a new modified object. Returns ------- :class:`Source` or ``None`` See Also -------- **EXECUTE_EXPRESSIONS** OneTick event processor Examples -------- >>> data = otp.Tick(A=1) >>> data.state_vars['SET'] = otp.state.tick_set('oldest', 'A') >>> data = data.execute(data.state_vars['SET'].erase(A=1)) """ if not operations: raise ValueError('At least one operation must be specified in execute() method') op_str = ';'.join(map(str, operations)) self.sink(otq.ExecuteExpressions(op_str)) return self
def __refresh_hash(self): """ This internal function refreshes hash for every graph modification. It is used only in _ProxyNode, because it tracks nodes changes """ self.__hash = uuid.uuid3(uuid.NAMESPACE_DNS, str(self.__hash)) def _prepare_for_execution(self, symbols=None, start=None, end=None, start_time_expression=None, end_time_expression=None, timezone=None, has_output=None, running_query_flag=None, require_dict=False, node_name=None): if has_output is None: has_output = self.__source_has_output if timezone is None: timezone = configuration.config.tz obj, start, end, symbols = self.__prepare_graph(symbols, start, end, has_output) require_dict = require_dict or _is_dict_required(symbols) if node_name is None: node_name = 'SOURCE_CALL_MAIN_OUT_NODE' obj.node().node_name(node_name) graph = obj._to_graph(add_passthrough=False) graph.set_symbols(symbols) tmp_file = utils.TmpFile(suffix=self._name_suffix('run.otq')) query_to_run = obj._tmp_otq.save_to_file(query=graph, query_name=self.get_name(remove_invalid_symbols=True) if self.get_name(remove_invalid_symbols=True) else "main_query", file_path=tmp_file.path, start=start, end=end, running_query_flag=running_query_flag, start_time_expression=start_time_expression, end_time_expression=end_time_expression, timezone=timezone) # symbols and start/end times should be already stored in the query and should not be passed again return dict( query=query_to_run, symbols=None, start=None, end=None, start_time_expression=None, end_time_expression=None, require_dict=require_dict, node_name=node_name, time_as_nsec=True, )
[docs] def __call__(self, *args, **kwargs): """ .. deprecated:: 1.48.3 Use :py:func:`otp.run <onetick.py.run>` instead. """ warnings.warn('__call__() method is deprecated, use otp.run() instead', DeprecationWarning, stacklevel=2) return otp.run(self, *args, **kwargs)
[docs] def to_df(self, symbols=None, **kwargs): """ .. deprecated:: 1.48.3 Use :py:func:`otp.run <onetick.py.run>` instead. """ warnings.warn('to_df() method is deprecated, use otp.run() instead', DeprecationWarning, stacklevel=2) # For backward compatibility: otp.run() does not accept "symbols" as a non-keyword argument if symbols is not None: kwargs['symbols'] = symbols return otp.run(self, **kwargs)
to_dataframe = to_df def print_api_graph(self): self.node().copy_graph(print_out=True) def _add_table(self, strict=False): table = otq.Table( fields=",".join( ott.type2str(dtype) + " " + name for name, dtype in self.columns(skip_meta_fields=True).items() ), keep_input_fields=not strict, ) self.sink(table) def _is_unbound_required(self): """ Check whether a graph needs unbound symbol or not """ for symbol in self.__sources_symbols.values(): if symbol is adaptive or symbol is adaptive_to_default: return True return False def _get_widest_time_range(self): """ Get minimum start time and maximum end time. If time is not found, None is returned. """ start_times = [] end_times = [] for start, end in self.__sources_keys_dates.values(): if start is not adaptive: start_times.append(start) if end is not adaptive: end_times.append(end) start = min(start_times) if start_times else None end = max(end_times) if end_times else None return start, end def _get_date_range(self, symbols=None, start=None, end=None, default_start=None, default_end=None): #noqa if default_start is None: default_start = configuration.config.get('default_start_time', adaptive) if default_end is None: default_end = configuration.config.get('default_end_time', adaptive) # ------------ # # Find symbols need_to_bind_symbol = False common_symbol = None if symbols is None: # let's try to understand whether we could use common symbol for all sources # or we need to bound symbols instead first_symbol = None for symbol in self.__sources_symbols.values(): if first_symbol is None: first_symbol = symbol if isinstance(first_symbol, _ManuallyBoundValue): # Mark that we need to bound, but keep common_symbol equal to None. # It is applicable for the bound symbols inside the merge with bound # symbols, for example. need_to_bind_symbol = True else: common_symbol = symbol continue if symbol and symbol != first_symbol: need_to_bind_symbol = True common_symbol = None break # symbol is specified nowhere - just set unbound to the default one if (first_symbol is adaptive or first_symbol is adaptive_to_default) and ( common_symbol is adaptive or common_symbol is adaptive_to_default ): common_symbol = configuration.config.default_symbol else: # when unbound symbols passed common_symbol = symbols need_to_bind_symbol = True # use to check all sources whether some has bound symbols # Find max and min for _source data ranges sources_start, sources_end = self._get_widest_time_range() sources_start = sources_start or default_start sources_end = sources_end or default_end common_format = "%Y-%m-%d %H:%M:%S" for key, date_range in self.__sources_keys_dates.items(): # find a function that builds _source func = self.__sources_base_ep_func[key] if func: src = func() # -------------------------- # determine whether we have to add modify_query_times to a src start_date, end_date = date_range if not self.__sources_modify_query_times[key]: if start_date is not adaptive or end_date is not adaptive: # if some of the end is specified, then it means # we need to check whether it is worth to wrap into the # modify_query_times if start_date is adaptive: if start is None: start_date = sources_start else: start_date = start if end_date is adaptive: if end is None: end_date = sources_end else: end_date = end if start_date is not adaptive and end_date is not adaptive: # it might happen when either sources_start/end are adaptive # or start/end are adaptive if ( (start is None and sources_start is not adaptive and start_date != sources_start) or (start is not None and start_date != start) or (end is None and sources_end is not adaptive and end_date != sources_end) or (end is not None and end_date != end) ): self.__sources_modify_query_times[key] = True mqt = otq.ModifyQueryTimes( start_time='parse_time(("' + common_format + '.%q"),"' + start_date.strftime(common_format + ".%f") + '", _TIMEZONE)', end_time='parse_time(("' + common_format + '.%q"),"' + end_date.strftime(common_format + ".%f") + '", _TIMEZONE)', output_timestamp="TIMESTAMP", ) src.sink(mqt) if need_to_bind_symbol: bound = None if key in self.__sources_symbols: # TODO: this is wrong, we need to know about symbols # it happens when we do not copy symbols when apply # merge with bound symbols. # Wrong, in that case merge with bound symbol is # non distinguishable from the manually passed None # for external queries bound = self.__sources_symbols[key] if isinstance(bound, _ManuallyBoundValue): bound = bound.value if bound and bound is not adaptive and bound is not adaptive_to_default: src.__node.symbol(bound) else: # if key is not in __sources_symbols, then # it means that symbol was not specified, and # therefor use unbound symbol if common_symbol is None: if bound is adaptive_to_default: src.__node.symbol(configuration.config.default_symbol) else: pass # # TODO: write test validated this # # # raise Exception("One of the branch does not have symbol specified") # -------------------------- # glue _source with the main graph self.node().add_rules(src.node().copy_rules()) self.source_by_key(src.node().copy_graph(), key) self._merge_tmp_otq(src) else: pass if start is None: start = sources_start if end is None: end = sources_end return start, end, common_symbol def _to_graph(self, add_passthrough=True): """ Construct the graph. Only for internal usage. It is private, because it constructs the raw graph assuming that a graph is already defined, and might confuse an end user, because by default Source is not fully defined; it becomes fully defined only when symbols, start and end datetime are specified. """ constructed_obj = self.copy() # we add it for case when the last EP has a pin output if add_passthrough: constructed_obj.sink(otq.Passthrough()) return otq.GraphQuery(constructed_obj.node().get())
[docs] def to_graph(self, raw=None, symbols=None, start=None, end=None, *, add_passthrough=True): """ Construct an :py:class:`onetick.query.GraphQuery` object. Parameters ---------- raw: .. deprecated:: 1.4.17 has no effect symbols: symbols query to add to otq.GraphQuery start: datetime start time of a query end: datetime end time of a query add_passthrough: bool add additional :py:class:`onetick.query.Passthrough` event processor to the end of a resulted graph Returns ------- otq.GraphQuery See Also -------- :meth:`render` """ if raw is not None: warnings.warn('The "raw" flag is deprecated and makes not effect', DeprecationWarning) _obj, _start, _end, _symbols = self.__prepare_graph(symbols, start, end) if _obj._tmp_otq.queries: warnings.warn('Using .to_graph() for a Source object that uses sub-queries! ' 'This operation is deprecated and is not guaranteed to work as expected. ' 'Such a Source should be executed using otp.run() or saved to disk using to_otq()', DeprecationWarning) _obj.sink(otq.Passthrough().output_pin_name('OUT_FOR_TO_GRAPH')) _graph = _obj._to_graph(add_passthrough=False) _graph.set_start_time(_start) _graph.set_end_time(_end) _graph.set_symbols(_symbols) query = _obj._tmp_otq.save_to_file( query=_graph, file_suffix='_to_graph.otq') query_path, query_name = query.split('::') query_params = get_query_parameter_list(query_path, query_name) source_with_nested_query = otp.Query(otp.query(query, **{param: f'${param}' for param in query_params}), out_pin='OUT_FOR_TO_GRAPH') return source_with_nested_query.to_graph( symbols=_symbols, start=_start, end=_end, add_passthrough=add_passthrough) else: return _obj._to_graph(add_passthrough=add_passthrough)
[docs] def render(self, **kwargs): """ Renders a calculation graph using the ``graphviz`` library. Every node is the onetick query language event processor. Nodes in nested queries, first stage queries and eval queries are not shown. Could be useful for debugging and in jupyter to learn the underlying graph. Note that it's required to have :graphviz:`graphviz <>` package installed. Examples -------- >>> data = otp.Ticks(X=[1, 2, 3]) >>> data1, data2 = data[(data['X'] > 2)] >>> data = otp.merge([data1, data2]) >>> data.render() # doctest: +SKIP .. graphviz:: ../../static/render_example.dot """ kwargs.setdefault('verbose', True) self._to_graph().render(**kwargs)
[docs] @inplace_operation def write(self, db: Union[str, 'otp.DB'], symbol: Union[str, 'otp.Column', None] = None, tick_type: Union[str, 'otp.Column', None] = None, date: Union[date, Type[adaptive], None] = adaptive, start: Optional[date] = None, end: Optional[date] = None, append: bool = True, keep_symbol_and_tick_type: Union[bool, Type[adaptive]] = adaptive, propagate: bool = True, out_of_range_tick_action: Literal['exception', 'ignore', 'load'] = 'exception', timestamp: Optional['otp.Column'] = None, keep_timestamp: bool = True, correction_type: Optional['otp.Column'] = None, replace_existing_time_series: bool = False, allow_concurrent_write: bool = False, context: Union[str, Type[adaptive]] = adaptive, use_context_of_query: bool = False, inplace: bool = False, **kwargs) -> Optional['Source']: """ Saves data result to OneTick database. Note ---- This method does not save anything. It adds instruction in query to save. Data will be saved when query will be executed. Using ``start``+``end`` parameters instead of single ``date`` have some limitations: * ``inplace`` is not supported * if ``DAY_BOUNDARY_TZ`` and ``DAY_BOUNDARY_OFFSET`` specified against individual locations of database, then day boundary could be calculated incorrectly. * ``out_of_range_tick_action`` could be only ``exception`` or ``ignore`` Parameters ---------- db: str or :py:class:`otp.DB <onetick.py.DB>` database name or object. symbol: str or Column resulting symbol name string or column to get symbol name from. If this parameter is not set, then ticks _SYMBOL_NAME pseudo-field is used. If it is empty, an attempt is made to retrieve the symbol name from the field named SYMBOL_NAME. tick_type: str or Column resulting tick type string or column to get tick type from. If this parameter is not set, the _TICK_TYPE pseudo-field is used. If it is empty, an attempt is made to retrieve the tick type from the field named TICK_TYPE. date: datetime or None date where to save data. Should be set to `None` if writing to accelerator or memory database. By default, it is set to `otp.config.default_date`. start: datetime or None Start date for data to save. It is inclusive. Cannot be used with ``date`` parameter. Also cannot be used with ``inplace`` set to ``True``. Should be set to `None` if writing to accelerator or memory database. By default, None. end: datetime or None End date for data to save. It is exclusive, so be sure to set it to the next day after the last day in data. Cannot be used with ``date`` parameter. Also cannot be used with ``inplace`` set to ``True``. Should be set to `None` if writing to accelerator or memory database. By default, None. append: bool If False - data will be rewritten for this ``date`` or range of dates (from ``start`` to ``end``), otherwise data will be appended: new symbols are added, existing symbols can be modified (append new ticks, modify existing ticks). This option is not valid for accelerator databases. keep_symbol_and_tick_type: bool keep fields containing symbol name and tick type when writing ticks to the database or propagating them. By default, this parameter is adaptive. If ``symbol`` or ``tick_type`` are column objects, then it's set to True. Otherwise, it's set to False. propagate: bool Propagate ticks after that event processor or not. out_of_range_tick_action: str Action to be executed if tick's timestamp's date is not ``date`` or between ``start`` or ``end``: * `exception`: runtime exception will be raised * `ignore`: tick will not be written to the database * `load`: writes tick to the database anyway. Can be used only with ``date``, not with ``start``+``end``. Default: `exception` timestamp: Column Field that contains the timestamp with which the ticks will be written to the database. By default, the TIMESTAMP pseudo-column is used. keep_timestamp: bool If ``timestamp`` parameter is set and this parameter is set to True, then timestamp column is removed. correction_type: Column The name of the column that contains the correction type. This column will be removed. If this parameter is not set, no corrections will be submitted. replace_existing_time_series: bool If ``append`` is set to True, setting this option to True instructs the loader to replace existing time series, instead of appending to them. Other time series will remain unchanged. allow_concurrent_write: bool Allows different queries running on the same server to load concurrently into the same database. context: str The server context used to look up the database. By default, `otp.config.context` is used if ``use_context_of_query`` is not set. use_context_of_query: bool If this parameter is set to True and the ``context`` parameter is not set, the context of the query is used instead of the default value of the ``context`` parameter. inplace: bool A flag controls whether operation should be applied inplace. If ``inplace=True``, then it returns nothing. Otherwise, method returns a new modified object. Cannot be ``True`` if ``start`` and ``end`` are set. kwargs: .. deprecated:: use named parameters instead. Returns ------- :class:`Source` or None See also -------- **WRITE_TO_ONETICK_DB** OneTick event processor Examples -------- >>> data = otp.Ticks(X=[1, 2, 3]) >>> data = data.write('SOME_DB', symbol='S_WRITE', tick_type='T_WRITE') >>> otp.run(data) Time X 0 2003-12-01 00:00:00.000 1 1 2003-12-01 00:00:00.001 2 2 2003-12-01 00:00:00.002 3 >>> data = otp.DataSource('SOME_DB', symbol='S_WRITE', tick_type='T_WRITE') >>> otp.run(data) Time X 0 2003-12-01 00:00:00.000 1 1 2003-12-01 00:00:00.001 2 2 2003-12-01 00:00:00.002 3 """ if 'append_mode' in kwargs: warnings.warn("Parameter 'append_mode' is deprecated, use 'append'", DeprecationWarning) append = kwargs.pop('append_mode') if 'timestamp_field' in kwargs: warnings.warn("Parameter 'timestamp_field' is deprecated, use 'timestamp'", DeprecationWarning) timestamp = kwargs.pop('timestamp_field') if 'keep_timestamp_field' in kwargs: warnings.warn("Parameter 'keep_timestamp_field' is deprecated, use 'keep_timestamp'", DeprecationWarning) keep_timestamp = kwargs.pop('keep_timestamp_field') if kwargs: raise TypeError(f'write() got unexpected arguments: {list(kwargs)}') kwargs = {} if date is not adaptive and (start or end): raise ValueError('date cannot be used with start+end') if date is adaptive and (start and end) and inplace: # join_with_query and merge are used for multiple dates, so inplace is not supported raise ValueError('cannot run on multiple dates if inplace is True,' ' use one value for date instead of start+end') if (start and not end) or (not start and end): raise ValueError('start and end should be both specified or both None') if date is adaptive: date = configuration.config.default_date if symbol is not None: if isinstance(symbol, _Column): kwargs['symbol_name_field'] = str(symbol) if keep_symbol_and_tick_type is adaptive: keep_symbol_and_tick_type = True else: kwargs.setdefault('symbol_name_field', '_SYMBOL_NAME_FIELD_') self[kwargs['symbol_name_field']] = symbol if tick_type is not None: if isinstance(tick_type, _Column): kwargs['tick_type_field'] = str(tick_type) if keep_symbol_and_tick_type is adaptive: keep_symbol_and_tick_type = True else: kwargs.setdefault('tick_type_field', '_TICK_TYPE_FIELD_') self[kwargs['tick_type_field']] = tick_type if keep_symbol_and_tick_type is adaptive: keep_symbol_and_tick_type = False if timestamp is not None: kwargs['timestamp_field'] = str(timestamp) if correction_type is not None: kwargs['correction_type_field'] = str(correction_type) if context is not adaptive: kwargs['context'] = context elif not use_context_of_query: kwargs['context'] = otp.config.context if out_of_range_tick_action.upper() == 'IGNORE': pass elif out_of_range_tick_action.upper() == 'LOAD': if start and end: raise ValueError('LOAD out_of_range_tick_action cannot be used with start+end, use date instead') elif out_of_range_tick_action.upper() == 'EXCEPTION': if start and end: # WRITE_TO_ONETICK_DB use DAY_BOUNDARY_TZ and DAY_BOUNDARY_OFFSET # to check tick timestamp is out of range or not # so we mimic it here with THROW event processor src = otp.Source(otq.DbShowConfig(str(db), 'DB_TIME_INTERVALS')) src.table(inplace=True, DAY_BOUNDARY_TZ=str, DAY_BOUNDARY_OFFSET=int) # DAY_BOUNDARY_OFFSET offset are in seconds src['DAY_BOUNDARY_OFFSET'] = src['DAY_BOUNDARY_OFFSET'] * 1000 src.rename({ 'DAY_BOUNDARY_TZ': '__DAY_BOUNDARY_TZ', 'DAY_BOUNDARY_OFFSET': '__DAY_BOUNDARY_OFFSET' }, inplace=True) self = self.join_with_query(src, symbol=f"{str(db)}::DUMMY", caching='per_symbol') start_formatted = start.strftime('%Y-%m-%d') end_formatted = end.strftime('%Y-%m-%d') convert_timestamp = self['TIMESTAMP'].dt.strftime('%Y%m%d%H%M%S.%J', timezone=self['__DAY_BOUNDARY_TZ']) start_op = ( otp.dt(start).to_operation(timezone=self['__DAY_BOUNDARY_TZ']) + self['__DAY_BOUNDARY_OFFSET'] ) self.throw( where=(self['TIMESTAMP'] < start_op), message=('Timestamp ' + convert_timestamp + ' of a tick, visible or hidden, ' + f'earlier than {start_formatted} in timezone ' + self['__DAY_BOUNDARY_TZ']), inplace=True, ) end_op = otp.dt(end).to_operation(timezone=self['__DAY_BOUNDARY_TZ']) + self['__DAY_BOUNDARY_OFFSET'] self.throw( where=(self['TIMESTAMP'] >= end_op), message=('Timestamp ' + convert_timestamp + ' of a tick, visible or hidden, ' + f'later than {end_formatted} in timezone ' + self['__DAY_BOUNDARY_TZ']), inplace=True, ) else: raise ValueError(f'Unknown out_of_range_tick_action: {out_of_range_tick_action}.' ' Possible values are: "ignore", "exception"') branches = [] if propagate: branches = [self] kwargs = dict( **kwargs, database=str(db), append_mode=append, keep_symbol_name_and_tick_type=keep_symbol_and_tick_type, keep_timestamp_field=keep_timestamp, replace_existing_time_series=replace_existing_time_series, allow_concurrent_write=allow_concurrent_write, use_context_of_query=use_context_of_query, ) if start and end: days = (end - start).days + 1 for i in range(days): branch = self.copy() branch.sink( otq.WriteToOnetickDb( date=(start + otp.Day(i)).strftime('%Y%m%d'), propagate_ticks=False, out_of_range_tick_action='IGNORE', **kwargs ) ) branches.append(branch) self = otp.merge(branches) else: self.sink( otq.WriteToOnetickDb( date=date.strftime('%Y%m%d') if date else '', # type: ignore[union-attr] propagate_ticks=propagate, out_of_range_tick_action=out_of_range_tick_action.upper(), **kwargs ) ) for col in ('_SYMBOL_NAME_FIELD_', '_TICK_TYPE_FIELD_'): if col in self.schema: self.drop(col, inplace=True) to_drop: Set[str] = set() if not keep_symbol_and_tick_type: to_drop.update((kwargs['symbol_name_field'], kwargs['tick_type_field'])) if not keep_timestamp and timestamp is not None and str(timestamp) not in {'Time', 'TIMESTAMP'}: to_drop.add(str(timestamp)) if correction_type is not None: to_drop.add(str(correction_type)) self.schema.set(**{ k: v for k, v in self.schema.items() if k not in to_drop }) return self
[docs] def copy(self, ep=None, columns=None, deep=False) -> 'Source': """ Build an object with copied calculation graph. Every node of the resulting graph has the same id as in the original. It means that if the original and copied graphs are merged or joined together further then all common nodes (all that created before the .copy() method) will be glued. For example, let's imagine that you have the following calculation graph ``G`` .. graphviz:: digraph { rankdir="LR"; A -> B; } where ``A`` is a source and ``B`` is some operation on it. Then we copy it to the ``G'`` and assign a new operation there .. graphviz:: digraph { rankdir="LR"; A -> B -> C; } After that we decided to merge ``G`` and ``G'``. The resulting calculation graph will be: .. graphviz:: digraph { rankdir="LR"; A -> B -> C -> MERGE; B -> MERGE; } Please use the :meth:`Source.deepcopy` if you want to get the following calculation graph after merges and joins .. graphviz:: digraph { rankdir="LR"; A -> B -> C -> MERGE; "A'" -> "B'" -> "C'" -> MERGE; } Returns ------- Source See Also -------- Source.deepcopy """ if columns is None: columns = self.columns() if ep: result = self.__class__(node=ep, **columns) result.source(self.node().copy_graph()) # we need to clean it, because ep is not a _source result._clean_sources_dates() else: result = self.__class__(node=self.node(), **columns) result.node().add_rules(self.node().copy_rules(deep=deep)) result._set_sources_dates(self) if deep: # generating all new uuids for node history and for sources # after they were initialized keys = defaultdict(uuid.uuid4) # type: ignore result.node().rebuild_graph(keys) result._change_sources_keys(keys) # add state result._copy_state_vars_from(self) result._tmp_otq = self._tmp_otq.copy() result.__name = self.__name #noqa result._copy_properties_from(self) return result
[docs] def deepcopy(self, ep=None, columns=None) -> 'onetick.py.Source': """ Copy all graph and change ids for every node. More details could be found in :meth:`Source.copy` See Also -------- Source.copy """ return self.copy(ep, columns, deep=True)
def _copy_properties_from(self, obj): # needed if we are doing copy of a child with custom properties for attr in set(self.__class__._PROPERTIES) - set(Source._PROPERTIES): setattr(self, attr, getattr(obj, attr)) def _copy_state_vars_from(self, objs): self.__dict__["_state_vars"] = StateVars(self, objs)
[docs] def split(self, expr, cases, default=False) -> Tuple['Source', ...]: """ The method splits data using passed expression ``expr`` for several outputs by passed ``cases``. The method is the alias for the :meth:`Source.switch` Parameters ---------- expr : Operation column or column based expression cases : list list of values or :py:class:`onetick.py.range` objects to split by default : bool ``True`` adds the default output Returns ------- Outputs according to passed cases, number of outputs is number of cases plus one if ``default=True`` See also -------- | :meth:`Source.switch` | **SWITCH** OneTick event processor Examples -------- >>> # OTdirective: snippet-name: Source operations.split; >>> data = otp.Ticks(X=[0.33, -5.1, otp.nan, 9.4]) >>> r1, r2, r3 = data.split(data['X'], [otp.nan, otp.range(0, 100)], default=True) >>> otp.run(r1) Time X 0 2003-12-01 00:00:00.002 NaN >>> otp.run(r2) Time X 0 2003-12-01 00:00:00.000 0.33 1 2003-12-01 00:00:00.003 9.40 >>> otp.run(r3) Time X 0 2003-12-01 00:00:00.001 -5.1 See Also -------- Source.switch :py:class:`onetick.py.range` """ output_num = len(cases) # format cases def to_str(v): if isinstance(v, onetick.py.utils.range): return "[" + str(v.start) + "," + str(v.stop) + "]" elif isinstance(v, str): return '"' + v + '"' elif isinstance(v, tuple): return ",".join(map(to_str, list(v))) return str(v) cases = [f"{to_str(cases[inx])}:OUT{inx}" for inx in range(output_num)] # create ep params = dict(switch=str(expr), cases=";".join(cases)) if default: params["default_output"] = "DEF_OUT" switch = self.copy(ep=otq.SwitchEp(**params)) # construct results result = [] for inx in range(output_num): res = switch.copy() res.node().out_pin(f"OUT{inx}") res.sink(otq.Passthrough()) result.append(res) if default: res = switch.copy() res.node().out_pin("DEF_OUT") res.sink(otq.Passthrough()) result.append(res) return tuple(result)
[docs] def switch(self, expr, cases, default=False) -> Tuple['Source', ...]: """ The method splits data using passed expression for several outputs by passed cases. This method is an alias for :meth:`Source.split` method. Parameters ---------- expr : Operation column or column based expression cases : list list of values or :py:class:`onetick.py.range` objects to split by default : bool ``True`` adds the default output Returns ------- Outputs according to passed cases, number of outputs is number of cases plus one if ``default=True`` See also -------- | :meth:`Source.split` | **SWITCH** OneTick event processor Examples -------- >>> data = otp.Ticks(X=[0.33, -5.1, otp.nan, 9.4]) >>> r1, r2, r3 = data.switch(data['X'], [otp.nan, otp.range(0, 100)], default=True) >>> otp.run(r1) Time X 0 2003-12-01 00:00:00.002 NaN >>> otp.run(r2) Time X 0 2003-12-01 00:00:00.000 0.33 1 2003-12-01 00:00:00.003 9.40 >>> otp.run(r3) Time X 0 2003-12-01 00:00:00.001 -5.1 See Also -------- Source.split :py:class:`onetick.py.range` """ return self.split(expr, cases, default)
def columns(self, skip_meta_fields=False): """ Return columns in data source Parameters ---------- skip_meta_fields: bool, default=False do not add meta fields Returns ------- dict """ result = {} for key, value in self.__dict__.items(): if skip_meta_fields and key in self.__class__.meta_fields: continue if key in self.__class__._PROPERTIES: continue if isinstance(value, _Column): result[value.name] = value.dtype return result def drop_columns(self): """ Method removes all columns in the python representation, but don't drop columns on the data. It is used when external query is applied, because we don't know how data schema has changed. """ items = [] for key, value in self.__dict__.items(): if key in self.__class__.meta_fields or key in self.__class__._PROPERTIES: continue if isinstance(value, _Column): items.append(key) for item in items: del self.__dict__[item] def node(self): return self.__node def tick_type(self, tt): self.__node.tick_type(tt) return self def symbol(self, symbol): """ Apply symbol to graph .. deprecated:: 1.3.31 """ warnings.warn("symbol method is deprecated, please specify symbol during creation", DeprecationWarning) self.__node.symbol(symbol) return self def node_name(self, name=None, key=None): return self.__node.node_name(name, key) def __add__(self, other) -> 'Source': return onetick.py.functions.merge([self, other])
[docs] @inplace_operation def table(self, inplace=False, strict: bool = True, **schema) -> Optional['Source']: """ Set the OneTick and python schemas levels according to the ``schema`` parameter. The ``schema`` should contain either (field_name -> type) pairs or (field_name -> default value) pairs; ``None`` means no specified type, and OneTick considers it's as a double type. Resulting ticks have the same order as in the ``schema``. If only partial fields are specified (i.e. when the ``strict=False``) then fields from the ``schema`` have the most left position. Parameters ---------- inplace: bool The flag controls whether operations should be applied inplace strict: bool If set to ``False``, all fields present in an input tick will be present in the output tick. If ``True``, then only fields specified in the ``schema``. schema: field_name -> type or field_name -> default value pairs that should be applied on the source. Returns ------- :class:`Source` or ``None`` See Also -------- | :attr:`Source.schema` | :meth:`__getitem__`: the table shortcut | **TABLE** OneTick event processor Examples -------- Selection case >>> data = otp.Ticks(X1=[1, 2, 3], ... X2=[3, 2, 1], ... A1=["A", "A", "A"]) >>> data = data.table(X2=int, A1=str) # OTdirective: snippet-name: Arrange.set schema; >>> otp.run(data) Time X2 A1 0 2003-12-01 00:00:00.000 3 A 1 2003-12-01 00:00:00.001 2 A 2 2003-12-01 00:00:00.002 1 A Defining default values case (note the order) >>> data = otp.Ticks(X=[1, 2, 3]) >>> data = data.table(Y=0.5, strict=False) >>> otp.run(data) Time Y X 0 2003-12-01 00:00:00.000 0.5 1 1 2003-12-01 00:00:00.001 0.5 2 2 2003-12-01 00:00:00.002 0.5 3 """ def is_time_type_or_nsectime(obj): return ott.is_time_type(obj) or isinstance(obj, ott.nsectime) def transformer(name, obj): if obj is None: return name res = f'{ott.type2str(ott.get_object_type(obj))} {name}' if not isinstance(obj, Type) and not is_time_type_or_nsectime(obj): res += f' ({ott.value2str(obj)})' return res def get_type(value): if value is None: return float return ott.get_object_type(value) for c_name in list(schema.keys()): if c_name in self.__class__.meta_fields: # meta fields should not be propagated to Table ep # otherwise new user-defined field with the same name will appear in schema # and using this field will raise an "ambiguous use" error in OneTick warnings.warn(f"Meta field '{c_name}' should not be used in .table() method.") schema.pop(c_name) if not schema: return self schema_to_set = {c_name: get_type(c_value) for c_name, c_value in schema.items()} if strict: self.schema.set(**schema_to_set) else: self.schema.update(**schema_to_set) fields = ','.join([transformer(c_name, c_value) for c_name, c_value in schema.items()]) self.sink(otq.Table(fields=fields, keep_input_fields=not strict)) for c_name, c_value in schema.items(): # datetime and nsetime values require onetick built-in functions to be initialized # but built-in functions can't be used in table ep so updating columns after the table if is_time_type_or_nsectime(c_value): self.update({c_name: c_value}, where=self[c_name] == 0, inplace=True) self._fix_varstrings() return self
def _fix_varstrings(self): """ PY-556: converting to varstring results in string with null-characters """ varstring_columns = { name: self[name] for name, dtype in self.schema.items() if dtype is ott.varstring } # just updating the column removes null-characters if varstring_columns: self.update(varstring_columns, inplace=True)
[docs] @inplace_operation def drop(self, columns: List[Any], inplace=False) -> Optional['Source']: r""" Remove a list of columns from the Source. If column with such name wasn't found the error will be raised, if the regex was specified and there aren't any matched columns, do nothing. Regex is any string containing any of characters ***+?\:[]{}()**, dot is a valid symbol for OneTick identifier, so **a.b** will be passed to OneTick as an identifier, if you want specify such regex use parenthesis - **(a.b)** Parameters ---------- columns : str, Column or list of them Column(s) to remove. You could specify a regex or collection of regexes, in such case columns with match names will be deleted. inplace: bool A flag controls whether operation should be applied inplace. If inplace=True, then it returns nothing. Otherwise method returns a new modified object. Returns ---------- :class:`Source` or ``None`` See Also -------- **PASSTHROUGH** OneTick event processor Examples -------- >>> data = otp.Ticks(X1=[1, 2, 3], ... X2=[3, 2, 1], ... A1=["A", "A", "A"]) >>> data = data.drop("X1") # OTdirective: snippet-name: Arrange.drop.one field; >>> otp.run(data) Time X2 A1 0 2003-12-01 00:00:00.000 3 A 1 2003-12-01 00:00:00.001 2 A 2 2003-12-01 00:00:00.002 1 A Regexes also could be specified in such case all matched columns will be deleted >>> data = otp.Ticks(X1=[1, 2, 3], ... X2=[3, 2, 1], ... A1=["A", "A", "A"]) >>> data = data.drop(r"X\d+") # OTdirective: snippet-name: Arrange.drop.regex; >>> otp.run(data) Time A1 0 2003-12-01 00:00:00.000 A 1 2003-12-01 00:00:00.001 A 2 2003-12-01 00:00:00.002 A Several parameters can be specified >>> # OTdirective: snippet-name: Arrange.drop.multiple; >>> data = otp.Ticks(X1=[1, 2, 3], ... X2=[3, 2, 1], ... Y1=[1, 2, 3], ... Y2=[3, 2, 1], ... YA=["a", "b", "c"], ... A1=["A", "A", "A"]) >>> data = data.drop([r"X\d+", "Y1", data["Y2"]]) >>> otp.run(data) Time YA A1 0 2003-12-01 00:00:00.000 a A 1 2003-12-01 00:00:00.001 b A 2 2003-12-01 00:00:00.002 c A **a.b** will be passed to OneTick as an identifier, if you want specify such regex use parenthesis - **(a.b)** >>> data = otp.Ticks({"COLUMN.A": [1, 2, 3], "COLUMN1A": [3, 2, 1], ... "COLUMN1B": ["a", "b", "c"], "COLUMN2A": ["c", "b", "a"]}) >>> data = data.drop("COLUMN.A") # OTdirective: skip-snippet:; >>> otp.run(data) Time COLUMN1A COLUMN1B COLUMN2A 0 2003-12-01 00:00:00.000 3 a c 1 2003-12-01 00:00:00.001 2 b b 2 2003-12-01 00:00:00.002 1 c a >>> data = otp.Ticks({"COLUMN.A": [1, 2, 3], "COLUMN1A": [3, 2, 1], ... "COLUMN1B": ["a", "b", "c"], "COLUMN2A": ["c", "b", "a"]}) >>> data = data.drop("(COLUMN.A)") # OTdirective: skip-snippet:; >>> otp.run(data) Time COLUMN1B 0 2003-12-01 00:00:00.000 a 1 2003-12-01 00:00:00.001 b 2 2003-12-01 00:00:00.002 c """ self.__delitem__(columns) return self
[docs] @inplace_operation def dropna(self, how: Literal["any", "all"] = "any", subset: Optional[List[Any]] = None, inplace=False) -> Optional['Source']: """ Drops ticks that contain NaN values according to the policy in the ``how`` parameter Parameters ---------- how: "any" or "all" ``any`` - filters out ticks if at least one field has NaN value ``all`` - filters out ticks if all fields have NaN values. subset: list of str list of columns to check for NaN values. If ``None`` then all columns are checked. inplace: bool the flag controls whether operation should be applied inplace. Returns ------- :class:`Source` or ``None`` Examples -------- Drop ticks where **at least one** field has ``nan`` value. >>> data = otp.Ticks([[ 'X', 'Y'], ... [ 0.0, 1.0], ... [ otp.nan, 2.0], ... [ 4.0, otp.nan], ... [ otp.nan, otp.nan], ... [ 6.0, 7.0]]) >>> data = data.dropna() >>> otp.run(data)[['X', 'Y']] X Y 0 0.0 1.0 1 6.0 7.0 Drop ticks where **all** fields have ``nan`` values. >>> data = otp.Ticks([[ 'X', 'Y'], ... [ 0.0, 1.0], ... [ otp.nan, 2.0], ... [ 4.0, otp.nan], ... [ otp.nan, otp.nan], ... [ 6.0, 7.0]]) >>> data = data.dropna(how='all') >>> otp.run(data)[['X', 'Y']] X Y 0 0.0 1.0 1 NaN 2.0 2 4.0 NaN 3 6.0 7.0 Drop ticks where **all** fields in **subset** of columns have ``nan`` values. >>> data = otp.Ticks([[ 'X', 'Y', 'Z'], ... [ 0.0, 1.0, otp.nan], ... [ otp.nan, 2.0, otp.nan], ... [ 4.0, otp.nan, otp.nan], ... [ otp.nan, otp.nan, otp.nan], ... [ 6.0, 7.0, otp.nan]]) >>> data = data.dropna(how='all', subset=['X', 'Y']) >>> otp.run(data)[['X', 'Y', 'Z']] X Y Z 0 0.0 1.0 NaN 1 NaN 2.0 NaN 2 4.0 NaN NaN 3 6.0 7.0 NaN """ if how not in ["any", "all"]: raise ValueError(f"It is expected to see 'any' or 'all' values for 'how' parameter, but got '{how}'") condition = None columns = self.columns(skip_meta_fields=True) if subset is not None: for column_name in subset: if column_name not in columns: raise ValueError(f"There is no '{column_name}' column in the source") if columns[column_name] is not float: raise ValueError(f"Column '{column_name}' is not float type") for column_name, dtype in columns.items(): if subset is not None and column_name not in subset: continue if dtype is float: if condition is None: condition = self[column_name] != ott.nan else: if how == "any": condition &= self[column_name] != ott.nan elif how == "all": condition |= self[column_name] != ott.nan self.sink(otq.WhereClause(where=str(condition))) return self
def __delitem__(self, obj): if isinstance(obj, list) or isinstance(obj, tuple): objs = obj else: objs = (obj,) items_to_passthrough, regex = self._columns_names_regex(objs, drop=True) self.sink(otq.Passthrough(drop_fields=True, fields=",".join(items_to_passthrough), use_regex=regex)) def _validate_columns_names(self, names_of_columns): # TODO: is it used anywhere else? # we definitely have the same logic of checking columns somewhere else too, need to refactor for item in names_of_columns: if item not in self.__dict__ or not isinstance(self.__dict__[item], _Column): raise AttributeError(f"There is no '{item}' column") def _columns_names_regex(self, objs: Tuple[Union[_Column, str]], drop: bool = False) -> Tuple[List[str], bool]: """ We can't be sure python Source has consistent columns cache, because sinking complex event processors can change columns unpredictable, so if user will specify regex as a param, we will pass regex as an onetick's param, but pass or delete all matched columns from python Source cache. Parameters ---------- objs: Tuple of _Columns or string to pass or drop. String can be regex. drop: bool To drop columns from python schema or not. Returns ------- items_to_passthrough: Items to pass to Passthrough as `field` parameter. regex: Value to pass to Passthrough as `use_regex` parameter. """ items_to_passthrough = [] names_of_columns = [] regex = False for obj in objs: if isinstance(obj, _Column): name: str = obj.name items_to_passthrough.append(name) names_of_columns.append(name) elif isinstance(obj, str): items_to_passthrough.append(obj) if any(c in r"*+?\:[]{}()^$" for c in obj): # it is a possible regex regex = True names_of_columns.extend(col for col in self.columns() if re.match(obj, col)) else: names_of_columns.append(obj) else: raise TypeError(f"It is not supported to select or delete item '{obj}' of type '{type(obj)}'") # remove duplications and meta_fields names_of_columns: set[str] = set(names_of_columns) - set(self.__class__.meta_fields) # type: ignore[no-redef] self._validate_columns_names(names_of_columns) if drop: for item in names_of_columns: del self.__dict__[item] return items_to_passthrough, regex def __from_ep_to_proxy(self, ep): in_pin, out_pin = None, None if isinstance(ep, otq.graph_components.EpBase.PinnedEp): if hasattr(ep, "_output_name"): out_pin = getattr(ep, "_output_name") else: in_pin = getattr(ep, "_input_name") ep = getattr(ep, "_ep") return ep, uuid.uuid4(), in_pin, out_pin
[docs] def sink(self, ep, out_pin=None, move_node=True): """ Appends node inplace to the source. Connect `out_pin` of this source to `ep`. Can be used to connect onetick.query objects to onetick.py source. Data schema changes (added or deleted columns) will not be detected automatically after applying `sink` function, so the user must change the schema himself by updating `schema` property. Parameters ---------- ep: otq.graph_components.EpBase,\ otq.graph_components.EpBase.PinnedEp,\ Tuple[otq.graph_components.EpBase, uuid.uuid4, Optional[str], Optional[str]] onetick.query EP object to append to source. out_pin: Optional[str], default=None name of the out pin to connect to `ep` move_node: bool, default=True Returns ---------- result: otq.graph_components.EpBase, otq.graph_components.EpBase.PinnedEp Last node of the source See Also -------- onetick.py.Source.schema onetick.py.core._source.schema.Schema Examples -------- Adding column 'B' directly with onetick.query EP. >>> data = otp.Tick(A=1) >>> data.sink(otq.AddField(field='B', value=2)) # OTdirective: skip-snippet:; AddField(field='B',value=2) >>> otp.run(data) # OTdirective: skip-snippet:; Time A B 0 2003-12-01 1 2 But we can't use this column with onetick.py methods yet. >>> data['C'] = data['B'] # OTdirective: skip-snippet:; # doctest: +ELLIPSIS Traceback (most recent call last): ... AttributeError: There is no 'B' column We should manually change source's schema >>> data.schema.update(B=int) # OTdirective: skip-snippet:; >>> data['C'] = data['B'] >>> otp.run(data) Time A B C 0 2003-12-01 1 2 2 """ if not ( issubclass(type(ep), otq.graph_components.EpBase) or issubclass(type(ep), otq.graph_components.EpBase.PinnedEp) or type(ep) is tuple ): raise Exception("sinking is allowed only for EpBase instances") if type(ep) is tuple: # for already existed EP fetched from _ProxyNode return self.__node.sink(out_pin, *ep, move_node) else: return self.__node.sink(out_pin, *self.__from_ep_to_proxy(ep), move_node)
def __rshift__(self, ep): """ duplicates sink() method, but returns new object """ new_source = self.copy() new_source.sink(ep) return new_source def __irshift__(self, ep): """ duplicates sink() method, but assigns source new object """ new_source = self.copy() new_source.sink(ep) return new_source def source(self, ep, in_pin=None): """ Add node as source to root node """ if not ( issubclass(type(ep), otq.graph_components.EpBase) or issubclass(type(ep), otq.graph_components.EpBase.PinnedEp) or type(ep) is tuple ): raise Exception("sourcing is allowed only for EpBase instances") if type(ep) is tuple: # for already existed EP fetched from _ProxyNode return self.__node.source(in_pin, *ep) else: return self.__node.source(in_pin, *self.__from_ep_to_proxy(ep)) def source_by_key(self, ep, to_key): """ Add node as source to graph node by key""" if not ( issubclass(type(ep), otq.graph_components.EpBase) or issubclass(type(ep), otq.graph_components.EpBase.PinnedEp) or type(ep) is tuple ): raise Exception("sourcing is allowed only for EpBase instances") if type(ep) is tuple: # for already existed EP fetched from _ProxyNode return self.__node.source_by_key(to_key, *ep) else: return self.__node.source_by_key(to_key, *self.__from_ep_to_proxy(ep)) def apply_query(self, query, in_pin="IN", out_pin="OUT", **params): """ Apply data source to query .. deprecated:: 1.3.77 See Also -------- Source.apply """ warnings.warn( 'The "apply_query" method is deprecated. Please, use the .apply method instead or ' "call a reference queries directly.", DeprecationWarning ) res = onetick.py.functions.apply_query(query, {in_pin: self}, [out_pin], **params) res.node().out_pin(out_pin) return res
[docs] def apply(self, obj) -> Union['otp.Column', 'Source']: """ Apply object to data source. Parameters ---------- obj: onetick.py.query, Callable, type, onetick.query.GraphQuery - `onetick.py.query` allows to apply external nested query - python `Callable` allows to translate python code to similar OneTick's CASE expression. There are some limitations to which python operators can be used in this callable. See :ref:`Python callables parsing guide <python callable parser>` article for details. In :ref:`Remote OTP with Ray<ray-remote>` any `Callable` must be decorated with `@otp.remote` decorator, see :ref:`Ray usage examples<apply-remote-context>` for details. - `type` allows to apply default type conversion - `onetick.query.GraphQuery` allows to apply a build onetick.query.Graph Returns ------- Column, Source Examples -------- Apply external query to a tick flow. In this case it assumes that query has only one input and one output. Check the :class:`query` examples if you want to use a query with multiple inputs or outputs. >>> data = otp.Ticks(X=[1, 2, 3]) >>> external_query = otp.query('update.otq') >>> data = data.apply(external_query) >>> otp.run(data) Time X 0 2003-12-01 00:00:00.000 2 1 2003-12-01 00:00:00.001 4 2 2003-12-01 00:00:00.002 6 Apply a predicate to a column / operation. In this case value passed to a predicate is column values. Result is a column. >>> data = otp.Ticks(X=[1, 2, 3]) >>> data['Y'] = data['X'].apply(lambda x: x * 2) >>> otp.run(data) Time X Y 0 2003-12-01 00:00:00.000 1 2 1 2003-12-01 00:00:00.001 2 4 2 2003-12-01 00:00:00.002 3 6 Another example of applying more sophisticated operation >>> data = otp.Ticks(X=[1, 2, 3]) >>> data['Y'] = data['X'].apply(lambda x: 1 if x > 2 else 0) >>> otp.run(data) Time X Y 0 2003-12-01 00:00:00.000 1 0 1 2003-12-01 00:00:00.001 2 0 2 2003-12-01 00:00:00.002 3 1 Example of applying a predicate to a Source. In this case value passed to a predicate is a whole tick. Result is a column. >>> data = otp.Ticks(X=[1, 2, 3], Y=[.5, -0.4, .2]) >>> data['Z'] = data.apply(lambda tick: 1 if abs(tick['X'] * tick['Y']) > 0.5 else 0) >>> otp.run(data) Time X Y Z 0 2003-12-01 00:00:00.000 1 0.5 0 1 2003-12-01 00:00:00.001 2 -0.4 1 2 2003-12-01 00:00:00.002 3 0.2 1 See Also -------- :py:class:`onetick.py.query` :py:meth:`onetick.py.Source.script` :ref:`Python callables parsing guide <python callable parser>` """ if isinstance(obj, onetick.py.sources.query): graph = obj.graph_info if len(graph.nested_inputs) != 1: raise Exception( f'It is expected the query "{obj.query_name}" to have one input, but it ' f"has {len(graph.nested_inputs)}" ) if len(graph.nested_outputs) > 1: raise Exception( f'It is expected the query "{obj.query_name}" to have one or no output, but it ' f"has {len(graph.nested_outputs)}" ) in_pin = graph.nested_inputs[0].NESTED_INPUT if len(graph.nested_outputs) == 0: out_pin = None # means no output else: out_pin = graph.nested_outputs[0].NESTED_OUTPUT return obj(**{in_pin: self})[out_pin] elif isinstance(obj, otq.GraphQuery): tmp_file = utils.TmpFile(suffix=".otq") obj.save_to_file( tmp_file.path, "graph_query_to_apply", # start and end times don't matter for this query, use some constants start=db_constants.DEFAULT_START_DATE, end=db_constants.DEFAULT_END_DATE, ) query_info = get_query_info(tmp_file.path) if len(query_info.sources) != 1: raise Exception( "It is expected the query to have one input, but it " f"has {len(query_info.sources)}" ) if len(query_info.sinks) != 1: raise Exception("It is expected the query to have one output, but it " f"has {len(query_info.sinks)}") add_pins( tmp_file.path, "graph_query_to_apply", [(query_info.sources[0], 1, "IN"), (query_info.sinks[0], 0, "OUT")], ) return onetick.py.sources.query(tmp_file.path + "::graph_query_to_apply")(IN=self)["OUT"] return apply_lambda(obj, _EmulateObject(self))
[docs] def dump(self, label: Optional[str] = None, where: Optional['otp.Operation'] = None, columns: Union[str, Tuple[str], List[str], None] = None, callback: Optional[Callable] = None): """ Dumps the ``columns`` from ticks into std::out in runtime if they fit the ``where`` condition. Every dump has a corresponding header that always includes the TIMESTAMP field. Other fields could be configured using the ``columns`` parameter. A header could be augmented with a ``label`` parameter; this label is an addition column that helps to distinguish ticks from multiple dumps with the same schema, because ticks from different dumps could be mixed. It might happen because of the OneTick multithreading, and there is the operating system buffer between the OneTick and the actual output. This method is helpful for debugging. Parameters ---------- label: str A label for a dump. It adds a special column **_OUT_LABEL_** for all ticks and set to the specified value. It helps to distinguish ticks from multiple dumps, because actual output could contain mixed ticks due the concurrency. ``None`` means no label. where: Operation A condition that allows to filter ticks to dump. ``None`` means no filtering. columns: str, tupel or list List of columns that should be in the output. ``None`` means dump all columns. callback : callable Callable, which preprocess source before printing. See also -------- **WRITE_TEXT** OneTick event processor Examples -------- >>> # OTdirective: skip-snippet:; >>> data.dump(label='Debug point', where=data['PRICE'] > 99.3, columns=['PRICE', 'QTY']) # doctest: +SKIP >>> data.dump(columns="X", callback=lambda x: x.first(), label="first") # doctest: +SKIP """ self_c = self.copy() if callback: self_c = callback(self_c) if where is not None: # can't be simplified because the _Operation overrides __bool__ self_c, _ = self_c[(where)] if columns: self_c = self_c[columns if isinstance(columns, (list, tuple)) else [columns]] if label: self_c['_OUT_LABEL_'] = label self_c.sink(otq.WriteText(formats_of_fields='TIMESTAMP=%|' + configuration.config.tz + '|%d-%m-%Y %H:%M:%S.%J', prepend_timestamp=False, prepend_symbol_name=False, propagate_ticks=True)) # print <no data> in case there are 0 ticks self_c = self_c.agg({'COUNT': otp.agg.count()}) self_c, _ = self_c[self_c['COUNT'] == 0] self_c['NO_DATA'] = '<no data>' self_c = self_c[['NO_DATA']] self_c.sink(otq.WriteText(output_headers=False, prepend_timestamp=False, prepend_symbol_name=False, propagate_ticks=True)) # Do not propagate ticks then, because we want just to write them into # the std::out. We have to do that, because otherwise these ticks would # go to a query output, that would mess real output. self_c, _ = self_c[self_c['Time'] != self_c['Time']] # We have to merge the branch back to the main branch even that these # branch does not generate ticks, because we do not introduce one more # output point, because the OneTick would add it to the final output # datastructure. self.sink(otq.Merge(identify_input_ts=False)) self.source(self_c.node().copy_graph()) self.node().add_rules(self_c.node().copy_rules()) self._merge_tmp_otq(self_c)
[docs] def script(self, func: Union[Callable[['Source'], Optional[bool]], str], inplace=False) -> 'Source': # TODO: need to narrow Source object to get rid of undesired methods like aggregations """ Implements a script for every tick. Allows to pass a ``func`` that will be applied per every tick. A ``func`` can be python callable in this case it will be translated to per tick script. In order to use it in Remote OTP with Ray, the function should be decorated with ``@otp.remote``, see :ref:`Ray usage examples<apply-remote-context>` for details. See :ref:`Per Tick Script Guide <python callable parser>` for more detailed description of python to OneTick code translation and per-tick script features. The script written in per tick script language can be passed itself as a string or path to a file with the code. onetick-py doesn't validate the script, but configure the schema accordingly. Parameters ---------- func: callable, str or path - a callable that takes only one parameter - actual tick that behaves like a `Source` instance - or the script on per-tick script language - or a path to file with onetick script Returns ------- :class:`Source` See also -------- **PER_TICK_SCRIPT** OneTick event processor Examples -------- >>> t = otp.Ticks({'X': [1, 2, 3], 'Y': [4, 5, 6]}) >>> def fun(tick): ... tick['Z'] = 0 ... if tick['X'] + tick['Y'] == 5: ... tick['Z'] = 1 ... elif tick['X'] + tick['Y'] * 2 == 15: ... tick['Z'] = 2 >>> t = t.script(fun) >>> otp.run(t) Time X Y Z 0 2003-12-01 00:00:00.000 1 4 1 1 2003-12-01 00:00:00.001 2 5 0 2 2003-12-01 00:00:00.002 3 6 2 See also -------- :py:meth:`onetick.py.Source.apply` :ref:`Per-Tick Script Guide <python callable parser>` """ res = self if inplace else self.copy() changed_tick_lists = {} if callable(func): _new_columns, _script = apply_script(func, _EmulateObject(self)) changed_tick_lists = _EmulateObject.get_changed_tick_lists() elif isinstance(func, str): if os.path.isfile(func): # path to the file with script with open(func) as file: _script = file.read() else: _script = func _new_columns = self._extract_columns_from_script(_script) else: raise ValueError("Wrong argument was specify, please use callable or string with either script on per tick " "language or path to it") res.sink(otq.PerTickScript(script=_script)) res.schema.update(**_new_columns) for tick_list_name, tick_list_schema in changed_tick_lists.items(): res.state_vars[tick_list_name]._schema = tick_list_schema return res
def _extract_columns_from_script(self, script): result = {} supported_types = {'byte', 'short', 'uint', 'long', 'ulong', 'int', 'float', 'double', 'string', 'time32', 'nsectime', 'msectime', 'varstring', 'decimal'} types = r"(?P<type>varstring|byte|short|uint|int|ulong|long|" \ r"float|double|decimal|string|time32|msectime|nsectime|matrix)" length = r"(\[(?P<length>\d+)\])?" name = r"\s+(?P<name>\w+)\s*=\s*" pattern = re.compile(types + length + name) for p in re.finditer(pattern, script): groupdict = p.groupdict() type = ott.str2type(groupdict["type"]) if groupdict["type"] in supported_types else None if type: length = groupdict["length"] if length: length = int(length) if type is str and length != ott.string.DEFAULT_LENGTH: type = ott.string[length] result[groupdict["name"]] = type else: warnings.warn(f"{groupdict['type']} isn't supported for now, so field {groupdict['name']} won't " f"be added to schema.") return result
[docs] def to_symbol_param(self): """ Creates a read-only instance with the same columns except Time. It is used as a result of a first stage query with symbol params. See also -------- :ref:`Symbol Parameters Objects` :ref:`Symbol parameters` Examples -------- >>> symbols = otp.Ticks({'SYMBOL_NAME': ['S1', 'S2'], 'PARAM': ['A', 'B']}) >>> symbol_params = symbols.to_symbol_param() >>> t = otp.DataSource('SOME_DB', tick_type='TT') >>> t['S_PARAM'] = symbol_params['PARAM'] >>> result = otp.run(t, symbols=symbols) >>> result['S1'] Time X S_PARAM 0 2003-12-01 00:00:00.000 1 A 1 2003-12-01 00:00:00.001 2 A 2 2003-12-01 00:00:00.002 3 A """ return _SymbolParamSource(**self.columns())
@staticmethod def _convert_symbol_to_string(symbol, tmp_otq=None, start=None, end=None, timezone=None): if start is adaptive: start = None if end is adaptive: end = None if isinstance(symbol, Source): symbol = otp.eval(symbol).to_eval_string(tmp_otq=tmp_otq, start=start, end=end, timezone=timezone, operation_suffix='symbol', query_name=None, file_suffix=symbol._name_suffix('symbol.otq')) if isinstance(symbol, onetick.py.sources.query): return symbol.to_eval_string() else: return symbol @staticmethod def _construct_multi_branch_graph(branches): # TODO: add various checks, e.g. that branches have common parts main = branches[0].copy() for branch in branches[1:]: main.node().add_rules(branch.node().copy_rules()) main._merge_tmp_otq(branch) return main def _apply_side_branches(self, side_branches): for side_branch in side_branches: self.node().add_rules(side_branch.node().copy_rules()) self._merge_tmp_otq(side_branch) self.__sources_keys_dates.update(side_branch.__sources_keys_dates) self.__sources_modify_query_times.update(side_branch.__sources_modify_query_times) self.__sources_base_ep_func.update(side_branch.__sources_base_ep_func) self.__sources_symbols.update(side_branch.__sources_symbols) def symbols_for(self, func, *, symbol="SYMBOL_NAME", start="_PARAM_START_TIME_NANOS", end="_PARAM_END_TIME_NANOS"): """ Apply ticks from the current _source as symbols for calculations in the 'func'. Parameters ---------- func: callable A callable object that takes only one parameter, that references to the current symbol. symbol: str The column name that contains symbols. Default is 'SYMBOL_NAME'. The mandatory column. start: str The column name that contains start time for a symbol. Default is '_PARAM_START_TIME_NANOS'. The optional column. end: str The column name that contains end time for a symbol. Default is '_PARAM_END_TIME_NANOS'. The optional column. Returns ------- A multi symbol ticks _source. """ symbols = self.copy() if str(symbol) != "SYMBOL_NAME": symbols["SYMBOL_NAME"] = symbols[str(symbol)] if str(start) != "_PARAM_START_TIME_NANOS": symbols["_PARAM_START_TIME_NANOS"] = symbols[str(start)] if str(end) != "_PARAM_END_TIME_NANOS": symbols["_PARAM_END_TIME_NANOS"] = symbols[str(end)] if "SYMBOL_NAME" not in symbols.columns(): raise Exception("Ticks do not have the SYMBOL_NAME column, but this is mandatory column.") # -------------- # num_params = len(inspect.signature(func).parameters) if num_params == 0: logic = func() elif num_params == 1: logic = func(symbols.to_symbol_param()) else: raise ValueError( f"It is expected only one parameter from the callback, but {num_params} passed" ) # TODO: test this case # -------------- # # Find date range for the logic query # If date range is not specified for at least one _source, then # try to deduce date range - set to adaptive # TODO: we don't need to get common symbol from _get_date_range, # refactor this function, so we don't get an error if default symbol is not set logic_start, logic_end, _ = logic.copy()._get_date_range(default_start=adaptive, default_end=adaptive) if logic_start is adaptive: logic_start = onetick.py.utils.INF_TIME if logic_end is adaptive: logic_end = onetick.py.ZERO_TIME # The same as previous, but for the symbols symbols_start, symbols_end, _ = symbols.copy()._get_date_range(default_start=adaptive, default_end=adaptive) if symbols_start is adaptive: symbols_start = onetick.py.utils.INF_TIME if symbols_end is adaptive: symbols_end = onetick.py.ZERO_TIME # Query interval should be as wider as possible start = min(logic_start, symbols_start) end = max(logic_end, symbols_end) # If nothing is specified, then just use default if start == onetick.py.utils.INF_TIME: start = None if end == onetick.py.ZERO_TIME: end = None return _MultiSymbolsSource(symbols.copy(), logic, start=start, end=end) @staticmethod def _columns_to_params_for_joins(columns, query_params=False): """ Converts a dictionary of columns into a parameters string. This is mainly used for join_with_query and join_with_collection. query_params control whether resulting string should be considered query params or symbol params (as it impacts some of the conversion rules) """ params_list = [] for key, value in columns.items(): dtype = ott.get_object_type(value) convert_rule = "'" + key + "=' + " if key == '_PARAM_SYMBOL_TIME' and not query_params: # this symbol parameter has to be formatted differently because Onetick treats this parameter # in a special way if dtype is otp.nsectime: convert_rule += f'NSECTIME_FORMAT("%Y%m%d%H%M%S.%J",{ott.value2str(value)},_TIMEZONE)' elif dtype is str: if are_strings(getattr(value, "dtype", None)): convert_rule += str(value) else: convert_rule += '"' + value + '"' else: raise ValueError('Parameter symbol_time has to be a datetime value!') elif dtype is str: if are_strings(getattr(value, "dtype", None)): convert_rule += str(value) else: convert_rule += '"' + value + '"' elif dtype is otp.msectime: convert_rule += "tostring(GET_MSECS(" + str(value) + "))" elif dtype is otp.nsectime: if key == '_SYMBOL_TIME' and query_params: # hack to support passing _SYMBOL_TIME to called query as a parameter warnings.warn('Query parameter _SYMBOL_TIME passed to join_with_query! ' 'This is deprecated. Please use a dedicated `symbol_time` parameter of the ' 'join_with_query function') convert_rule += "tostring(GET_MSECS(" + str(value) + "))" elif query_params: # this can be used for query params but cannot be used for symbol params # overall it's better convert_rule += "'NSECTIME('+tostring(NSECTIME_TO_LONG(" + str(value) + "))+')'" else: # this matches the common way onetick converts nanoseconds to symbol parameters convert_rule += "tostring(GET_MSECS(" + str(value) + \ "))+'.'+SUBSTR(NSECTIME_FORMAT('%J'," + str(value) + ",_TIMEZONE),3,6)" else: convert_rule += "tostring(" + str(value) + ")" params_list.append(convert_rule) return "+','+".join(params_list) @staticmethod def _get_default_fields_for_outer_join_str(default_fields_for_outer_join, how, sub_source_schema): """ Default fields for outer join definition. Used by join_with_query() and join_with_collection() """ default_fields_for_outer_join_str = '' if default_fields_for_outer_join: if how != 'outer': raise ValueError('The `default_fields_for_outer_join` parameter can be used only for outer join') for field, expr in default_fields_for_outer_join.items(): if field not in sub_source_schema.keys(): raise KeyError(f'Field {field} is specified in `default_fields_for_outer_join` parameter, ' 'but is not present in the joined source schema!') if default_fields_for_outer_join_str != '': default_fields_for_outer_join_str += ',' default_fields_for_outer_join_str += f'{field}={ott.value2str(expr)}' return default_fields_for_outer_join_str
[docs] def join_with_collection( self, collection_name, query_func=None, how="outer", params=None, start=None, end=None, prefix=None, caching=None, keep_time=None, default_fields_for_outer_join=None, ) -> 'Source': """ For each tick uses ``query_func`` to join ticks from ``collection_name`` tick collection (tick set, unordered tick set, tick list, or tick deque). Parameters ---------- collection_name: str Name of the collection state variable from which to join ticks. Collections are the following types: :py:class:`TickSet <onetick.py.core._internal._state_objects.TickSet>`, :py:class:`TickSetUnordered <onetick.py.core._internal._state_objects.TickSetUnordered>`, :py:class:`TickList <onetick.py.core._internal._state_objects.TickList>` and :py:class:`TickDeque <onetick.py.core._internal._state_objects.TickDeque>`. query_func: callable Callable ``query_func`` should return :class:`Source`. If passed, this query will be used on ticks from collection before joining them. In this case, ``query_func`` object will be evaluated by OneTick (not python) for every input tick. Note that python code will be executed only once, so all python's conditional expressions will be evaluated only once too. Callable should have ``source`` parameter. When callable is called, this parameter will have value of a :class:`Source` object representing ticks loaded directly from the collection. Any operation applied to this source will be applied to ticks from the collection before joining them. Also, callable should have the parameters with names from ``params`` if they are specified in this method. If ``query_func`` is not passed, then all ticks from the collection will be joined. how: 'inner', 'outer' Type of join. If **inner**, then output tick is propagated only if some ticks from the collection were joined to the input tick. params: dict Mapping of the parameters' names and their values for the ``query_func``. :py:class:`Columns <onetick.py.Column>` can be used as a value. start: datetime, Operation Start time to select ticks from collection. If specified, only ticks in collection that have higher or equal timestamp will be processed. If not passed, then there will be no lower time bound for the collection ticks. This means that even ticks with TIMESTAMP lower than _START_TIME of the main query will be joined. end: datetime, Operation End time to select ticks from collection. If specified, only ticks in collection that have lower timestamp will be processed. If not passed, then there will be no upper time bound for the collection ticks. This means that even ticks with TIMESTAMP higher than _END_TIME of the main query will be joined. prefix : str Prefix for the names of joined tick fields. caching : str If `None` caching is disabled. You can specify caching by using values: * 'per_symbol': cache is different for each symbol. keep_time : str Name for the joined timestamp column. `None` means no timestamp column will be joined. default_fields_for_outer_join : dict When you use outer join, all output ticks will have fields from the schema of the joined source. If nothing was joined to a particular output tick, these fields will have default values for their type. This parameter allows to override the values that would be added to ticks for which nothing was joined. Dictionary keys should be field names, and dictionary values should be constants or :class:`Operation` expressions Returns ------- :class:`Source` Source with joined ticks from ``collection_name`` See also -------- **JOIN_WITH_COLLECTION_SUMMARY** OneTick event processor Examples -------- >>> # OTdirective: snippet-name: Special functions.join with collection.without query; >>> src = otp.Tick(A=1) >>> src.state_vars['TICK_SET'] = otp.state.tick_set('LATEST_TICK', 'B', otp.eval(otp.Tick(B=1, C='STR'))) >>> src = src.join_with_collection('TICK_SET') >>> otp.run(src)[["A", "B", "C"]] A B C 0 1 1 STR >>> # OTdirective: snippet-name: Special functions.join with collection.with query and params; >>> src = otp.Ticks(A=[1, 2, 3, 4, 5], ... B=[2, 2, 3, 3, 3]) >>> src.state_vars['TICK_LIST'] = otp.state.tick_list() >>> def fun(tick): tick.state_vars['TICK_LIST'].push_back(tick) >>> src = src.script(fun) >>> >>> def join_fun(source, param_b): ... source = source.agg(dict(VALUE=otp.agg.sum(source['A']))) ... source['VALUE'] = source['VALUE'] + param_b ... return source >>> >>> src = src.join_with_collection('TICK_LIST', join_fun, params=dict(param_b=src['B'])) >>> otp.run(src)[["A", "B", "VALUE"]] A B VALUE 0 1 2 3 1 2 2 5 2 3 3 9 3 4 3 13 4 5 3 18 Join last standing quote from each exchange to trades: >>> # OTdirective: snippet-name: Special functions.join with collection.standing quotes per exchange; >>> trd = otp.Ticks(offset=[1000, 2000, 3000, 4000, 5000], ... PRICE=[10.1, 10.2, 10.15, 10.23, 10.4], ... SIZE=[100, 50, 100, 60, 200]) >>> >>> qte = otp.Ticks(offset=[500, 600, 1200, 2500, 3500, 3600, 4800], ... EXCHANGE=['N', 'C', 'Q', 'Q', 'C', 'N', 'C'], ... ASK_PRICE=[10.2, 10.18, 10.18, 10.15, 10.31, 10.32, 10.44], ... BID_PRICE=[10.1, 10.17, 10.17, 10.1, 10.23, 10.31, 10.4]) >>> >>> trd['TICK_TYPE'] = 'TRD' >>> qte['TICK_TYPE'] = 'QTE' >>> >>> trd_qte = trd + qte >>> trd_qte.state_vars['LAST_QUOTE_PER_EXCHANGE'] = otp.state.tick_set('LATEST', 'EXCHANGE') >>> del trd_qte.state_vars['LAST_QUOTE_PER_EXCHANGE'].schema['TICK_TYPE'] >>> del trd_qte.state_vars['LAST_QUOTE_PER_EXCHANGE'].schema['PRICE'] >>> del trd_qte.state_vars['LAST_QUOTE_PER_EXCHANGE'].schema['SIZE'] >>> >>> trd_qte = trd_qte.state_vars['LAST_QUOTE_PER_EXCHANGE'].update(where=trd_qte['TICK_TYPE'] == 'QTE', ... value_fields=['ASK_PRICE', 'BID_PRICE']) >>> trd, _ = trd_qte[trd_qte['TICK_TYPE'] == 'TRD'] >>> trd.drop(['ASK_PRICE', 'BID_PRICE', 'EXCHANGE'], inplace=True) >>> trd = trd.join_with_collection('LAST_QUOTE_PER_EXCHANGE') >>> otp.run(trd)[['PRICE', 'SIZE', 'EXCHANGE', 'ASK_PRICE', 'BID_PRICE']] PRICE SIZE EXCHANGE ASK_PRICE BID_PRICE 0 10.10 100 N 10.20 10.10 1 10.10 100 C 10.18 10.17 2 10.20 50 N 10.20 10.10 3 10.20 50 C 10.18 10.17 4 10.20 50 Q 10.18 10.17 5 10.15 100 N 10.20 10.10 6 10.15 100 C 10.18 10.17 7 10.15 100 Q 10.15 10.10 8 10.23 60 N 10.32 10.31 9 10.23 60 C 10.31 10.23 10 10.23 60 Q 10.15 10.10 11 10.40 200 N 10.32 10.31 12 10.40 200 C 10.44 10.40 13 10.40 200 Q 10.15 10.10 """ # check that passed collection is good if collection_name not in self.state_vars.names: raise KeyError(f'Collection with name {collection_name} is not in the list of available state variables') if not isinstance(self.state_vars[collection_name], _TickSequence): raise ValueError(f'State variable {collection_name} is not a tick collection! ' 'Only TickSet, TickSetUnordered, TickList and TickDeque objects are supported ' 'as data sources for join_with_collection') if params is None: params = {} special_params = ('source', '__fixed_start_time', '__fixed_end_time') for sp_param in special_params: if sp_param in params.keys(): raise ValueError(f'Parameter name "{sp_param}" is special and cannot be used for params ' 'of join_with_collection function. Please, select a different name.') # JOIN_WITH_COLLECTION_SUMMARY has START_TIME and END_TIME parameters with the precision of millisecond. # So, here we add a workaround on onetick.py side to support nsectime precision # "start" and "end" parameters of the EP are kept as they may be necessary # for performance reasons if start is not None: params['__fixed_start_time'] = start start = start - otp.Milli(1) if end is not None: params['__fixed_end_time'] = end end = end + otp.Milli(1) # prepare temporary file # ------------------------------------ # # TODO: this should be a common code somewhere collection_schema = { key: value for key, value in self.state_vars[collection_name].schema.items() if key not in self.__class__.meta_fields and key not in self.__class__._PROPERTIES } join_source_root = otp.DataSource(db=otp.config.default_db, tick_type="ANY", schema_policy="manual", **collection_schema) if query_func is None: query_func = lambda source: source # noqa converted_params = prepare_params(**params) fixed_start_time = None fixed_end_time = None if '__fixed_start_time' in converted_params.keys(): fixed_start_time = converted_params['__fixed_start_time'] del converted_params['__fixed_start_time'] if '__fixed_end_time' in converted_params.keys(): fixed_end_time = converted_params['__fixed_end_time'] del converted_params['__fixed_end_time'] sub_source = query_func(source=join_source_root, **converted_params) if fixed_start_time is not None: sub_source = sub_source[sub_source['TIMESTAMP'] >= fixed_start_time][0] if fixed_end_time is not None: sub_source = sub_source[sub_source['TIMESTAMP'] < fixed_end_time][0] sub_source = self._process_keep_time_param(keep_time, sub_source) params_str = self._columns_to_params_for_joins(params, query_params=True) sub_source_schema = sub_source.schema.copy() columns = {} columns.update(self._get_columns_with_prefix(sub_source, prefix)) columns.update(self.columns(skip_meta_fields=True)) res = self.copy(columns=columns) res._merge_tmp_otq(sub_source) query_name = sub_source._store_in_tmp_otq( res._tmp_otq, symbols='_NON_EXISTING_SYMBOL_', operation_suffix="join_with_collection" ) # ------------------------------------ # default_fields_for_outer_join_str = self._get_default_fields_for_outer_join_str( default_fields_for_outer_join, how, sub_source_schema ) join_params = dict( collection_name=str(self.state_vars[collection_name]), otq_query=f'"THIS::{query_name}"', join_type=how.upper(), otq_query_params=params_str, default_fields_for_outer_join=default_fields_for_outer_join_str, ) self._fill_aux_params_for_joins( join_params, caching, end, prefix, start, symbol_name=None, timezone=None, join_with_collection=True ) res.sink(otq.JoinWithCollectionSummary(**join_params)) res._add_table() res.sink(otq.Passthrough(fields="TIMESTAMP", drop_fields=True)) return res
[docs] def join_with_query( self, query, how="outer", symbol=None, params=None, start=None, end=None, timezone=None, prefix=None, caching=None, keep_time=None, where=None, default_fields_for_outer_join=None, symbol_time=None, concurrency=None, **kwargs, ) -> 'Source': """ For each tick executes ``query``. Parameters ---------- query: callable, Source Callable ``query`` should return :class:`Source`. This object will be evaluated by OneTick (not python) for every tick. Note python code will be executed only once, so all python's conditional expressions will be evaluated only once too. Callable should have ``symbol`` parameter and the parameters with names from ``params`` if they are specified in this method. If ``query`` is a :class:`Source` object then it will be propagated as a query to OneTick. how: 'inner', 'outer' Type of join. If **inner**, then each tick is propagated only if its ``query`` execution has a non-empty result. params: dict Mapping of the parameters' names and their values for the ``query``. :py:class:`Columns <onetick.py.Column>` can be used as a value. symbol: str, Operation, dict, Source, or Tuple[Union[str, Operation], Union[dict, Source]] Symbol name to use in ``query``. In addition, symbol params can be passed along with symbol name. Symbol name can be passed as a string or as an :class:`Operation`. Symbol parameters can be passed as a dictionary. Also, the main :class:`Source` object, or the object containing a symbol parameter list, can be used as a list of symbol parameter. Special symbol parameters (`_PARAM_START_TIME_NANOS` and `_PARAM_END_TIME_NANOS`) will be ignored and will not be propagated to ``query``. ``symbol`` will be interpreted as a symbol name or as symbol parameters, depending on its type. You can pass both as a tuple. If symbol name is not passed, then symbol name from the main source is used. start: datetime, Operation Start time of ``query``. By default, start time of the main source is used. end: datetime, Operation End time of ``query``. By default, end time of the main source is used. start_time: .. deprecated:: 1.48.4 The same as ``start``. end_time: .. deprecated:: 1.48.4 The same as ``end``. timezone : Optional, str Timezone of ``query``. By default, timezone of the main source is used. prefix : str Prefix for the names of joined tick fields. caching : str If `None` caching is disabled. You can specify caching by using values: * 'cross_symbol': cache is the same for all symbols * 'per_symbol': cache is different for each symbol. keep_time : str Name for the joined timestamp column. `None` means no timestamp column will be joined. where : Operation Condition to filter ticks for which the result of the ``query`` will be joined. default_fields_for_outer_join : dict When you use outer join, all output ticks will have fields from the schema of the joined source. If nothing was joined to a particular output tick, these fields will have default values for their type. This parameter allows to override the values that would be added to ticks for which nothing was joined. Dictionary keys should be field names, and dictionary values should be constants or :class:`Operation` expressions symbol_time : datetime, Operation Time that will be used by Onetick to map the symbol with which ``query`` is executed to the reference data. This parameter is only necessary if the query is expected to perform symbology conversions. concurrency : int Specifies number of threads for asynchronous processing of ``query`` per unbound symbol list. By default, the number of threads is 1. Returns ------- :class:`Source` Source with joined ticks from ``query`` See also -------- **JOIN_WITH_QUERY** OneTick event processor Examples -------- >>> # OTdirective: snippet-name: Special functions.join with query.with an otp data source; >>> d = otp.Ticks(Y=[-1]) >>> d = d.update(dict(Y=1), where=(d.Symbol.name == "a")) >>> data = otp.Ticks(X=[1, 2], ... S=["a", "b"]) >>> res = data.join_with_query(d, how='inner', symbol=data['S']) >>> otp.run(res)[["X", "Y", "S"]] X Y S 0 1 1 a 1 2 -1 b >>> d = otp.Ticks(ADDED=[-1]) >>> d = d.update(dict(ADDED=1), where=(d.Symbol.name == "3")) # symbol name is always string >>> data = otp.Ticks(A=[1, 2], B=[2, 4]) >>> res = data.join_with_query(d, how='inner', symbol=(data['A'] + data['B'])) # OTdirective: skip-snippet:; >>> df = otp.run(res) >>> df[["A", "B", "ADDED"]] A B ADDED 0 1 2 1 1 2 4 -1 Constants as symbols are also supported >>> d = otp.Ticks(ADDED=[d.Symbol.name]) >>> data = otp.Ticks(A=[1, 2], B=[2, 4]) >>> res = data.join_with_query(d, how='inner', symbol=1) # OTdirective: skip-snippet:; >>> df = otp.run(res) >>> df[["A", "B", "ADDED"]] A B ADDED 0 1 2 1 1 2 4 1 Function object as query is also supported (Note it will be executed only once in python's code) >>> def func(symbol): ... d = otp.Ticks(TYPE=["six"]) ... d = d.update(dict(TYPE="three"), where=(symbol.name == "3")) # symbol is always converted to string ... d["TYPE"] = symbol['PREF'] + d["TYPE"] + symbol['POST'] ... return d >>> # OTdirective: snippet-name: Special functions.join with query.with a function >>> data = otp.Ticks(A=[1, 2], B=[2, 4]) >>> res = data.join_with_query(func, how='inner', symbol=(data['A'] + data['B'], dict(PREF="_", POST="$"))) >>> df = otp.run(res) >>> df[["A", "B", "TYPE"]] A B TYPE 0 1 2 _three$ 1 2 4 _six$ It's possible to pass the source itself as a list of symbol parameters, which will make all of its fields accessible through the "symbol" object >>> def func(symbol): ... d = otp.Ticks(TYPE=["six"]) ... d["TYPE"] = symbol['PREF'] + d["TYPE"] + symbol['POST'] ... return d >>> # OTdirective: snippet-name: Source operations.join with query.source as symbol; >>> data = otp.Ticks(A=[1, 2], B=[2, 4], PREF=["_", "$"], POST=["$", "_"]) >>> res = data.join_with_query(func, how='inner', symbol=data) >>> df = otp.run(res) >>> df[["A", "B", "TYPE"]] A B TYPE 0 1 2 _six$ 1 2 4 $six_ The examples above can be rewritten by using onetick query parameters instead of symbol parameters. OTQ parameters are global for query, while symbol parameters can be redefined by bound symbols. >>> def func(symbol, pref, post): ... d = otp.Ticks(TYPE=["six"]) ... d = d.update(dict(TYPE="three"), where=(symbol.name == "3")) # symbol is always converted to string ... d["TYPE"] = pref + d["TYPE"] + post ... return d >>> # OTdirective: snippet-name: Special functions.join with query.with a function that takes params; >>> data = otp.Ticks(A=[1, 2], B=[2, 4]) >>> res = data.join_with_query(func, how='inner', symbol=(data['A'] + data['B']), ... params=dict(pref="_", post="$")) >>> df = otp.run(res) >>> df[["A", "B", "TYPE"]] A B TYPE 0 1 2 _three$ 1 2 4 _six$ Some or all onetick query parameters can be column or expression also >>> def func(symbol, pref, post): ... d = otp.Ticks(TYPE=["six"]) ... d = d.update(dict(TYPE="three"), where=(symbol.name == "3")) # symbol is always converted to string ... d["TYPE"] = pref + d["TYPE"] + post ... return d >>> # OTdirective: snippet-name: Special functions.join with query.with a function that takes params from fields; # noqa >>> data = otp.Ticks(A=[1, 2], B=[2, 4], PREF=["^", "_"], POST=["!", "$"]) >>> res = data.join_with_query(func, how='inner', symbol=(data['A'] + data['B']), ... params=dict(pref=data["PREF"] + ".", post=data["POST"])) >>> df = otp.run(res) >>> df[["A", "B", "TYPE"]] A B TYPE 0 1 2 ^.three! 1 2 4 _.six$ You can specify start and end time of the query to select specific ticks from db >>> # OTdirective: snippet-name: Special functions.join with query.passing start/end times; >>> d = otp.Ticks(Y=[1, 2]) >>> data = otp.Ticks(X=[1, 2]) >>> start = datetime(2003, 12, 1, 0, 0, 0, 1000, tzinfo=pytz.timezone("EST5EDT")) >>> end = datetime(2003, 12, 1, 0, 0, 0, 3000, tzinfo=pytz.timezone("EST5EDT")) >>> res = data.join_with_query(d, how='inner', start=start, end=end) >>> otp.run(res) Time Y X 0 2003-12-01 00:00:00.000 1 1 1 2003-12-01 00:00:00.000 2 1 2 2003-12-01 00:00:00.001 1 2 3 2003-12-01 00:00:00.001 2 2 Use keep_time param to keep or rename original timestamp column >>> # OTdirective: snippet-name: Special functions.join with query.keep the timestamps of the joined ticks; >>> d = otp.Ticks(Y=[1, 2]) >>> data = otp.Ticks(X=[1, 2]) >>> res = data.join_with_query(d, how='inner', keep_time="ORIG_TIME") >>> otp.run(res) Time Y ORIG_TIME X 0 2003-12-01 00:00:00.000 1 2003-12-01 00:00:00.000 1 1 2003-12-01 00:00:00.000 2 2003-12-01 00:00:00.001 1 2 2003-12-01 00:00:00.001 1 2003-12-01 00:00:00.000 2 3 2003-12-01 00:00:00.001 2 2003-12-01 00:00:00.001 2 """ # TODO: check if join_with_query checks schema of joined source against primary source, # by itself or with process_by_group if params is None: params = {} # "symbol" parameter can contain a symbol name (string, field, operation etc), # a symbol parameter list (dict, Source, _SymbolParamSource), # or both together as a tuple def _check_and_convert_symbol(symbol): if isinstance(symbol, _Operation): # TODO: PY-35 return True, f"tostring({symbol})" elif isinstance(symbol, str): return True, f"'{symbol}'" elif type(symbol) in {int, float}: # constant return True, f"tostring({symbol})" elif symbol is None: # this is necessary to distinguish None (which is valid value for symbol) from invalid values return True, None else: return False, None def _convert_symbol_param_and_columns(symbol_param): """ We need to create two objects from a symbol param (a dict, a Source or a _SymbolParamSource): 1. Dictionary of columns to generate list of symbol parameters for the JOIN_WITH_QUERY EP 2. _SymbolParamSource object to pass to the source function if necessary """ if isinstance(symbol_param, dict): converted_symbol_param_columns = symbol_param converted_symbol_param = _SymbolParamSource(**{key: ott.get_object_type(column) for key, column in symbol_param.items()}) elif isinstance(symbol_param, _Source): converted_symbol_param_columns = {field_name: symbol_param[field_name] for field_name in symbol_param.columns(skip_meta_fields=True).keys()} converted_symbol_param = symbol_param.to_symbol_param() elif isinstance(symbol_param, _SymbolParamSource): converted_symbol_param_columns = {field_name: symbol_param[field_name] for field_name in symbol_param.schema.keys()} converted_symbol_param = symbol_param else: return None, None # we want to pass all the fields to the joined query as symbol parameters, # except for some special fields that would override explicitly set parameters ignore_symbol_fields = [ '_PARAM_START_TIME_NANOS', '_PARAM_END_TIME_NANOS', ] filtered_converted_symbol_param_columns = {} for field_name, field_value in converted_symbol_param_columns.items(): if field_name in ignore_symbol_fields: warnings.warn(f'Special symbol parameter "{field_name}" was passed to the joined query! ' 'This parameter would be ignored. Please, use parameters of the `join_with_query` ' 'function itself to set it.') else: filtered_converted_symbol_param_columns[field_name] = field_value filtered_converted_symbol_param = _SymbolParamSource( **{field_name: field_value for field_name, field_value in converted_symbol_param.schema.items() if field_name not in ignore_symbol_fields} ) return filtered_converted_symbol_param_columns, filtered_converted_symbol_param # if "symbol" is tuple, we unpack it if isinstance(symbol, tuple) and len(symbol) == 2: symbol_name, symbol_param = symbol else: # see if "symbol" contains symbol name or symbol params is_symbol, _ = _check_and_convert_symbol(symbol) if is_symbol: symbol_name = symbol symbol_param = {} else: symbol_name = None symbol_param = symbol _, converted_symbol_name = _check_and_convert_symbol(symbol_name) # default symbol name should be this: _SYMBOL_NAME if it is not empty else _NON_EXISTING_SYMBOL_ # this way we will force JWQ to substitute symbol with any symbol parameters we may have passed # otherwise (if an empty symbol name is passed to JWQ), it will not substitute either symbol name # or symbol parameters, and so symbol parameters may get lost # see BDS-263 if converted_symbol_name is None: converted_symbol_name = "CASE(_SYMBOL_NAME,'','_NON_EXISTING_SYMBOL',_SYMBOL_NAME)" converted_symbol_param_columns, converted_symbol_param = _convert_symbol_param_and_columns(symbol_param) if converted_symbol_param is None: # we couldn't interpret "symbols" as either symbol name or symbol parameters raise ValueError('"symbol" parameter has a wrong format! It should be a symbol name, a symbol parameter ' 'object (dict or Source), or a tuple containing both') # adding symbol time if '_PARAM_SYMBOL_TIME' in converted_symbol_param_columns.keys(): warnings.warn('"_PARAM_SYMBOL_TIME" explicitly passed among join_with_query symbol parameters! ' 'This is deprecated - please use symbol_time parameter instead. ' 'If you specify symbol_time parameter, it will override the explicitly passed value') if symbol_time is not None: if ott.get_object_type(symbol_time) is not otp.nsectime and ott.get_object_type(symbol_time) is not str: raise ValueError(f'Parameter of type {ott.get_object_type(symbol_time)} passed as symbol_time! ' 'This parameter only supports datetime values or strings') converted_symbol_param_columns['_PARAM_SYMBOL_TIME'] = symbol_time # prepare temporary file # ------------------------------------ # converted_params = prepare_params(**params) if isinstance(query, Source): sub_source = query else: # inspect function # ------- sig = inspect.signature(query) if "symbol" in sig.parameters: if "symbol" in converted_params.keys(): raise AttributeError('"params" contains key "symbol", which is reserved for symbol parameters. ' 'Please, rename this parameter to another name') converted_params["symbol"] = converted_symbol_param # type: ignore sub_source = query(**converted_params) sub_source = self._process_keep_time_param(keep_time, sub_source) if not sub_source._is_unbound_required(): sub_source += onetick.py.sources.Empty() params_str = self._columns_to_params_for_joins(params, query_params=True) symbol_params_str = self._columns_to_params_for_joins(converted_symbol_param_columns) sub_source_schema = sub_source.schema.copy() columns = {} columns.update(self._get_columns_with_prefix(sub_source, prefix)) columns.update(self.columns(skip_meta_fields=True)) res = self.copy(columns=columns) res._merge_tmp_otq(sub_source) query_name = sub_source._store_in_tmp_otq( res._tmp_otq, symbols='_NON_EXISTING_SYMBOL_', operation_suffix="join_with_query" ) # TODO: combine with _convert_symbol_to_string # ------------------------------------ # if where is not None and how != 'outer': raise ValueError('The `where` parameter can be used only for outer join') default_fields_for_outer_join_str = self._get_default_fields_for_outer_join_str( default_fields_for_outer_join, how, sub_source_schema ) join_params = dict( otq_query=f'"THIS::{query_name}"', join_type=how.upper(), otq_query_params=params_str, symbol_params=symbol_params_str, where=str(where._make_python_way_bool_expression()) if where is not None else '', default_fields_for_outer_join=default_fields_for_outer_join_str, ) if concurrency is not None: if type(concurrency) is not int or concurrency <= 0: raise ValueError('Wrong value of concurrency parameter passed! ' 'concurrency should be a positive integer') join_params['shared_thread_count'] = concurrency start_time = kwargs.get('start_time', start) end_time = kwargs.get('end_time', end) self._fill_aux_params_for_joins( join_params, caching, end_time, prefix, start_time, converted_symbol_name, timezone ) res.sink(otq.JoinWithQuery(**join_params)) res._add_table() res.sink(otq.Passthrough(fields="TIMESTAMP", drop_fields=True)) return res
@property def state_vars(self) -> StateVars: """ Provides access to state variables Returns ------- State Variables: Dict[str, state variable] State variables, you can access one with its name. See Also -------- | `State Variables \ <../../static/getting_started/variables_and_data_structures.html#variables-and-data-structures>`_ | **DECLARE_STATE_VARIABLES** OneTick event processor """ return self.__dict__['_state_vars'] __invalid_query_name_symbols_regex = re.compile('[^a-zA-Z0-9_]') def __remove_invalid_symbols(self, s): """ Replaces symbols that cannot be put in query names with '_' """ return self.__invalid_query_name_symbols_regex.sub('_', s) def get_name(self, remove_invalid_symbols=False): """ Returns source name. If remove_invalid_symbols == True, returned name only contains symbols that can be put in query names. """ if remove_invalid_symbols and self.__name: return self.__remove_invalid_symbols(self.__name) else: return self.__name def set_name(self, new_name): """ Sets source name. Source name cannot be an empty string but can be None. """ assert isinstance(new_name, str) or new_name is None, "Source name must be a string or None." if new_name is not None: assert new_name != '', "Source name must be a non-empty string." self.__name = new_name def _name_suffix(self, suffix, separator='.', remove_invalid_symbols=False): if remove_invalid_symbols: suffix = self.__remove_invalid_symbols(suffix) separator = self.__remove_invalid_symbols(separator) name = self.get_name(remove_invalid_symbols=True) else: name = self.__name return f'{separator}{name}{separator}{suffix}' if name else f'{separator}{suffix}' @property def schema(self) -> Schema: """ Represents actual python data schema in the column-name -> type format. For example, could be used after the :meth:`Source.sink` to adjust the schema. Returns ------- Schema See Also -------- Source.sink Examples -------- >>> data = otp.Ticks([['X', 'Y', 'Z'], ... [ 1, 0.5, 'abc']]) >>> data['T'] = data['Time'] >>> data.schema {'X': <class 'int'>, 'Y': <class 'float'>, 'Z': <class 'str'>, 'T': <class 'onetick.py.types.nsectime'>} >>> data.schema['X'] <class 'int'> >>> data.schema['X'] = float >>> data.schema['X'] <class 'float'> >>> 'W' in data.schema False >>> data.schema['W'] = otp.nsectime >>> 'W' in data.schema True >>> data.schema['W'] <class 'onetick.py.types.nsectime'> """ schema = self.columns(skip_meta_fields=True) hidden_columns = {'Time': ott.nsectime, 'TIMESTAMP': ott.nsectime} return Schema(_base_source=self, _hidden_columns=hidden_columns, **schema) def set_schema(self, **kwargs): """ Set schema of the source. Note: this method affect python part only and won't make any db queries. It used to set schema after db reading/ complex query. .. deprecated:: please use the :property:`Source.schema` to access and adjust the schema. Parameters ---------- kwargs schema in the column_name=type format Examples -------- Python can't follow low level change of column, e.g. complex query or pertick script can be sink. >>> data = otp.Ticks(dict(A=[1, 2], B=["a", "b"])) >>> data.sink(otq.AddField(field='Z', value='5')) # doctest: +SKIP >>> data.columns(skip_meta_fields=True) {'A': <class 'int'>, 'B': <class 'str'>} >>> # OTdirective: snippet-name: Arrange.schema.set; >>> data.set_schema(A=int, B=str, Z=int) >>> data.columns(skip_meta_fields=True) {'A': <class 'int'>, 'B': <class 'str'>, 'Z': <class 'int'>} """ self.drop_columns() for name, dtype in kwargs.items(): dtype = ott.get_source_base_type(dtype) self._set_field_by_tuple(name, dtype) def _process_keep_time_param(self, keep_time, sub_source): if keep_time == "TIMESTAMP": raise ValueError("TIMESTAMP is reserved OneTick name, please, specify another one.") if keep_time in self.columns(): raise ValueError(f"{keep_time} column is already presented.") sub_source = sub_source.copy() if keep_time: sub_source[keep_time] = sub_source["Time"] return sub_source def _get_columns_with_prefix(self, sub_source, prefix) -> dict: sub_source_columns = sub_source.schema if prefix is None: prefix = "" if not isinstance(prefix, str): raise ValueError("Only string constants are supported for now.") new_columns = {prefix + name: dtype for name, dtype in sub_source_columns.items()} same_names = set(new_columns) & set(self.schema) if same_names: raise ValueError(f"After applying prefix some columns aren't unique: {', '.join(same_names)}.") return new_columns def _fill_aux_params_for_joins( self, join_params, caching, end_time, prefix, start_time, symbol_name, timezone, join_with_collection=False ): if symbol_name and not join_with_collection: join_params["symbol_name"] = symbol_name if prefix is not None: join_params["prefix_for_output_ticks"] = str(prefix) if caching: if join_with_collection: supported = ("per_symbol",) else: supported = ("cross_symbol", "per_symbol") if caching in supported: join_params["caching_scope"] = caching else: raise ValueError(f"Unknown value for caching param, please use None or any of {supported}.") self._fill_time_param_for_jwq(join_params, start_time, end_time, timezone) if join_with_collection: del join_params['timezone'] def _fill_time_param_for_jwq(self, join_params, start_time, end_time, timezone): self._process_start_or_end_of_jwq(join_params, start_time, "start_timestamp") self._process_start_or_end_of_jwq(join_params, end_time, "end_timestamp") if timezone: join_params["timezone"] = f"'{timezone}'" else: join_params["timezone"] = "_TIMEZONE" # this may break something, need to test def _process_start_or_end_of_jwq(self, join_params, time, param_name): if time is not None: if isinstance(time, (datetime, otp.dt)): join_params[f"{param_name}"] = time.timestamp() * 1000 elif isinstance(time, _Operation): join_params[f"{param_name}"] = str(time) else: raise ValueError(f"{param_name} should be datetime.datetime instance or OneTick expression")
[docs] @inplace_operation def transpose( self, inplace: bool = False, direction: Literal['rows', 'columns'] = 'rows', n: Optional[int] = None, ) -> Optional['Source']: """ Data transposing. The main idea is joining many ticks into one or splitting one tick to many. Parameters ---------- inplace: bool, default=False if `True` method will modify current object, otherwise it will return modified copy of the object. direction: 'rows', 'columns', default='rows' - `rows`: join certain input ticks (depending on other parameters) with preceding ones. Fields of each tick will be added to the output tick and their names will be suffixed with **_K** where **K** is the positional number of tick (starting from 1) in reverse order. So fields of current tick will be suffixed with **_1**, fields of previous tick will be suffixed with **_2** and so on. - `columns`: the operation is opposite to `rows`. It splits each input tick to several output ticks. Each input tick must have fields with names suffixed with **_K** where **K** is the positional number of tick (starting from 1) in reverse order. n: Optional[int], default=None must be specified only if ``direction`` is 'rows'. Joins every **n** number of ticks with **n-1** preceding ticks. Returns ------- If ``inplace`` parameter is `True` method will return `None`, otherwise it will return modified copy of the object. See also -------- **TRANSPOSE** OneTick event processor Examples -------- Merging two ticks into one. >>> data = otp.Ticks(dict(A=[1, 2], ... B=[3, 4])) >>> data = data.transpose(direction='rows', n=2) # OTdirective: skip-snippet:; >>> otp.run(data) Time TIMESTAMP_1 A_1 B_1 TIMESTAMP_2 A_2 B_2 0 2003-12-01 00:00:00.001 2003-12-01 00:00:00.001 2 4 2003-12-01 1 3 And splitting them back into two. >>> data = data.transpose(direction='columns') # OTdirective: skip-snippet:; >>> otp.run(data) Time A B 0 2003-12-01 00:00:00.000 1 3 1 2003-12-01 00:00:00.001 2 4 """ direction_map = {'rows': 'ROWS_TO_COLUMNS', 'columns': 'COLUMNS_TO_ROWS'} n: Union[str, int] = '' if n is None else n # type: ignore[no-redef] self.sink(otq.Transpose(direction=direction_map[direction], key_constraint_values=n)) # TODO: we should change source's schema after transposing return self
[docs] @inplace_operation def process_by_group(self, process_source_func, group_by=None, source_name=None, num_threads=None, inplace=False) -> Union['Source', Tuple['Source', ...], None]: """ Groups data by ``group_by`` and run ``process_source_func`` for each group and merge outputs for every group. Note ``process_source_func`` will be converted to Onetick object and passed to query, that means that python callable will be called only once. Parameters ---------- process_source_func: callable ``process_source_func`` should take :class:`Source` apply necessary logic and return it or tuple of :class:`Source` in this case all of them should have a common root that is the input :class:`Source`. The number of sources returned by this method is the same as the number of sources returned by ``process_source_func``. group_by: list A list of field names to group input ticks by. If group_by is None then no group_by fields are defined and logic of ``process_source_func`` is applied to all input ticks at once source_name: str A name for the source that represents all of group_by sources. Can be passed here or as a name of the inner sources; if passed by both ways, should be consistent num_threads: int If specified and not zero, turns on asynchronous processing mode and specifies number of threads to be used for processing input ticks. If this parameter is not specified or zero, then input ticks are processed synchronously. inplace: bool If True - nothing will be returned and changes will be applied to current query otherwise changes query will be returned. Error is raised if ``inplace`` is set to True and multiple sources returned by ``process_source_func``. Returns ------- :class:`Source`, Tuple[:class:`Source`] or None: See also -------- **GROUP_BY** OneTick event processor Examples -------- >>> # OTdirective: snippet-name: Arrange.group.single output; >>> d = otp.Ticks(X=[1, 1, 2, 2], ... Y=[1, 2, 3, 4]) >>> >>> def func(source): ... return source.first() >>> >>> res = d.process_by_group(func, group_by=['X']) >>> otp.run(res)[["X", "Y"]] X Y 0 1 1 1 2 3 Set asynchronous processing: >>> res = d.process_by_group(func, group_by=['X'], num_threads=2) >>> otp.run(res)[['X', 'Y']] X Y 0 1 1 1 2 3 Return multiple outputs, each with unique grouping logic: >>> d = otp.Ticks(X=[1, 1, 2, 2], ... Y=[1, 2, 1, 3]) >>> >>> def func(source): ... source['Z'] = source['X'] ... source2 = source.copy() ... source = source.first() ... source2 = source2.last() ... return source, source2 >>> # OTdirective: snippet-name: Arrange.group.multiple output; >>> res1, res2 = d.process_by_group(func, group_by=['Y']) >>> df1 = otp.run(res1) >>> df2 = otp.run(res2) >>> df1[['X', 'Y', 'Z']] X Y Z 0 1 1 1 1 1 2 1 2 2 3 2 >>> df2[['X', 'Y', 'Z']] # OTdirective: skip-snippet:; X Y Z 0 1 2 1 1 2 1 2 2 2 3 2 """ if group_by is None: group_by = [] if inplace: main_source = self else: main_source = self.copy() input_schema = main_source.columns(skip_meta_fields=True) for field in group_by: if field not in input_schema: raise ValueError(f"Group by field name {field} not present in input source schema") process_source_root = onetick.py.sources.Custom(tick_type="ANY", schema_policy="manual", **input_schema) if source_name: process_source_root.set_name(source_name) process_sources = process_source_func(process_source_root) if isinstance(process_sources, Source): # returned one source process_sources = [process_sources] elif len(process_sources) == 1: # returned one source as an iterable pass else: # returned multiple sources if inplace: raise ValueError("Cannot use inplace=True with multi-source processing function!") num_source = 0 for process_source in process_sources: output_schema = process_source.columns(skip_meta_fields=True) if process_source.get_name(): if not process_source_root.get_name(): process_source_root.set_name(process_source.get_name()) if process_source_root.get_name() != process_source.get_name(): warnings.warn("Different strings passed as names for the root source used in " f"process_by_group: '{process_source.get_name()}' " f"and '{process_source_root.get_name()}'") # removing key fields from output schema since they will be # added by the GROUP_BY EP process_source.drop([field for field in group_by if field in output_schema], inplace=True) process_source.sink(otq.Passthrough().node_name(f"OUT_{num_source}")) process_source_root.node().add_rules(process_source.node().copy_rules()) main_source._merge_tmp_otq(process_source) num_source += 1 query_name = process_source_root._store_in_tmp_otq( main_source._tmp_otq, operation_suffix="group_by", add_passthrough=False ) process_path = f'THIS::{query_name}' num_outputs = len(process_sources) # we shouldn't set named outputs if GROUP_BY EP has only one output due to onetick behaviour if num_outputs == 1: outputs = "" else: outputs = ",".join([f"OUT_{i}" for i in range(0, num_outputs)]) kwargs = {} if num_threads is not None: if num_threads < 0: raise ValueError("Parameter 'num_threads' can't be negative.") kwargs['num_threads'] = num_threads main_source.sink( otq.GroupBy(key_fields=",".join(group_by), query_name=process_path, outputs=outputs, **kwargs) ) output_sources = [] for num_output in range(0, num_outputs): if num_outputs == 1 and inplace: output_source = main_source else: output_source = main_source.copy() if num_outputs > 1: output_source.node().out_pin(f"OUT_{num_output}") # setting schema after processing output_schema = process_sources[num_output].columns(skip_meta_fields=True) for field in group_by: output_schema[field] = input_schema[field] for field, field_type in output_schema.items(): output_source.schema[field] = field_type output_source = output_source[[field for field in output_schema]] output_source._merge_tmp_otq(main_source) output_sources.append(output_source) if num_outputs == 1: return output_sources[0] else: return tuple(output_sources)
def unite_columns(self, sep="", *, apply_str=False): """ Join values of all columns into one string The method unite all fields to one string, just like python ``join`` method. All fields should be strings, otherwise the error will be generated. To change this behavior, ``apply_str=True`` argument should be specified, in this case all fields will be converted to string type before joining. Parameters ---------- sep: str Separator between values, empty string be dafault. apply_str: bool If set every column will be converted to string during operation. False be default. Returns ------- result: column Column with str type Examples -------- >>> # OTdirective: snippet-name: Arrange.join columns as strings; >>> data = otp.Ticks(X=[1, 2, 3], A=["A", "A", "A"], B=["A", "B", "C"]) >>> data["S_ALL"] = data.unite_columns(sep=",", apply_str=True) >>> data["S"] = data[["A", "B"]].unite_columns() >>> otp.run(data)[["S", "S_ALL"]] S S_ALL 0 AA 1,A,A 1 AB 2,A,B 2 AC 3,A,C """ if apply_str: cols = (self[col].apply(str) for col in self.schema) else: not_str = [name for name, t in self.schema.items() if not are_strings(t)] if not_str: raise ValueError(f"All joining columns should be strings, while {', '.join(not_str)} " f"are not. Specify `apply_str=True` for automatic type conversion") else: cols = (self[col] for col in self.schema) return functools.reduce(lambda x, y: x + sep + y, cols) # Aggregations copy # we need this functions to store and collect documentation # copy_method decorator will # set docstring (will compare docstring of donor function and method docstring) # apply same signature from donor function + self # for mimic=True will apply agg function as is
[docs] @copy_method(high_tick) def high(self): """ Examples -------- >>> data = otp.Ticks(X=[1, 2, 3, 4], offset=[0, 1000, 1500, 3000]) >>> data = data.high(['X'], 2) # OTdirective: snippet-name: Aggregations.high tick; >>> otp.run(data) Time X 0 2003-12-01 00:00:01.500 3 1 2003-12-01 00:00:03.000 4 """ pass
[docs] @copy_method(low_tick) def low(self): """ Examples -------- >>> data = otp.Ticks(X=[1, 2, 3, 4], offset=[0, 1000, 1500, 3000]) >>> data = data.low(['X'],2) # OTdirective: snippet-name: Aggregations.low tick; >>> otp.run(data) Time X 0 2003-12-01 00:00:00 1 1 2003-12-01 00:00:01 2 """ pass
[docs] @copy_method(first_tick) def first(self): """ Examples -------- >>> data = otp.Ticks(X=[1, 2, 3, 4]) >>> data = data.first() # OTdirective: snippet-name: Aggregations.first; >>> otp.run(data) Time X 0 2003-12-01 1 """ pass
[docs] @copy_method(last_tick) def last(self): """ Examples -------- >>> data = otp.Ticks(X=[1, 2, 3, 4], offset=[0, 1000, 1500, 3000]) >>> data = data.last() # OTdirective: snippet-name: Aggregations.last; >>> otp.run(data) Time X 0 2003-12-01 00:00:03 4 """ pass
[docs] @copy_method(distinct, mimic=False) def distinct(self, *args, **kwargs): """ Examples -------- >>> data = otp.Ticks(dict(x=[1, 3, 1, 5, 3])) >>> data = data.distinct('x') # OTdirective: snippet-name: Aggregations.distinct; >>> otp.run(data) Time x 0 2003-12-04 1 1 2003-12-04 3 2 2003-12-04 5 """ if 'bucket_interval_units' in kwargs: kwargs['bucket_units'] = kwargs.pop('bucket_interval_units') agg = distinct(*args, **kwargs) return agg.apply(self)
[docs] @copy_method(high_time, mimic=False) # mimic=False for backward compatibility def high_time(self, *args, **kwargs): """ Returns timestamp of tick with the highest value of input field .. deprecated:: 1.14.5 Use :py:func:`.high_time` instead See Also -------- :py:func:`.high_time` """ warnings.warn(f"{self.__class__.__name__}.{inspect.currentframe().f_code.co_name} deprecated. " f"Use otp.agg.{inspect.currentframe().f_code.co_name} instead", DeprecationWarning, stacklevel=2) agg = high_time(*args, **kwargs) return agg.apply(self, 'VALUE')
[docs] @copy_method(low_time, mimic=False) # mimic=False for backward compatibility def low_time(self, *args, **kwargs): """ Returns timestamp of tick with the lowest value of input field .. deprecated:: 1.14.5 Use :py:func:`.low_time` instead See Also -------- :py:func:`.low_time` """ warnings.warn(f"{self.__class__.__name__}.{inspect.currentframe().f_code.co_name} deprecated. " f"Use otp.agg.{inspect.currentframe().f_code.co_name} instead", DeprecationWarning, stacklevel=2) agg = low_time(*args, **kwargs) return agg.apply(self, 'VALUE')
[docs] @copy_method(ob_snapshot) def ob_snapshot(self): """ Examples -------- >>> data = otp.DataSource(db='SOME_DB', tick_type='PRL', symbols='AA') # doctest: +SKIP >>> data = data.ob_snapshot(max_levels=1) # doctest: +SKIP >>> otp.run(data) # doctest: +SKIP Time PRICE UPDATE_TIME SIZE LEVEL BUY_SELL_FLAG 0 2003-12-04 2.0 2003-12-01 00:00:00.003 6 1 1 1 2003-12-04 5.0 2003-12-01 00:00:00.004 7 1 0 """ pass
[docs] @copy_method(ob_snapshot_wide) def ob_snapshot_wide(self): """ Examples -------- >>> data = otp.DataSource(db='SOME_DB', tick_type='PRL', symbols='AA') # doctest: +SKIP >>> data = data.ob_snapshot_wide(max_levels=1) # doctest: +SKIP >>> otp.run(data) # doctest: +SKIP Time BID_PRICE BID_UPDATE_TIME BID_SIZE ASK_PRICE ASK_UPDATE_TIME ASK_SIZE LEVEL 0 2003-12-03 5.0 2003-12-01 00:00:00.004 7 2.0 2003-12-01 00:00:00.003 6 1 """ pass
[docs] @copy_method(ob_snapshot_flat) def ob_snapshot_flat(self): """ Examples -------- >>> data = otp.DataSource(db='SOME_DB', tick_type='PRL', symbols='AA') # doctest: +SKIP >>> data = data.ob_snapshot_flat(max_levels=1) # doctest: +SKIP >>> otp.run(data) # doctest: +SKIP Time BID_PRICE1 BID_UPDATE_TIME1 BID_SIZE1 ASK_PRICE1 ASK_UPDATE_TIME1 ASK_SIZE1 0 2003-12-03 5.0 2003-12-01 00:00:00.004 7 2.0 2003-12-01 00:00:00.003 6 """ pass
[docs] @inplace_operation def add_prefix(self, prefix, inplace=False, columns=None, ignore_columns=None) -> Optional['Source']: """ Adds prefix to all column names. Parameters ---------- prefix : str String prefix to add to all columns. inplace : bool The flag controls whether operation should be applied inplace or not. If ``inplace=True``, then it returns nothing. Otherwise method returns a new modified object. columns: List[str], optional If set, only selected columns will be updated with prefix. Can't be used with ``ignore_columns`` parameter. ignore_columns: List[str], optional If set, selected columns won't be updated with prefix. Can't be used with ``columns`` parameter. Returns ------- :class:`Source` or ``None`` Examples -------- >>> data = otp.DataSource(db='SOME_DB', tick_type='TT', symbols='S1') >>> data = data.add_prefix('test_') >>> otp.run(data) Time test_X 0 2003-12-01 00:00:00.000 1 1 2003-12-01 00:00:00.001 2 2 2003-12-01 00:00:00.002 3 >>> data.schema {'test_X': <class 'int'>} >>> data = otp.DataSource(db='NYSE_TAQ', tick_type='TRD', symbols='AAPL') >>> data = data.add_prefix('test_') >>> otp.run(data, start=otp.dt(2022, 3, 1), end=otp.dt(2022, 3, 2)) Time test_PRICE test_SIZE 0 2022-03-01 00:00:00.000 1.3 100 1 2022-03-01 00:00:00.001 1.4 10 2 2022-03-01 00:00:00.002 1.4 50 >>> data.schema {'test_PRICE': <class 'float'>} >>> data = otp.Tick(X=1, XX=2) >>> data.add_prefix('X') Traceback (most recent call last): ... AttributeError: Column XX already exists, please, use another prefix Parameter ``columns`` specifies columns to be updated with prefix: >>> data = otp.Tick(A=1, B=2, C=3, D=4, E=5) >>> data = data.add_prefix('test_', columns=['A', 'B', 'C']) >>> otp.run(data) Time test_A test_B test_C D E 0 2003-12-01 1 2 3 4 5 Parameter ``ignore_columns`` specifies columns to ignore: >>> data = otp.Tick(A=1, B=2, C=3, D=4, E=5) >>> data = data.add_prefix('test_', ignore_columns=['A', 'B', 'C']) >>> otp.run(data) Time A B C test_D test_E 0 2003-12-01 1 2 3 4 5 Parameters `columns` and `ignore_columns` can't be used at the same time: >>> data = otp.Tick(A=1, B=2, C=3, D=4, E=5) >>> data.add_prefix('test_', columns=['B', 'C'], ignore_columns=['A']) Traceback (most recent call last): ... ValueError: It is allowed to use only one of `columns` or `ignore_columns` parameters at a time """ return self._add_prefix_and_suffix( prefix=prefix, columns=columns, ignore_columns=ignore_columns, )
[docs] @inplace_operation def add_suffix(self, suffix, inplace=False, columns=None, ignore_columns=None) -> Optional['Source']: """ Adds suffix to all column names. Parameters ---------- suffix : str String suffix to add to all columns. inplace : bool The flag controls whether operation should be applied inplace or not. If ``inplace=True``, then it returns nothing. Otherwise method returns a new modified object. columns: List[str], optional If set, only selected columns will be updated with suffix. Can't be used with `ignore_columns` parameter. ignore_columns: List[str], optional If set, selected columns won't be updated with suffix. Can't be used with `columns` parameter. Returns ------- :class:`Source` or ``None`` Examples -------- >>> data = otp.DataSource(db='SOME_DB', tick_type='TT', symbols='S1') >>> data = data.add_suffix('_test') >>> otp.run(data) Time X_test 0 2003-12-01 00:00:00.000 1 1 2003-12-01 00:00:00.001 2 2 2003-12-01 00:00:00.002 3 >>> data.schema {'X_test': <class 'int'>} >>> data = otp.DataSource(db='NYSE_TAQ', tick_type='TRD', symbols='AAPL') >>> data = data.add_suffix('_test') >>> otp.run(data, start=otp.dt(2022, 3, 1), end=otp.dt(2022, 3, 2)) Time PRICE_test SIZE_test 0 2022-03-01 00:00:00.000 1.3 100 1 2022-03-01 00:00:00.001 1.4 10 2 2022-03-01 00:00:00.002 1.4 50 >>> data.schema {'PRICE_test': <class 'float'>} >>> data = otp.Tick(X=1, XX=2) >>> data.add_suffix('X') Traceback (most recent call last): ... AttributeError: Column XX already exists, please, use another suffix Parameter `columns` specify columns to be updated with suffix: >>> data = otp.Tick(A=1, B=2, C=3, D=4, E=5) >>> data = data.add_prefix('test_', columns=['A', 'B', 'C']) >>> otp.run(data) Time test_A test_B test_C D E 0 2003-12-01 1 2 3 4 5 Parameter `ignore_columns` specify columns to ignore: >>> data = otp.Tick(A=1, B=2, C=3, D=4, E=5) >>> data = data.add_prefix('test_', ignore_columns=['A', 'B', 'C']) >>> otp.run(data) Time A B C test_D test_E 0 2003-12-01 1 2 3 4 5 Parameters `columns` and `ignore_columns` can't be used at the same time: >>> data = otp.Tick(A=1, B=2, C=3, D=4, E=5) >>> data.add_prefix('test_', columns=['B', 'C'], ignore_columns=['A']) Traceback (most recent call last): ... ValueError: It is allowed to use only one of `columns` or `ignore_columns` parameters at a time """ return self._add_prefix_and_suffix( suffix=suffix, columns=columns, ignore_columns=ignore_columns, )
def _add_prefix_and_suffix( self, prefix='', suffix='', columns=None, ignore_columns=None, ) -> Optional['Source']: if not prefix and not suffix: raise ValueError('Both suffix and prefix are empty') if ' ' in prefix: raise ValueError(f'There is space in prefix: {prefix}') if ' ' in suffix: raise ValueError(f'There is space in suffix: {prefix}') columns = columns or [] ignore_columns = ignore_columns or [] if columns and ignore_columns: raise ValueError('It is allowed to use only one of `columns` or `ignore_columns` parameters at a time') schema = self.schema for column in columns: if column not in schema: raise AttributeError(f'Column `{column}` does not exist in the schema') for column_name in (columns or schema): if column_name in ignore_columns: continue new_column_name = f'{prefix}{column_name}{suffix}' if new_column_name in self.__dict__: if prefix: raise AttributeError(f'Column {new_column_name} already exists, please, use another prefix') else: raise AttributeError(f'Column {new_column_name} already exists, please, use another suffix') for column_name in (columns or schema): if column_name in ignore_columns: continue new_column_name = f'{prefix}{column_name}{suffix}' self.__dict__[column_name].rename(new_column_name, update_parent_object=False) self.__dict__[new_column_name] = self.__dict__[column_name] del self.__dict__[column_name] if not columns and not ignore_columns: self.sink(otq.RenameFieldsEp(rename_fields=f'(.*)={prefix}\\1{suffix}', use_regex=True)) elif columns: renames = [f'{column}={prefix}{column}{suffix}' for column in columns] self.sink(otq.RenameFieldsEp(rename_fields=','.join(renames))) else: renames = [f'{column}={prefix}{column}{suffix}' for column in schema if column not in ignore_columns] self.sink(otq.RenameFieldsEp(rename_fields=','.join(renames))) return self
[docs] @inplace_operation def time_filter(self, discard_on_match: bool = False, start_time: Union[str, int, time] = 0, end_time: Union[str, int, time] = 0, day_patterns: Union[str, List[str]] = "", timezone=utils.default, # type: ignore end_time_tick_matches: bool = False, inplace=False) -> Optional['Source']: """ Filters ticks by time. Parameters ---------- discard_on_match : bool, optional If ``True``, then ticks that match the filter will be discarded. Otherwise, only ticks that match the filter will be passed. start_time : str or int or datetime.time, optional Start time of the filter, string must be in the format ``HHMMSSmmm``. Default value is 0. end_time : str or int or datetime.time, optional End time of the filter, string must be in the format ``HHMMSSmmm``. To filter ticks for an entire day, this parameter should be set to 240000000. Default value is 0. day_patterns : list or str Pattern or list of patterns that determines days for which the ticks can be propagated. A tick can be propagated if its date matches one or more of the patterns. Three supported pattern formats are: 1. ``month.week.weekdays``, 0 month means any month, 0 week means any week, 6 week means the last week of the month for a given weekday(s), weekdays are digits for each day, 0 being Sunday. 2. ``month/day``, 0 month means any month. 3. ``year/month/day``, 0 year means any year, 0 month means any month. timezone : str, optional Timezone of the filter. Default value is ``configuration.config.tz`` or timezone set in the parameter of :py:func:`onetick.py.run`. end_time_tick_matches : bool, optional If ``True``, then the end time is inclusive. Otherwise, the end time is exclusive. inplace : bool, optional The flag controls whether operation should be applied inplace or not. If ``inplace=True``, then it returns nothing. Otherwise method returns a new modified object. Default value is ``False``. Returns ------- :class:`Source` or ``None`` Returns ``None`` if ``inplace=True``. See also -------- **TIME_FILTER** OneTick event processor Examples -------- >>> data = otp.DataSource(db='NYSE_TAQ', tick_type='TRD', symbols='AAPL') >>> data = data.time_filter(start_time='000000001', end_time='000000003') >>> otp.run(data, start=otp.dt(2022, 3, 1), end=otp.dt(2022, 3, 2)) Time PRICE SIZE 0 2022-03-01 00:00:00.001 1.4 10 1 2022-03-01 00:00:00.002 1.4 50 """ if timezone is utils.default: # doesn't work without expr for some reason timezone = 'expr(_TIMEZONE)' if day_patterns: if isinstance(day_patterns, str): day_patterns = [day_patterns] for day_pattern in day_patterns: if not re.match(r"(^\d\d?\.[0-6].\d\d?$)|(^\d\d?\/\d\d?$)|(^\d{1,4}\/\d\d?\/\d\d?$)", day_pattern): raise ValueError(f"Invalid day pattern: {day_pattern}") if isinstance(start_time, time): start_time = start_time.strftime('%H%M%S%f')[:-3] if isinstance(end_time, time): end_time = end_time.strftime('%H%M%S%f')[:-3] day_patterns = ",".join(day_patterns) self.sink(otq.TimeFilter(discard_on_match=discard_on_match, start_time=start_time, end_time=end_time, timezone=timezone, day_patterns=day_patterns, end_time_tick_matches=end_time_tick_matches, )) return self
[docs] @copy_method(ranking) def ranking(self, *args, **kwargs): pass
[docs] @inplace_operation def insert_tick(self, fields=None, where=None, preserve_input_ticks=True, num_ticks_to_insert=1, insert_before=True, inplace=False) -> Optional['Source']: """ Insert tick. Parameters ---------- fields: dict of str to :py:class:`onetick.py.Operation` Mapping of field names to some expressions or values. These fields in inserted ticks will be set to corresponding values or results of expressions. If field is presented in input tick, but not set in ``fields`` dict, then the value of the field will be copied from input tick to inserted tick. If parameter ``fields`` is not set at all, then values for inserted ticks' fields will be default values for fields' types from input ticks (0 for integers etc.). where: :py:class:`onetick.py.Operation` Expression to select ticks near which the new ticks will be inserted. By default, all ticks are selected. preserve_input_ticks: bool A switch controlling whether input ticks have to be preserved in output time series or not. While the former case results in fields of input ticks to be present in the output time series together with those defined by the ``fields`` parameter, the latter case results in only defined fields to be present. If a field of the input time series is defined in the ``fields`` parameter, the defined value takes precedence. num_ticks_to_insert: int Number of ticks to insert. insert_before: bool Insert tick before each input tick or after. inplace: bool The flag controls whether operation should be applied inplace or not. If ``inplace=True``, then it returns nothing. Otherwise method returns a new modified object. See also -------- **INSERT_TICK** OneTick event processor Returns ------- :class:`Source` or ``None`` Examples -------- Insert tick before each tick with default type values. >>> data = otp.Tick(A=1) >>> data = data.insert_tick() >>> otp.run(data) Time A 0 2003-12-01 0 1 2003-12-01 1 Insert tick before each tick with field `A` copied from input tick and field `B` set to specified value. >>> data = otp.Tick(A=1) >>> data = data.insert_tick(fields={'B': 'b'}) >>> otp.run(data) Time B A 0 2003-12-01 b 1 1 2003-12-01 1 Insert two ticks only after first tick. >>> data = otp.Ticks(A=[1, 2, 3]) >>> data = data.insert_tick(where=data['A'] == 1, ... insert_before=False, ... num_ticks_to_insert=2) >>> otp.run(data) Time A 0 2003-12-01 00:00:00.000 1 1 2003-12-01 00:00:00.000 0 2 2003-12-01 00:00:00.000 0 3 2003-12-01 00:00:00.001 2 4 2003-12-01 00:00:00.002 3 """ if not isinstance(num_ticks_to_insert, int) or num_ticks_to_insert <= 0: raise ValueError("Parameter 'num_ticks_to_insert' must be a positive integer") if not preserve_input_ticks and not fields: raise ValueError("Parameter 'fields' must be set if 'preserve_input_ticks' is False") where = '' if where is None else str(where) fields = fields or {} update_schema = {} for field, value in fields.items(): dtype = ott.get_object_type(value) if field not in self.schema: update_schema[field] = dtype elif dtype is not self.schema[field]: raise ValueError(f"Incompatible types for field '{field}': {self.schema[field]} --> {dtype}") dtype = ott.type2str(dtype) if isinstance(value, type): value = ott.default_by_type(value) value = ott.value2str(value) fields[field] = (dtype, value) fields = ','.join( f'{field} {dtype}={value}' if value else f'{field} {dtype}' for field, (dtype, value) in fields.items() ) self.sink( otq.InsertTick( fields=fields, where=where, preserve_input_ticks=preserve_input_ticks, num_ticks_to_insert=num_ticks_to_insert, insert_before=insert_before, ) ) if preserve_input_ticks: self.table(inplace=True, strict=False, **update_schema) else: self.table(inplace=True, strict=True, **update_schema) return self
[docs] @inplace_operation def modify_query_times(self, start=None, end=None, output_timestamp=None, propagate_heartbeats=True, inplace=False): """ Modify ``start`` and ``end`` time of the query. * query times are changed for all operations only **before** this method up to the source of the graph. * all ticks' timestamps produced by this method **must** fall into original time range of the query. It is possible to change ticks' timestamps with parameter ``output_timestamp``, so they will stay inside the original time range. Note ---- Due to how OneTick works internally, tick generators :py:class:`otp.Tick <onetick.py.Tick>` and :py:func:`otp.Ticks <onetick.py.Ticks>` are not affected by this method. Parameters ---------- start: :py:class:`onetick.py.datetime` or \ :py:class:`~onetick.py.core.source.MetaFields` or :py:class:`~onetick.py.Operation` Expression to replace query start time. By default, start time is not changed. Note that expression in this parameter can't depend on ticks, thus only :py:class:`~onetick.py.core.source.MetaFields` and constants can be used. end: :py:class:`onetick.py.datetime` or \ :py:class:`~onetick.py.core.source.MetaFields` or :py:class:`~onetick.py.Operation` Expression to replace query end time. By default, end time is not changed. Note that expression in this parameter can't depend on ticks, thus only :py:class:`~onetick.py.core.source.MetaFields` and constants can be used. output_timestamp: :py:class:`onetick.py.Operation` Expression that produces timestamp for each tick. By default, the following expression is used: ``orig_start + orig_timestamp - start`` This expression covers cases when start time of the query is changed and keeps timestamp inside original time range. Note that it doesn't cover cases, for example, if end time was increased, you have to handle such cases yourself. propagate_heartbeats: bool Controls heartbeat propagation. inplace: bool The flag controls whether operation should be applied inplace or not. If ``inplace=True``, then it returns nothing. Otherwise method returns a new modified object. See also -------- | **MODIFY_QUERY_TIMES** OneTick event processor | :py:meth:`onetick.py.Source.time_interval_shift` Returns ------- :class:`Source` or ``None`` Examples -------- >>> start = otp.dt(2022, 3, 2) >>> end = otp.dt(2022, 3, 2) + otp.Milli(3) >>> data = otp.DataSource('NYSE_TAQ', symbols='AAPL', tick_type='TRD') By default, method does nothing: >>> t = data.modify_query_times() >>> otp.run(t, start=start, end=end) Time PRICE SIZE 0 2022-03-02 00:00:00.000 1.0 100 1 2022-03-02 00:00:00.001 1.1 101 2 2022-03-02 00:00:00.002 1.2 102 See how ``_START_TIME`` and ``_END_TIME`` meta fields are changed. They are changed *before* ``modify_query_times``: >>> t = data.copy() >>> t['S_BEFORE'] = t['_START_TIME'] >>> t['E_BEFORE'] = t['_END_TIME'] >>> t = t.modify_query_times(start=t['_START_TIME'] + otp.Milli(1), ... end=t['_END_TIME'] - otp.Milli(1)) >>> t['S_AFTER'] = t['_START_TIME'] >>> t['E_AFTER'] = t['_END_TIME'] >>> otp.run(t, start=start, end=end) Time PRICE SIZE S_BEFORE E_BEFORE S_AFTER E_AFTER 0 2022-03-02 1.1 101 2022-03-02 00:00:00.001 2022-03-02 00:00:00.002 2022-03-02 2022-03-02 00:00:00.003 You can decrease time interval without problems: >>> t = data.modify_query_times(start=data['_START_TIME'] + otp.Milli(1), ... end=data['_END_TIME'] - otp.Milli(1)) >>> otp.run(t, start=start, end=end) Time PRICE SIZE 0 2022-03-02 1.1 101 Note that the timestamp of the tick was changed with default expression. In this case we can output original timestamps, because they fall into original time range: >>> t = data.modify_query_times(start=data['_START_TIME'] + otp.Milli(1), ... end=data['_END_TIME'] - otp.Milli(1), ... output_timestamp=data['TIMESTAMP']) >>> otp.run(t, start=start, end=end) Time PRICE SIZE 0 2022-03-02 00:00:00.001 1.1 101 But it will not work if new time range is wider than original: >>> t = data.modify_query_times(start=data['_START_TIME'] - otp.Milli(1), ... output_timestamp=data['TIMESTAMP']) >>> otp.run(t, start=start + otp.Milli(1), end=end + otp.Milli(1)) # doctest: +ELLIPSIS Traceback (most recent call last): Exception...timestamp is falling out of initial start/end time range... In this case default ``output_timestamp`` expression would work just fine: >>> t = data.modify_query_times(start=data['_START_TIME'] - otp.Milli(1)) >>> otp.run(t, start=start + otp.Milli(1), end=end + otp.Milli(1)) Time PRICE SIZE 0 2022-03-02 00:00:00.001 1.0 100 1 2022-03-02 00:00:00.002 1.1 101 2 2022-03-02 00:00:00.003 1.2 102 But it doesn't work, for example, if end time has crossed the borders of original time range. In this case other ``output_timestamp`` expression must be specified: >>> t = data.modify_query_times( ... start=data['_START_TIME'] - otp.Milli(2), ... output_timestamp=otp.math.min(data['TIMESTAMP'] + otp.Milli(2), data['_END_TIME']) ... ) >>> otp.run(t, start=start + otp.Milli(2), end=end) Time PRICE SIZE 0 2022-03-02 00:00:00.002 1.0 100 1 2022-03-02 00:00:00.003 1.1 101 2 2022-03-02 00:00:00.003 1.2 102 Remember that ``start`` and ``end`` parameters can't depend on ticks: >>> t = data.copy() >>> t['X'] = 12345 >>> t = t.modify_query_times(start=t['_START_TIME'] + t['X'] - t['X'], ... end=t['_END_TIME'] - otp.Milli(1)) >>> otp.run(t, start=start, end=end) # doctest: +ELLIPSIS Traceback (most recent call last): Exception...parameter must not depend on ticks... Constant datetime values can be used as parameters too: >>> t = data.modify_query_times(start=start + otp.Milli(1), ... end=end - otp.Milli(1)) >>> otp.run(t, start=start, end=end) Time PRICE SIZE 0 2022-03-02 1.1 101 Note that some graph patterns are not allowed when using this method. For example, modifying query times for a branch that will be merged later: >>> t1, t2 = data[data['PRICE'] > 1.3] >>> t2 = t2.modify_query_times(start=start + otp.Milli(1)) >>> t = otp.merge([t1, t2]) >>> otp.run(t, start=start, end=end) # doctest: +ELLIPSIS Traceback (most recent call last): Exception...Invalid graph...time bound to a node...an intermediate node in one of the cycles in graph... """ start = ott.value2str(start) if start is not None else '' end = ott.value2str(end) if end is not None else '' output_timestamp = ott.value2str(output_timestamp) if output_timestamp is not None else '' self.sink( otq.ModifyQueryTimes( start_time=start, end_time=end, output_timestamp=output_timestamp, propagate_heartbeats=propagate_heartbeats, ) ) return self
[docs] def time_interval_shift(self, shift, inplace=False): """ Shifting time interval for a source. The whole data flow is shifted all the way up to the source of the graph. The start and end times of the query will be changed for all operations before this method, and will stay the same after this method. WARNING: The ticks' timestamps *are changed* automatically so they fit into original time range. You will get different set of ticks from the database, but the timestamps of the ticks from that database will not be the same as in the database. They need to be changed so they fit into the original query time range. See details in :py:meth:`onetick.py.Source.modify_query_times`. Parameters ---------- shift: int or :ref:`datetime offset<Datetime offsets>` Offset to shift the whole time interval. Can be positive or negative. Positive value moves time interval into the future, negative -- to the past. int values are interpreted as milliseconds. Timestamps of the ticks will be changed so they fit into the original query time range by subtracting ``shift`` from each timestamp. inplace: bool The flag controls whether operation should be applied inplace or not. If ``inplace=True``, then it returns nothing. Otherwise method returns a new modified object. Returns ------- :class:`Source` or ``None`` See also -------- | :py:meth:`onetick.py.Source.modify_query_times` | :py:meth:`onetick.py.Source.time_interval_change` Examples -------- --> Also see use-case using :py:meth:`time_interval_shift` for calculating `Markouts <../../static/getting_started/use_cases.html#point-in-time-benchmarks-bbo-at-different-markouts>`_ >>> start = otp.dt(2022, 3, 2) >>> end = otp.dt(2022, 3, 2) + otp.Milli(3) >>> data = otp.DataSource('NYSE_TAQ', symbols='AAPL', tick_type='TRD') Default data: >>> otp.run(data, start=start, end=end) Time PRICE SIZE 0 2022-03-02 00:00:00.000 1.0 100 1 2022-03-02 00:00:00.001 1.1 101 2 2022-03-02 00:00:00.002 1.2 102 Get window for a third tick: >>> otp.run(data, start=start + otp.Milli(2), end=start + otp.Milli(3)) Time PRICE SIZE 0 2022-03-02 00:00:00.002 1.2 102 Shifting time window will result in different set of ticks, but the ticks will have their timestamps changed to fit into original time range. Let's shift time 2 milliseconds back and thus get the first tick: >>> t = data.time_interval_shift(shift=-otp.Milli(2)) >>> otp.run(t, start=start + otp.Milli(2), end=start + otp.Milli(3)) Time PRICE SIZE 0 2022-03-02 00:00:00.002 1.0 100 Here we are querying empty time interval, but shifting one second back to get ticks. >>> t = data.time_interval_shift(shift=-otp.Second(1)) >>> otp.run(t, start=start + otp.Second(1), end=end + otp.Second(1)) Time PRICE SIZE 0 2022-03-02 00:00:01.000 1.0 100 1 2022-03-02 00:00:01.001 1.1 101 2 2022-03-02 00:00:01.002 1.2 102 Note that tick generators :py:class:`otp.Tick <onetick.py.Tick>` and :py:func:`otp.Ticks <onetick.py.Ticks>` are not really affected by this method, they will have the same timestamps: >>> t = otp.Tick(A=1) >>> otp.run(t) Time A 0 2003-12-01 1 >>> t = t.time_interval_shift(shift=otp.Second(1)) >>> otp.run(t) Time A 0 2003-12-01 1 """ start = self['_START_TIME'] + shift end = self['_END_TIME'] + shift # change timestamps so they fit into original time range output_timestamp = self['TIMESTAMP'] - shift return self.modify_query_times(start=start, end=end, output_timestamp=output_timestamp, inplace=inplace)
[docs] def time_interval_change(self, start_change=0, end_change=0, inplace=False): """ Changing time interval by making it bigger or smaller. All timestamps of ticks that are crossing the border of original time range will be set to original start time or end time depending on their original time. Parameters ---------- start_change: int or :ref:`datetime offset<Datetime offsets>` Offset to shift start time. Can be positive or negative. Positive value moves start time into the future, negative -- to the past. int values are interpreted as milliseconds. end_change: int or :ref:`datetime offset<Datetime offsets>` Offset to shift end time. Can be positive or negative. Positive value moves end time into the future, negative -- to the past. int values are interpreted as milliseconds. inplace: bool The flag controls whether operation should be applied inplace or not. If ``inplace=True``, then it returns nothing. Otherwise method returns a new modified object. Returns ------- :class:`Source` or ``None`` See also -------- | :py:meth:`onetick.py.Source.modify_query_times` | :py:meth:`onetick.py.Source.time_interval_shift` Examples -------- >>> start = otp.dt(2022, 3, 2) >>> end = otp.dt(2022, 3, 2) + otp.Milli(3) >>> data = otp.DataSource('NYSE_TAQ', symbols='AAPL', tick_type='TRD') By default, ``time_interval_change()`` does nothing: >>> t = data.time_interval_change() >>> otp.run(t, start=start, end=end) Time PRICE SIZE 0 2022-03-02 00:00:00.000 1.0 100 1 2022-03-02 00:00:00.001 1.1 101 2 2022-03-02 00:00:00.002 1.2 102 Decreasing time range will not change ticks' timestamps: >>> t = data.time_interval_change(start_change=otp.Milli(1), end_change=-otp.Milli(1)) >>> otp.run(t, start=start, end=end) Time PRICE SIZE 0 2022-03-02 00:00:00.001 1.1 101 Increasing time range will change timestamps of the ticks that crossed the border. In this case first tick's timestamp will be set to original start time, and third tick's to original end time. >>> t = data.time_interval_change(start_change=-otp.Milli(1), end_change=otp.Milli(1)) >>> otp.run(t, start=start + otp.Milli(1), end=start + otp.Milli(2)) Time PRICE SIZE 0 2022-03-02 00:00:00.001 1.0 100 1 2022-03-02 00:00:00.001 1.1 101 2 2022-03-02 00:00:00.002 1.2 102 Here we are querying empty time interval, but changing start time one second back to get ticks. >>> t = data.time_interval_change(start_change=-otp.Second(1)) >>> otp.run(t, start=start + otp.Second(1), end=end + otp.Second(1)) Time PRICE SIZE 0 2022-03-02 00:00:01 1.0 100 1 2022-03-02 00:00:01 1.1 101 2 2022-03-02 00:00:01 1.2 102 """ start = self['_START_TIME'] + start_change end = self['_END_TIME'] + end_change # change ticks' timestamps only if they are out of bounds output_timestamp = self['TIMESTAMP'] output_timestamp = otp.math.min(output_timestamp, self['_END_TIME']) output_timestamp = otp.math.max(output_timestamp, self['_START_TIME']) return self.modify_query_times(start=start, end=end, output_timestamp=output_timestamp, inplace=inplace)
def mean(self, *columns): """ Get average value of the specified columns. Do not confuse it with :py:func:`otp.agg.average <onetick.py.agg.average>`, this method gets average of the columns' values from single row, not doing aggregation of all rows. Parameters ---------- columns: str, :class:`Column` Columns names or columns objects. All columns must have compatible types. Returns ------- :class:`~onetick.py.Operation` Examples -------- Integers and floating point values can be mixed: >>> t = otp.Tick(A=1, B=3.3) >>> t['AVG'] = t.mean('A', 'B') >>> otp.run(t) Time A B AVG 0 2003-12-01 1 3.3 2.15 You can get the average for datetime values too: >>> t = otp.Tick(START_TIME=otp.dt(2022, 1, 1), END_TIME=otp.dt(2023, 1, 1)) >>> t['MID_TIME'] = t.mean('START_TIME', 'END_TIME') >>> otp.run(t, timezone='GMT') Time START_TIME END_TIME MID_TIME 0 2003-12-01 2022-01-01 2023-01-01 2022-07-02 12:00:00 Note that things may get confusing for datetimes when the timezone with daylight saving time is used: >>> t = otp.Tick(START_TIME=otp.dt(2022, 1, 1), END_TIME=otp.dt(2023, 1, 1)) >>> t['MID_TIME'] = t.mean('START_TIME', 'END_TIME') >>> otp.run(t, timezone='EST5EDT') Time START_TIME END_TIME MID_TIME 0 2003-12-01 2022-01-01 2023-01-01 2022-07-02 13:00:00 """ dtypes = set() for column_name in map(str, columns): if column_name not in self.schema: raise ValueError(f"There is no '{column_name}' column in the schema") dtypes.add(self.schema[column_name]) if not are_numerics(*dtypes) and not are_time(*dtypes): raise ValueError('Only int, float and datetime columns are supported. ' 'Numeric and datetime columns should not be mixed.') op = None for column_name in map(str, columns): column = self[column_name] dtype = self.schema[column_name] if dtype is otp.msectime and otp.nsectime in dtypes: column = column.astype(otp.nsectime) if are_time(dtype): column = column.astype(int) if op is None: op = column else: op += column op = op / len(columns) dtype = ott.get_type_by_objects(dtypes) if are_time(dtype): # can't convert float to datetime, converting to int first op = op.astype(int) op = op.astype(dtype) return op
[docs] @inplace_operation def show_symbol_name_in_db(self, inplace=False): """ Adds the **SYMBOL_NAME_IN_DB** field to input ticks, indicating the symbol name of the tick in the database. Parameters ---------- inplace: bool The flag controls whether operation should be applied inplace or not. If ``inplace=True``, then it returns nothing. Otherwise method returns a new modified object. See also -------- **SHOW_SYMBOL_NAME_IN_DB** OneTick event processor Returns ------- :class:`Source` or ``None`` Examples -------- For example, it can be used to display the actual symbol name for the contract (e.g., **ESM23**) instead of the artificial continuous name **ES_r_tdi**. Notice how actual symbol name can be different for each tick, e.g. in this case it is different for each month. >>> data = otp.DataSource('TDI_FUT', tick_type='TRD') # doctest: +SKIP >>> data = data[['PRICE']] # doctest: +SKIP >>> data = data.first(bucket_interval=31, bucket_units='days') # doctest: +SKIP >>> data['SYMBOL_NAME'] = data.Symbol.name # doctest: +SKIP >>> data = data.show_symbol_name_in_db() # doctest: +SKIP >>> otp.run(data, # doctest: +SKIP ... symbols='ES_r_tdi', symbol_date=otp.dt(2023, 3, 1), ... start=otp.dt(2023, 3, 1), end=otp.dt(2023, 5, 1)) Time PRICE SYMBOL_NAME SYMBOL_NAME_IN_DB 0 2023-03-01 00:00:00.549 3976.75 ES_r_tdi ESH23 1 2023-04-02 18:00:00.039 4127.00 ES_r_tdi ESM23 """ if 'SYMBOL_NAME_IN_DB' in self.schema: raise ValueError("Column 'SYMBOL_NAME_IN_DB' already exists.") self.sink(otq.ShowSymbolNameInDb()) self.schema['SYMBOL_NAME_IN_DB'] = str return self
[docs] @inplace_operation def throw(self, message='', where=1, scope='query', error_code=1, throw_before_query_execution=False, inplace=False): """ Propagates error or warning or throws an exception (depending on ``scope`` parameter) when condition provided by ``where`` parameter evaluates to True. Throwing an exception will abort query execution, propagating error will stop tick propagation and propagating warning will not change the data processing. Parameters ---------- message: str or Operation Message that will be thrown in exception or returned in error message. where: Operation Logical expression that specifies condition when exception or error will be thrown. scope: 'query' or 'symbol' 'query' will throw an exception and 'symbol' will propagate an error or warning depending on ``error_code`` parameter. error_code: int When ``scope='symbol'``, values from interval ``[1, 500]`` indicate warnings and values from interval ``[1500, 2000]`` indicate errors. Note that tick propagation will not stop when warning is raised, and will stop when error is raised. This parameter is not used when ``scope='query'``. throw_before_query_execution: bool If set to ``True``, the exception will be thrown before the execution of the query. This option is intended for supplying placeholder queries that must always throw. inplace: bool The flag controls whether operation should be applied inplace or not. If ``inplace=True``, then it returns nothing. Otherwise method returns a new modified object. See also -------- **THROW** OneTick event processor Returns ------- :class:`Source` or ``None`` Examples -------- By default, this method will throw an exception on the first tick with empty message: >>> t = otp.Tick(A=1) >>> t = t.throw() >>> otp.run(t) # doctest: +ELLIPSIS Traceback (most recent call last): ... Exception: ...: In THROW: ... ... You can specify exception message and condition (note that exception now is raised on second tick): >>> t = otp.Ticks(A=[1, 2]) >>> t = t.throw(message='A is ' + t['A'].apply(str), where=(t['A']==2)) >>> otp.run(t) # doctest: +ELLIPSIS Traceback (most recent call last): ... Exception: ...: In THROW: A is 2. ... ... Note that exception will not be thrown if there are no ticks: >>> t = otp.Empty() >>> t = t.throw() >>> otp.run(t) Empty DataFrame Columns: [] Index: [] If you need exception to be thrown always, you can use ``throw_before_query_execution`` parameter: >>> t = otp.Empty() >>> t = t.throw(throw_before_query_execution=True) >>> otp.run(t) # doctest: +ELLIPSIS Traceback (most recent call last): ... Exception: ...: In THROW: ... You can throw OneTick errors and warnings instead of exceptions. Raising error codes from 1 to 500 indicates warnings. Raising error codes from 1500 to 2000 indicates errors. Tick propagation stops only when error is raised. >>> t = otp.Ticks(A=[1, 2, 3, 4]) >>> t = t.throw(message='warning A=1', scope='symbol', error_code=2, where=(t['A']==1)) >>> t = t.throw(message='error A=3', scope='symbol', error_code=1502, where=(t['A']==3)) >>> otp.run(t) Time A 0 2003-12-01 00:00:00.000 1 1 2003-12-01 00:00:00.001 2 Right now the only supported interface to get errors and warnings is via ``map`` output structure and its methods: >>> otp.run(t, symbols='AAPL', output_structure='map').output('AAPL').error [(2, 'warning A=1'), (1502, 'error A=3')] """ if isinstance(message, str): treat_message_as_per_tick_expr = False else: treat_message_as_per_tick_expr = True if scope not in {'query', 'symbol'}: raise ValueError("Parameter 'scope' can only be set to 'query' or 'symbol'") if not (1 <= error_code <= 500 or 1500 <= error_code <= 2000): raise ValueError("Parameter 'error_code' can only take values " "from interval [1, 500] for warnings and from interval [1500, 2000] for errors") self.sink( otq.Throw( message=str(message), treat_message_as_per_tick_expr=treat_message_as_per_tick_expr, where=str(where), scope=scope.upper(), error_code=error_code, throw_before_query_execution=throw_before_query_execution, ) ) return self
_Source = Source # Backward compatibility