import copy
import functools
import inspect
import os
import re
import uuid
import warnings
from collections import defaultdict
from datetime import datetime, date, time
import pandas as pd
import numpy as np
from typing import Callable, Optional, List, Any, Tuple, Union, Type, Dict, Collection
from onetick.py.backports import Literal
import onetick.query as otq
from onetick.lib.instance import OneTickLib
import onetick.py.functions
import onetick.py.sources
from onetick import py as otp
from onetick.py import aggregations
from onetick.py import types as ott
from onetick.py import utils, configuration
from onetick.py.core._internal._manually_bound_value import _ManuallyBoundValue
from onetick.py.core._internal._multi_symbols_source import _MultiSymbolsSource
from onetick.py.core._internal._proxy_node import _ProxyNode
from onetick.py.core._internal._state_objects import _StateColumn, _TickSequence
from onetick.py.core._internal._state_vars import StateVars
from onetick.py.core._source._symbol_param_source import _SymbolParamSource
from onetick.py.core._source.schema import Schema
from onetick.py.core._source.symbol import Symbol
from onetick.py.core._source.tmp_otq import TmpOtq
from onetick.py.core.column import _Column, _LagOperator, _ColumnAggregation
from onetick.py.core.column_operations._methods.methods import is_arithmetical, is_compare
from onetick.py.core.column_operations._methods.op_types import are_strings, _replace_parameters, are_numerics, are_time
from onetick.py.core.column_operations.base import _Operation
from onetick.py.core.lambda_object import _LambdaIfElse, apply_lambda, apply_script, _EmulateObject
from onetick.py.core.query_inspector import get_query_info, add_pins, get_query_parameter_list
from onetick.py.core.eval_query import _QueryEvalWrapper, prepare_params
from onetick.py.core.cut_builder import _BaseCutBuilder
from onetick.py.core import db_constants
from onetick.py.utils import adaptive, adaptive_to_default
from onetick.py.aggregations._docs import (copy_method,
_running_doc, _all_fields_with_policy_doc, _bucket_interval_doc,
_bucket_time_doc, _bucket_units_doc, _bucket_end_condition_doc,
_end_condition_per_group_doc, _boundary_tick_bucket_doc, _group_by_doc)
from onetick.py.docs.utils import docstring, param_doc
from onetick.py.aggregations.functions import (high_tick, low_tick,
high_time, low_time,
first_tick, last_tick,
distinct, ranking,
ob_snapshot, ob_snapshot_wide, ob_snapshot_flat)
def inplace_operation(method):
    """Give a ``Source`` method a keyword-only ``inplace`` flag.

    * ``inplace=False`` (default): the method runs on ``self.copy()`` and its
      result is returned; the original object is not touched.
    * ``inplace=True``: the method runs on ``self`` itself and the wrapper
      returns ``None``.

    The wrapped method always receives the flag as its ``inplace`` keyword
    argument, so it has to accept that parameter.
    """
    @functools.wraps(method)
    def _inner(self, *args, inplace=False, **kwargs):
        target = self if inplace else self.copy()
        outcome = method(target, *args, inplace=inplace, **kwargs)
        # in-place calls deliberately return nothing
        return None if inplace else outcome
    return _inner
def _is_dict_required(symbols):
"""
Depending on symbols, determine if output of otp.run() or Source.__call__() should always be a dictionary
of {symbol: dataframe} even if only one symbol is present in the results
"""
if isinstance(symbols, (list, tuple)):
if len(symbols) == 0:
return False
elif len(symbols) > 1:
return True
else:
symbols = symbols[0]
if isinstance(symbols, otp.Source):
return True
if isinstance(symbols, otq.Symbol):
symbols = symbols.name
if isinstance(symbols, str) and 'eval' in symbols:
return True
return False
# Shared description of the ``aggs`` parameter. It is passed to
# ``@docstring(parameters=[...])`` on ``Source.agg`` to document that method's first argument.
_agg_doc = param_doc(name='aggs',
annotation=Dict,
str_annotation='dict of aggregations',
desc="""
aggregation dict: key - output column name; value - aggregation""")
[docs]class Source:
"""
Base class for representing Onetick execution graph.
All :ref:`onetick-py sources <sources>` are derived from this class
and have access to all its methods.
Examples
--------
>>> data = otp.Tick(A=1)
>>> isinstance(data, otp.Source)
True
Also this class can be used to initialize raw source
with the help of ``onetick.query`` classes, but
it should be done with caution as the user is required to set
such properties as symbol name and tick type manually.
>>> data = otp.Source(otq.TickGenerator(bucket_interval=0, fields='long A = 123').tick_type('TT'))
>>> otp.run(data, symbols='LOCAL::')
Time A
0 2003-12-04 123
"""
# TODO: support transactions for every _source.
# A transaction is the set of calls between _source creation and a call, or between two calls;
# if transactions have the same operations, it seems we should add only one set of operations.
# Names of private attributes that ``__setattr__`` stores directly in ``self.__dict__``
# instead of treating them as columns (see ``_check_key_in_properties``).
# Name-mangled forms such as ``_Source__node`` are matched after the class prefix is removed.
_PROPERTIES = [
"__node",
"__hash",
"__use_name_for_column_prefix",
"__sources_keys_dates",
"__sources_modify_query_times",
"__sources_base_ep_func",
"__sources_symbols",
"__source_has_output",
"__name",
"_tmp_otq"
]
# Names of the OneTick meta fields; ``__init__`` declares columns with these names.
_OT_META_FIELDS = ["_START_TIME", "_END_TIME", "_SYMBOL_NAME", "_DBNAME", "_TICK_TYPE", '_TIMEZONE']
# NOTE(review): ``MetaFields`` is neither defined nor imported in the visible part of this file - confirm its origin.
meta_fields = MetaFields()
# Re-export of the imported ``Symbol`` helper as ``Source.Symbol``.
Symbol = Symbol
def __init__(
self,
node=None,
_symbols=None,
_start=adaptive,
_end=adaptive,
_base_ep_func=None,
_has_output=True,
**kwargs
):
"""
Create a source from a OneTick event processor (or graph node) and a schema.

Parameters
----------
node: onetick.query event processor or _ProxyNode, optional
    Root node of the source; ``None`` means ``otq.Passthrough()``.
    A ``_ProxyNode`` has its graph copied, anything else is converted
    with ``__from_ep_to_proxy``.
_symbols, _start, _end, _base_ep_func:
    Symbols, ``(start, end)`` query range and base EP function stored
    for the new node (the dictionaries are keyed by the node key).
_has_output: bool
    Stored as the "source has output" flag.
**kwargs:
    Schema: column name -> type or value. The column type is derived
    with ``ott.get_source_base_type``.

Raises
------
ValueError
    If a ``Time`` column is passed in ``kwargs``.
"""
self._tmp_otq = TmpOtq()
# private names listed in _PROPERTIES are stored straight into __dict__ by __setattr__
self.__name = None
# NOTE(review): only 'Time' is rejected here although the message also mentions 'TIMESTAMP';
# 'TIMESTAMP' may be passed in kwargs to declare its type.
if "Time" in kwargs:
# TODO: add tests
raise ValueError(
"It is not allowed to have 'Time' or 'TIMESTAMP' columns, because they are the key columns"
)
# Declare the key column TIMESTAMP (nsectime unless the caller provided it),
# plus Time and the OneTick meta fields.
if "TIMESTAMP" not in kwargs:
kwargs["TIMESTAMP"] = ott.nsectime
kwargs["Time"] = ott.nsectime
kwargs["_START_TIME"] = ott.nsectime
kwargs["_END_TIME"] = ott.nsectime
kwargs["_SYMBOL_NAME"] = str
kwargs["_DBNAME"] = str
kwargs["_TICK_TYPE"] = str
kwargs["_TIMEZONE"] = str
# Columns are written directly into __dict__: going through __setattr__
# would add ADD_FIELD event processors to the graph.
for key, value in kwargs.items():
# calculate value type
value_type = ott.get_source_base_type(value)
self.__dict__[key] = _Column(name=key, dtype=value_type, obj_ref=self)
# just an alias to Timestamp
self.__dict__['Time'] = self.__dict__['TIMESTAMP']
# set through __dict__ because __setattr__ refuses to assign `_state_vars`
self.__dict__['_state_vars'] = StateVars(self)
if node is None:
node = otq.Passthrough()
self.__hash = uuid.uuid4()
# per-node metadata, keyed by the node key (merged by _set_sources_dates)
self.__sources_keys_dates = {}
self.__sources_modify_query_times = {}
self.__sources_base_ep_func = {}
self.__sources_symbols = {}
self.__source_has_output = _has_output
if isinstance(node, _ProxyNode):
self.__node = _ProxyNode(*node.copy_graph(), refresh_func=self.__refresh_hash)
else:
self.__node = _ProxyNode(*self.__from_ep_to_proxy(node), refresh_func=self.__refresh_hash)
self.__sources_keys_dates[self.__node.key()] = (_start, _end)
self.__sources_modify_query_times[self.__node.key()] = False
self.__sources_base_ep_func[self.__node.key()] = _base_ep_func
self.__sources_symbols[self.__node.key()] = _symbols
# flag controls whether we have to add node_name prefix when convert
# columns into string
self.__use_name_for_column_prefix = False
def _try_default_constructor(self, *args, node=None, **kwargs):
if node is not None:
# Source.copy() method will use this way
# all info from original source will be copied by copy() method after
Source.__init__(self, *args, node=node, **kwargs)
return True
return False
def base_ep(self, **kwargs):
    """Return the base event-processor source.

    The base class has none and returns ``None``; concrete sources are
    expected to return a ``Source`` object.
    """
    base = None
    return base
def _clean_sources_dates(self):
self.__sources_keys_dates = {}
self.__sources_modify_query_times = {}
self.__sources_base_ep_func = {}
self.__sources_symbols = {}
def _set_sources_dates(self, other, copy_symbols=True):
self.__sources_keys_dates.update(other._get_sources_dates())
self.__sources_modify_query_times.update(other._get_sources_modify_query_times())
self.__sources_base_ep_func.update(other._get_sources_base_ep_func())
if copy_symbols:
self.__sources_symbols.update(other._get_sources_symbols())
else:
# this branch is applicable for the bound symbols with callbacks,
# where we drop all adaptive symbols and keep only manually specified
# symbols
manually_bound = {
key: _ManuallyBoundValue(value)
for key, value in other._get_sources_symbols().items()
if value is not adaptive and value is not adaptive_to_default
}
self.__sources_symbols.update(manually_bound)
self.__source_has_output = other._get_source_has_output()
def _change_sources_keys(self, keys: dict):
"""
Change keys in sources dictionaries.
Need to do it, for example, after rebuilding the node history with new keys.
Parameters
----------
keys: dict
Mapping from old key to new key
"""
sources = (self.__sources_keys_dates,
self.__sources_modify_query_times,
self.__sources_base_ep_func,
self.__sources_symbols)
for dictionary in sources:
for key in list(dictionary):
dictionary[keys[key]] = dictionary.pop(key)
def _get_source_has_output(self):
return self.__source_has_output
def _get_sources_dates(self):
return self.__sources_keys_dates
def _get_sources_modify_query_times(self):
return self.__sources_modify_query_times
def _get_sources_base_ep_func(self):
return self.__sources_base_ep_func
def _get_sources_symbols(self):
return self.__sources_symbols
def use_name_for_column_prefix(self, flag=None):
    """Get, and optionally set, the column-prefix flag.

    The flag controls whether the node name is added as a prefix when
    columns are converted into strings.

    Parameters
    ----------
    flag: bool, optional
        New value of the flag; ``None`` leaves it unchanged.

    Returns
    -------
    bool
        The current value of the flag (after the update, if any).
    """
    if flag is None:
        return self.__use_name_for_column_prefix
    self.__use_name_for_column_prefix = flag
    return self.__use_name_for_column_prefix
def _check_key_in_properties(self, key: str) -> bool:
if key in self.__class__._PROPERTIES:
return True
if key.replace('_' + Source.__name__.lstrip('_'), "") in self.__class__._PROPERTIES:
return True
if key.replace(self.__class__.__name__, "") in self.__class__._PROPERTIES:
return True
return False
def __setattr__(self, key, value):
if self._check_key_in_properties(key):
self.__dict__[key] = value
return
if isinstance(value, _BaseCutBuilder):
value(key)
return
value = self.__validate_before_setting(key, value)
if key in self.__dict__:
field = self.__dict__[key]
if issubclass(type(field), _Column):
self.__update_field(field, value)
else:
raise AttributeError(f'Column "{key}" not found')
else:
assert not (
isinstance(value, _StateColumn) and value.obj_ref is None
), "State variables should be in `state` field"
self.__add_field(key, value)
def __validate_before_setting(self, key, value):
if key in ["Symbol", "_SYMBOL_NAME"]:
raise ValueError("Symbol setting is supported during creation only")
if key == "_state_vars":
raise ValueError("state field is necessary for keeping state variables and can't be rewritten")
if isinstance(value, ott.ExpressionDefinedTimeOffset):
value = value.n
if isinstance(value, np.generic):
value = value.item()
if not (ott.is_type_supported(ott.get_object_type(value))
or isinstance(value, _Operation)
or type(value) is tuple
or isinstance(value, _ColumnAggregation)):
raise TypeError(f'It is not allowed to set objects of "{type(value)}" type')
return value
def _update_field(self, field, value):
self.__update_field(field, value)
def __update_field(self, field, value):
"""
Assign ``value`` to the existing column ``field``.

Adds the event processors for the update to the graph via ``self.sink``
and adjusts the python-side dtype of ``field`` to follow OneTick's
conversion rules:

* ``_ColumnAggregation`` values apply themselves to the column.
* Positive lag operators (``_LagOperator`` with ``index > 0``) found in
  ``value`` are first copied into temporary ``__<name>_<index>_NEW__``
  columns (ADD_FIELDS), which are dropped after the update.
* A ``(value, dtype)`` tuple is accepted, but the dtype is ignored.
* ``TIMESTAMP`` is updated by ``__update_timestamp``.
* If the column type changes, UPDATE_FIELD is used (with a temporary
  column when a string column receives an nsectime/int value, see
  PY-416/BE-142); otherwise UPDATE_FIELDS.
* Some results are explicitly converted afterwards with ``self.table``.
"""
if isinstance(value, _ColumnAggregation):
value.apply(self, str(field))
return
# Maps a positive-index _LagOperator to a temporary column named __<name>_<index>_NEW__;
# returns None for any other operation.
def _replace_positive_lag_operator_with_tmp_column(operation):
if isinstance(operation, _LagOperator) and operation.index > 0:
column = operation._op_params[0]
name = column.name
if name.startswith("__"):
raise ValueError("Column name started with two underscores should be used by system only, "
"please do not use such names.")
name = f"__{name}_{operation.index}_NEW__"
return _Column(name, column.dtype, column.obj_ref, precision=getattr(column, "_precision", None))
new_names, old_names = self.__get_old_and_new_names(_replace_positive_lag_operator_with_tmp_column, value)
# materialize the lagged values into the temporary columns before the update
if old_names:
self.sink(otq.AddFields(", ".join(f"{new} = {arg}" for arg, new in zip(old_names, new_names))))
if type(value) is tuple:
# support to be compatible with adding fields to get rid of some strange problems
# but really we do not use passed type, because update field does not support it
value, _ = value
# type to cast the column to after the update (None means no cast)
convert_to_type = None
str_value = ott.value2str(value)
value_dtype = ott.get_object_type(value)
base_type = ott.get_base_type(value_dtype)
if base_type is bool:
# according OneTick
base_type = float
# True when UPDATE_FIELD (which may change the type) must be used instead of UPDATE_FIELDS
type_changes = False # because mantis 0021194
if base_type is str:
# update_field non-string field to string field (of any length) or value
# changes type to default string
if not issubclass(field.dtype, str):
field._dtype = str
type_changes = True
else:
if (
(issubclass(field.dtype, int) or
issubclass(field.dtype, float) or
issubclass(field.dtype, str))
and
(issubclass(value_dtype, ott.msectime) or
issubclass(value_dtype, ott.nsectime))
and
(str(field) != 'TIMESTAMP')
and
(not isinstance(field, _StateColumn))
):
# in OneTick after updating fields with functions that return datetime values
# the type of column will not change for long and double columns
# and will change to long (or double in older versions) when updating string column
# (see BDS-267)
# That's why we are explicitly setting type for returned value
convert_to_type = value_dtype
if issubclass(field.dtype, str):
# using update_field only for string because update_fields preserves type
# by default and raises exception if it can't be done
type_changes = True
elif issubclass(field.dtype, float) and base_type is int and not isinstance(field, _StateColumn):
# PY-574 if field was float and int, then
# it is still float in onetick, so no need to change type on otp level
convert_to_type = int
elif (issubclass(field.dtype, ott.msectime) or issubclass(field.dtype, ott.nsectime)) and base_type is int:
# if field was time type and we add something to it, then
# no need to change type
pass
elif issubclass(field.dtype, str) and base_type is int:
type_changes = True
convert_to_type = int
else:
if issubclass(value_dtype, bool):
value_dtype = float
if isinstance(field, _StateColumn):
pass # do nothing
else:
field._dtype = value_dtype
type_changes = True
# for aliases, TIMESTAMP ~ Time as an example
key = str(field)
if key == "TIMESTAMP":
self.__update_timestamp(key, value, str_value)
elif type_changes:
if (value_dtype in [otp.nsectime, int] and issubclass(field.dtype, str)):
# PY-416 string field changing the type from str to datetime leads to losing nanoseconds
# BE-142 similar issue with int from string: OneTick convert str to float, and then to int
# so we lose some precision for big integers
# work around is: make a new column first, delete accessor column
# and then recreate it with value from temp column
self.sink(otq.AddField(field=f"_TMP_{key}", value=str_value))
self.sink(otq.Passthrough(fields=key, drop_fields=True))
self.sink(otq.AddField(field=f"{key}", value=f"_TMP_{key}"))
self.sink(otq.Passthrough(fields=f"_TMP_{key}", drop_fields=True))
else:
self.sink(otq.UpdateField(field=key, value=str_value))
else:
self.sink(otq.UpdateFields(set=key + "=" + str_value))
# drop the temporary lag columns created above
if new_names:
self.sink(otq.Passthrough(drop_fields=True, fields=",".join(new_names)))
if convert_to_type:
# manual type conversion after update fields for some cases
self.table(**{key: convert_to_type}, inplace=True, strict=False)
def __update_timestamp(self, key, value, str_value):
"""
Assign ``value`` (already rendered as ``str_value``) to the ``TIMESTAMP`` column ``key``.

* Values without a ``dtype`` attribute (constants): a single UPDATE_FIELD.
* Existing, non-state columns of this source: ticks are sorted by that
  column, ``TIMESTAMP`` is updated and ticks are sorted by ``Time`` again
  (no sorting when the value is ``_START_TIME`` or ``_END_TIME``).
* Other expressions (operations, lambdas, arithmetic): the value is stored
  in a temporary ``__TEMP_TIMESTAMP__`` column, ticks are ordered by it,
  ``TIMESTAMP`` is updated from it, the temporary column is dropped and
  ticks are sorted by ``Time`` again.

Raises
------
Exception
    For any other kind of ``value``.
"""
if not hasattr(value, "dtype"):
# A constant value: no need to pre- or post-sort
self.sink(otq.UpdateField(field=key, value=str_value))
elif (
isinstance(value, _Column)
and not isinstance(value, _StateColumn)
and hasattr(value, "name")
and value.name in self.__dict__
):
# An existing, present column: no need to create a temporary one. See PY-253
need_to_sort = str_value not in ("_START_TIME", "_END_TIME")
if need_to_sort:
self.sort(value, inplace=True)
self.sink(otq.UpdateField(field=key, value=str_value))
if need_to_sort:
self.sort(self.Time, inplace=True)
# Python precedence: the condition below is
# `(not is_compare(value) and isinstance(value, (...))) or is_arithmetical(value)`.
elif (not is_compare(value) and isinstance(value, (_Operation, _LambdaIfElse))
or is_arithmetical(value)):
# An expression or a statevar: create a temp column with its value, pre- and post- sort.
self.sink(otq.AddField(field="__TEMP_TIMESTAMP__", value=str_value))
self.sink(otq.OrderByEp(order_by="__TEMP_TIMESTAMP__ ASC"))
self.sink(otq.UpdateField(field=key, value="__TEMP_TIMESTAMP__"))
self.sink(otq.Passthrough(fields="__TEMP_TIMESTAMP__", drop_fields=True))
self.sort(self.Time, inplace=True)
else:
raise Exception(f"Illegal type for timestamp assignment: {value.__class__}")
def __get_old_and_new_names(self, _replace_positive_lag_operator_with_tmp_column, value):
    """Collect the operands of ``value`` replaced by the given replacement callback.

    ``_replace_parameters`` is assumed to return parallel sequences of the
    replaced operands and their replacements (TODO confirm). Pairs are
    de-duplicated by the string form of the replaced operand; they keep the
    first-seen order, and later duplicates supply the replacement.

    Returns
    -------
    tuple of (list of str, list of str)
        ``(new_names, old_names)``: string forms of the replacements and of
        the replaced operands, in matching order.
    """
    replaced, replacements = _replace_parameters(value, _replace_positive_lag_operator_with_tmp_column)
    by_old_name = {str(old): new for old, new in zip(replaced, replacements)}
    old_names = list(by_old_name)
    new_names = [str(new) for new in by_old_name.values()]
    return new_names, old_names
def append(self, other) -> 'Source':
    """
    Merge this data source with ``other``.

    Parameters
    ----------
    other: List, Source
        data source (or list of data sources) to merge

    Returns
    -------
    Source
    """
    # NOTE(review): for a list, ``self`` goes last in the merge, while for a
    # single source it goes first - confirm whether the order matters for merge.
    if isinstance(other, list):
        to_merge = other + [self]
    else:
        to_merge = [self, other]
    return onetick.py.functions.merge(to_merge)
def __add_field(self, key, value):
"""
Add a new column ``key`` with ``value`` using an ADD_FIELD event processor
and register it as a ``_Column`` of this source.

``value`` may be a ``(value, dtype)`` tuple to set the column type
explicitly; otherwise the type is deduced with ``ott.get_object_type``.
``_ColumnAggregation`` values apply themselves instead.
"""
# -------------
# ADD_FIELDS
# -------------
# TODO: merge together several add fields in one add_fields
if isinstance(value, _ColumnAggregation):
value.apply(self, key)
return
if type(value) is tuple:
value, dtype = value
else:
dtype = ott.get_object_type(value)
# string literals longer than the default string length need a wider string type
if type(value) is str:
if len(value) > ott.string.DEFAULT_LENGTH:
dtype = ott.string[len(value)]
if issubclass(dtype, bool):
# according to OneTick transformations
dtype = float
if issubclass(dtype, (ott.datetime, ott.date)):
# according to OneTick transformations
dtype = ott.nsectime
# TODO: shouldn't all such logic be in ott.type2str?
# integer / floating types (including numpy ones) are stored as python int / float
if np.issubdtype(dtype, np.integer):
dtype = int
if np.issubdtype(dtype, np.floating):
dtype = float
type_str = ott.type2str(dtype)
# format value
str_value = ott.value2str(value)
self.sink(otq.AddField(field=f'{type_str} {key}', value=str_value))
# registered directly in __dict__ (assigning via __setattr__ would add the field again)
self.__dict__[key] = _Column(key, dtype, self)
[docs] def __getitem__(self, item):
"""
Allows to express multiple things:
- access a field by name
- filter ticks by condition
- select subset of fields
- set order of fields
Parameters
----------
item: str, :class:`Operation`, :func:`eval`, list of str
- ``str`` is to access column.
- ``Operation`` to express filter condition.
- ``otp.eval`` to express filter condition based on external query
- ``List[str]`` select subset of specified columns
- ``slice[List[str]::]`` set order of columns
- ``slice[Tuple[str, Type]::]`` type defaulting
- ``slice[:]`` alias to :meth:`Source.copy()`
- ``slice[int:int:int]`` select ticks the same way as elements in python lists
Returns
-------
Column, Source or tuple of Sources
- Column if column name was specified.
- Two sources if filtering expression or eval was provided: the first one is for ticks that pass condition
and the second one that do not.
Raises
------
KeyError
If a column name not in the schema is accessed without a type.
AttributeError
For unsupported column slices or unknown columns in a column list.
ValueError
For unsupported items in a column list or unsupported integer slices.
Examples
--------
Access to the `X` column: add `Y` based on `X`
>>> data = otp.Ticks(X=[1, 2, 3])
>>> data['Y'] = data['X'] * 2
>>> otp.run(data)
Time X Y
0 2003-12-01 00:00:00.000 1 2
1 2003-12-01 00:00:00.001 2 4
2 2003-12-01 00:00:00.002 3 6
Filtering based on expression
>>> data = otp.Ticks(X=[1, 2, 3])
>>> data_more, data_less = data[(data['X'] > 2)]
>>> otp.run(data_more)
Time X
0 2003-12-01 00:00:00.002 3
>>> otp.run(data_less)
Time X
0 2003-12-01 00:00:00.000 1
1 2003-12-01 00:00:00.001 2
Filtering based on the result of another query. Another query should
have only one tick as a result with only one field (whatever its name is).
>>> exp_to_select = otp.Ticks(WHERE=['X > 2'])
>>> data = otp.Ticks(X=[1, 2, 3], Y=['a', 'b', 'c'], Z=[.4, .3, .1])
>>> data, _ = data[otp.eval(exp_to_select)]
>>> otp.run(data)
Time X Y Z
0 2003-12-01 00:00:00.002 3 c 0.1
Select subset of specified columns
>>> data = otp.Ticks(X=[1, 2, 3], Y=['a', 'b', 'c'], Z=[.4, .3, .1])
>>> data = data[['X', 'Z']]
>>> otp.run(data)
Time X Z
0 2003-12-01 00:00:00.000 1 0.4
1 2003-12-01 00:00:00.001 2 0.3
2 2003-12-01 00:00:00.002 3 0.1
Slice with list will keep all columns, but change order:
>>> data=otp.Tick(Y=1, X=2, Z=3)
>>> otp.run(data)
Time Y X Z
0 2003-12-01 1 2 3
>>> data = data[['X', 'Y']:]
>>> otp.run(data)
Time X Y Z
0 2003-12-01 2 1 3
Slice can be used as a shortcut for :meth:`Source.copy`:
>>> data[:] # doctest: +ELLIPSIS
<onetick.py.sources.Tick object at ...>
Slices can use integers.
In this case ticks are selected the same way as elements in python lists.
>>> data = otp.Ticks({'A': [1, 2, 3, 4, 5]})
Select first 3 ticks:
>>> otp.run(data[:3])
Time A
0 2003-12-01 00:00:00.000 1
1 2003-12-01 00:00:00.001 2
2 2003-12-01 00:00:00.002 3
Skip first 3 ticks:
>>> otp.run(data[3:])
Time A
0 2003-12-01 00:00:00.003 4
1 2003-12-01 00:00:00.004 5
Select last 3 ticks:
>>> otp.run(data[-3:])
Time A
0 2003-12-01 00:00:00.002 3
1 2003-12-01 00:00:00.003 4
2 2003-12-01 00:00:00.004 5
Skip last 3 ticks:
>>> otp.run(data[:-3])
Time A
0 2003-12-01 00:00:00.000 1
1 2003-12-01 00:00:00.001 2
Skip first and last tick:
>>> otp.run(data[1:-1])
Time A
0 2003-12-01 00:00:00.001 2
1 2003-12-01 00:00:00.002 3
2 2003-12-01 00:00:00.003 4
Select every second tick:
>>> otp.run(data[::2])
Time A
0 2003-12-01 00:00:00.000 1
1 2003-12-01 00:00:00.002 3
2 2003-12-01 00:00:00.004 5
Select every second tick, not including first and last tick:
>>> otp.run(data[1:-1:2])
Time A
0 2003-12-01 00:00:00.001 2
1 2003-12-01 00:00:00.003 4
See Also
--------
| :meth:`Source.table`: another and more generic way to select subset of specified columns
| **PASSTHROUGH** OneTick event processor
| **WHERE_CLAUSE** OneTick event processor
"""
# `strict` is passed to self.table(); column slices (data[[...]:]) switch it off
# so that the columns not listed are kept
strict = True
# Filtering: one WHERE_CLAUSE, with the IF output pin for passing ticks and the ELSE pin for the rest
if isinstance(item, (_Operation, _QueryEvalWrapper)):
if isinstance(item, _Operation):
item = item._make_python_way_bool_expression()
where = self.copy(ep=otq.WhereClause(where=str(item)))
if_source = where.copy()
if_source.node().out_pin("IF")
else_source = where.copy()
else_source.node().out_pin("ELSE")
# TODO: add ability to remove then this ep, because it is required only for right output
else_source.sink(otq.Passthrough())
return if_source, else_source
# Slices: integer bounds select ticks (_get_integer_slice returns None otherwise);
# for other slices the start holds the columns used for reordering / type defaulting
if isinstance(item, slice):
result = self._get_integer_slice(item)
if result:
return result
if item.step:
raise AttributeError("Source columns slice with step set makes no sense")
if item.start and item.stop:
raise AttributeError("Source columns slice with both start and stop set is not available now")
if not item.start and item.stop:
raise AttributeError("Source columns slice with only stop set is not implemented yet")
if item.start is None and item.stop is None:
return self.copy()
item = item.start
strict = False
# a (name, type) pair or a list of such pairs becomes a {name: type} dict for table()
if isinstance(item, tuple):
item = dict([item])
elif isinstance(item, list):
if not item:
return self.copy()
item_type = list(set([type(x) for x in item]))
if len(item_type) > 1:
raise AttributeError(f"Different types {item_type} in slice list is not supported")
if item_type[0] == tuple:
item = dict(item)
if isinstance(item, list):
# ---------
# TABLE
# ---------
items = []
for it in item:
if isinstance(it, _Column):
items.append(it.name)
elif isinstance(it, str):
items.append(it)
else:
raise ValueError(f"It is not supported to filter '{it}' object of '{type(it)}' type")
# validation
for item in items:
if item not in self.schema:
existing_columns = ", ".join(self.schema.keys())
raise AttributeError(
f"There is no '{item}' column. There are existing columns: {existing_columns}"
)
# key time columns are never passed to table()
columns = {}
dtypes = self.columns(skip_meta_fields=True)
for column_name in items:
if column_name not in ['Time', 'Timestamp', 'TIMESTAMP']:
columns[column_name] = dtypes[column_name]
return self.table(strict=strict, **columns)
if isinstance(item, dict):
return self.table(strict=strict, **item)
# Column access: `name` or (`name`, dtype). A dtype declares a column missing from the
# schema, or is merged with the type already known for the column.
name, dtype = "", None
# way to set type
if isinstance(item, tuple):
name, dtype = item
else:
name = item
if name not in self.__dict__:
if dtype is None:
raise KeyError(f'Column name {name} is not in the schema. Please, check that this column '
'is in the schema or add it using the .schema property')
if name == 0 or name == 1:
raise ValueError(f"constant {name} are not supported for indexing for now, please use otp.Empty")
if type(name) in (int, float):
raise ValueError("integer indexes are not supported")
self.__dict__[name] = _Column(name, dtype, self)
else:
if not isinstance(self.__dict__[name], _Column):
raise AttributeError(f"There is no '{name}' column")
if dtype:
type1, type2 = self.__dict__[name].dtype, dtype
b_type1, b_type2 = ott.get_base_type(type1), ott.get_base_type(type2)
# merge rules: int + float -> float, strings -> the longer string,
# msectime + nsectime -> nsectime; other mismatches are rejected
if b_type1 != b_type2:
if {type1, type2} == {int, float}:
self.__dict__[name]._dtype = float
else:
# NOTE(review): a `Warning` instance is raised as an exception here, not issued via warnings.warn
raise Warning(
f"Column '{name}' was declared as '{type1}', but you want to change it to '{type2}', "
"that is not possible without setting type directly via assigning value"
)
else:
if issubclass(b_type1, str):
t1_length = ott.string.DEFAULT_LENGTH if type1 is str else type1.length
t2_length = ott.string.DEFAULT_LENGTH if type2 is str else type2.length
self.__dict__[name]._dtype = type2 if t1_length < t2_length else type1
if {type1, type2} == {ott.nsectime, ott.msectime}:
self.__dict__[name]._dtype = ott.nsectime
return self.__dict__[name]
def _get_integer_slice(self, item: slice) -> Optional['Source']:
"""
Treat otp.Source object as a sequence of ticks
and apply common python integer slicing logic to it.
"""
start, stop, step = item.start, item.stop, item.step
for v in (start, stop, step):
if v is not None and not isinstance(v, int):
return
# let's filter out cases that we don't want to support
if step is not None and step <= 0:
raise ValueError("step value can't be negative or zero")
if stop is not None and stop == 0:
raise ValueError("stop value can't be zero")
if start and stop and start > 0 and stop > 0 and start >= stop:
raise ValueError("stop value can't be less than start")
if start and start < 0 and stop and stop > 0:
raise ValueError("start value can't be negative when start value is positive")
def add_counter(src, force=False):
if '__NUM__' not in src.schema or force:
if '__NUM__' in src.schema:
src = src.drop('__NUM__')
src = src.agg({'__NUM__': otp.agg.count()}, running=True, all_fields=True)
return src
result = self.copy()
if start:
if start > 0:
result = add_counter(result)
result, _ = result[result['__NUM__'] > start]
if start < 0:
result = result.last(-start)
if stop:
if stop > 0:
result = add_counter(result)
result, _ = result[result['__NUM__'] <= stop]
if stop < 0:
result = add_counter(result)
last_ticks = result.last(-stop)
last_ticks['__FLAG__'] = 1
last_ticks = last_ticks[['__FLAG__', '__NUM__']]
result = otp.join(result, last_ticks,
on=result['__NUM__'] == last_ticks['__NUM__'],
how='outer', rprefix='RIGHT')
result, _ = result[result['__FLAG__'] == 0]
result = result.drop(['__FLAG__', 'RIGHT___NUM__'])
if step:
if step > 0:
# resetting counter
result = add_counter(result, force=True)
result, _ = result[(result['__NUM__'] - 1) % step == 0]
if '__NUM__' in result.schema:
result = result.drop('__NUM__')
return result
[docs] def __setitem__(self, key, value):
"""
Add new column to the source or update existing one.
Parameters
----------
key: str
The name of the new or existing column.
value: int, str, float, datetime, date, \
:py:class:`~onetick.py.Column`, :py:class:`~onetick.py.Operation`, :py:class:`~onetick.py.string`, \
:py:class:`otp.date <onetick.py.date>`, :py:class:`otp.datetime <onetick.py.datetime>`, \
:py:class:`~onetick.py.nsectime`, :py:class:`~onetick.py.msectime`
The new value of the column.
See also
--------
| **ADD_FIELD** OneTick event processor
| **UPDATE_FIELD** OneTick event processor
Examples
--------
>>> data = otp.Tick(A='A')
>>> data['D'] = otp.datetime(2022, 2, 2)
>>> data['X'] = 1
>>> data['Y'] = data['X']
>>> data['X'] = 12345
>>> data['Z'] = data['Y'].astype(str) + 'abc'
>>> otp.run(data)
Time A D X Y Z
0 2003-12-01 A 2022-02-02 12345 1 1abc
"""
return self.__setattr__(key, value)
@inplace_operation
def update(self, if_set, else_set=None, where=1, inplace=False) -> 'Source':
    """
    Update field of the Source or state variable.

    Parameters
    ----------
    if_set: dict
        Dictionary <field name>: <expression>.
        Keys can be column names, columns or state variables.
    else_set: dict, optional
        Dictionary <field name>: <expression>
    where: expression, optional
        Condition of updating.
        If ``where`` is True the fields from ``if_set`` will be updated with corresponding expression.
        If ``where`` is False, the fields from ``else_set`` will be updated with corresponding expression.
        Python ``bool`` values are converted to ``int``.
    inplace: bool
        A flag controls whether operation should be applied inplace.
        If ``inplace=True``, then it returns nothing. Otherwise method
        returns a new modified object.

    Returns
    -------
    :class:`Source` or ``None``.

    Raises
    ------
    ValueError
        If ``if_set`` is not a non-empty dict, if ``Time``/``TIMESTAMP`` is updated,
        or if ``where`` has an unsupported type.
    AttributeError
        If a key has an unsupported type or names a column that does not exist.
    TypeError
        If the type deduced for a new value is not supported.

    See also
    --------
    **UPDATE_FIELD** and **UPDATE_FIELDS** OneTick event processors

    Examples
    --------
    Columns can be updated with this method:

    >>> # OTdirective: snippet-name: Arrange.conditional update;
    >>> t = otp.Ticks({'X': [1, 2, 3],
    ...                'Y': [4, 5, 6],
    ...                'Z': [1, 0, 1]})
    >>> t = t.update(if_set={'X': t['X'] + t['Y']},
    ...              else_set={'X': t['X'] - t['Y']},
    ...              where=t['Z'] == 1)
    >>> otp.run(t) # OTdirective: snippet-example;
                         Time  X  Y  Z
    0 2003-12-01 00:00:00.000  5  4  1
    1 2003-12-01 00:00:00.001 -3  5  0
    2 2003-12-01 00:00:00.002  9  6  1

    State variables can be updated too:

    >>> t = otp.Ticks({'X': [1, 2, 3],
    ...                'Y': [4, 5, 6],
    ...                'Z': [1, 0, 1]})
    >>> t.state_vars['X'] = 0
    >>> t = t.update(if_set={t.state_vars['X']: t['X'] + t['Y']},
    ...              else_set={t.state_vars['X']: t['X'] - t['Y']},
    ...              where=t['Z'] == 1)
    >>> t['UX'] = t.state_vars['X']
    >>> otp.run(t)
                         Time  X  Y  Z  UX
    0 2003-12-01 00:00:00.000  1  4  1   5
    1 2003-12-01 00:00:00.001  2  5  0  -3
    2 2003-12-01 00:00:00.002  3  6  1   9
    """
    if else_set is None:
        else_set = {}
    # Check the type before emptiness: len() of e.g. None would raise a
    # TypeError instead of the intended ValueError.
    if not isinstance(if_set, dict) or not if_set:
        raise ValueError(
            f"'if_set' parameter should be non empty dict, but got '{if_set}' of type '{type(if_set)}'"
        )

    def _prepare(to_prepare):
        # Normalize keys: state variables stay objects, columns become their names,
        # strings are stripped.
        result = {}
        for in_obj, out_obj in to_prepare.items():
            if isinstance(in_obj, _StateColumn):
                result[in_obj] = out_obj
            elif isinstance(in_obj, _Column):
                result[in_obj.name] = out_obj
            elif isinstance(in_obj, str):
                result[in_obj.strip()] = out_obj
            else:
                raise AttributeError(
                    f"It is not supported to update item '{in_obj}' of type '{type(in_obj)}'"
                )
        return result

    def _validate(to_validate):
        # Check that the targets exist and that the value types are compatible;
        # returns {target string: value} with python bools converted to int.
        result = {}
        for in_key, out_obj in to_validate.items():
            if isinstance(in_key, _StateColumn):
                in_dtype = in_key.dtype
            else:
                if not (in_key in self.__dict__ and isinstance(self.__dict__[in_key], _Column)):
                    raise AttributeError(f"There is no '{in_key}' column to update")
                if in_key == "Time" or in_key == "TIMESTAMP":
                    raise ValueError("It is not allowed to modify 'Time' column using .update method")
                in_dtype = self.schema[in_key]
            dtype = ott.get_object_type(out_obj)
            if not ott.is_type_supported(dtype):
                raise TypeError(f"Deduced type of object {out_obj} is not supported: {dtype}")
            # will raise exception if types are incompatible
            ott.get_type_by_objects([dtype, in_dtype])
            if isinstance(in_key, _StateColumn):
                in_key = str(in_key)
            if isinstance(out_obj, bool):
                out_obj = int(out_obj)
            result[in_key] = out_obj
        return result

    # prepare and validate
    items = _validate(_prepare(if_set))
    else_items = _validate(_prepare(else_set))

    if isinstance(where, bool):
        where = int(where)
    if not (getattr(where, "dtype", None) is bool
            or isinstance(where, int)):
        raise ValueError(f"Where has not supported type '{type(where)}'")

    # apply
    set_rules = [f"{key}=({ott.value2str(value)})" for key, value in items.items()]
    else_set_rules = [f"{key}=({ott.value2str(value)})" for key, value in else_items.items()]
    self.sink(otq.UpdateFields(set=",".join(set_rules), else_set=",".join(else_set_rules), where=str(where)))
    return self
@docstring(parameters=[_agg_doc, _running_doc, _all_fields_with_policy_doc, _bucket_interval_doc, _bucket_time_doc,
                       _bucket_units_doc, _bucket_end_condition_doc, _end_condition_per_group_doc,
                       _boundary_tick_bucket_doc, _group_by_doc], add_self=True)
def agg(self, aggs, *args, **kwargs) -> 'Source':
    """
    Applies a composition of :ref:`otp.agg <aggregations_funcs>` aggregations.
    A new :class:`Source` is returned; the original source is left unchanged.

    See Also
    --------
    | :ref:`Aggregations <aggregations_funcs>`
    | **COMPUTE** OneTick event processor

    Examples
    --------
    By default the whole data is aggregated:

    >>> data = otp.Ticks(X=[1, 2, 3, 4])
    >>> data = data.agg({'X_SUM': otp.agg.sum('X')})
    >>> otp.run(data)
    Time X_SUM
    0 2003-12-04 10

    Multiple aggregations can be applied at the same time:

    >>> data = otp.Ticks(X=[1, 2, 3, 4])
    >>> data = data.agg({'X_SUM': otp.agg.sum('X'),
    ...                  'X_MEAN': otp.agg.average('X')})
    >>> otp.run(data)
    Time X_SUM X_MEAN
    0 2003-12-04 10 2.5

    Aggregation can be used in running mode:

    >>> data = otp.Ticks(X=[1, 2, 3, 4])
    >>> data = data.agg({'CUM_SUM': otp.agg.sum('X')}, running=True)
    >>> otp.run(data)
    Time CUM_SUM
    0 2003-12-01 00:00:00.000 1
    1 2003-12-01 00:00:00.001 3
    2 2003-12-01 00:00:00.002 6
    3 2003-12-01 00:00:00.003 10

    Aggregation can be split into buckets:

    >>> data = otp.Ticks(X=[1, 2, 3, 4])
    >>> data = data.agg({'X_SUM': otp.agg.sum('X')}, bucket_interval=2, bucket_units='ticks')
    >>> otp.run(data)
    Time X_SUM
    0 2003-12-01 00:00:00.001 3
    1 2003-12-01 00:00:00.003 7

    Running aggregation can be used with buckets too. In this case (all_fields=False and running=True)
    output ticks are created when a tick enters or leaves the sliding window (that's why for this
    example there are 8 output ticks for 4 input ticks):

    >>> data = otp.Ticks(X=[1, 2, 3, 4], offset=[0, 1000, 1500, 3600])
    >>> data = data.agg(dict(X_MEAN=otp.agg.average("X"),
    ...                      X_STD=otp.agg.stddev("X")),
    ...                 running=True, bucket_interval=2)
    >>> otp.run(data)
    Time X_MEAN X_STD
    0 2003-12-01 00:00:00.000 1.0 0.000000
    1 2003-12-01 00:00:01.000 1.5 0.500000
    2 2003-12-01 00:00:01.500 2.0 0.816497
    3 2003-12-01 00:00:02.000 2.5 0.500000
    4 2003-12-01 00:00:03.000 3.0 0.000000
    5 2003-12-01 00:00:03.500 NaN NaN
    6 2003-12-01 00:00:03.600 4.0 0.000000
    7 2003-12-01 00:00:05.600 NaN NaN

    If all_fields=True an output tick is generated only for arrival events, but all attributes from
    the input tick causing an arrival event are copied over to the output tick and the aggregation
    is added as another attribute:

    >>> data = otp.Ticks(X=[1, 2, 3, 4], offset=[0, 1000, 1500, 3600])
    >>> data = data.agg(dict(X_MEAN=otp.agg.average("X"),
    ...                      X_STD=otp.agg.stddev("X")),
    ...                 all_fields=True, running=True)
    >>> otp.run(data)
    Time X X_MEAN X_STD
    0 2003-12-01 00:00:00.000 1 1.0 0.000000
    1 2003-12-01 00:00:01.000 2 1.5 0.500000
    2 2003-12-01 00:00:01.500 3 2.0 0.816497
    3 2003-12-01 00:00:03.600 4 2.5 1.118034

    The ``all_fields`` parameter can be used when all original fields are needed in the output:

    >>> ticks = otp.Ticks(X=[3, 4, 1, 2])
    >>> data = ticks.agg(dict(X_MEAN=otp.agg.average("X"),
    ...                       X_STD=otp.agg.stddev("X")),
    ...                  all_fields=True)
    >>> otp.run(data)
    Time X X_MEAN X_STD
    0 2003-12-04 3 2.5 1.118034

    There are different policies for the ``all_fields`` parameter:

    >>> data = ticks.agg(dict(X_MEAN=otp.agg.average("X"),
    ...                       X_STD=otp.agg.stddev("X")),
    ...                  all_fields="last")
    >>> otp.run(data)
    Time X X_MEAN X_STD
    0 2003-12-04 2 2.5 1.118034

    For low/high policies the field selected as input is set this way:

    >>> data = ticks.agg(dict(X_MEAN=otp.agg.average("X"),
    ...                       X_STD=otp.agg.stddev("X")),
    ...                  all_fields=otp.agg.low_tick(data["X"]))
    >>> otp.run(data)
    Time X X_MEAN X_STD
    0 2003-12-04 1 2.5 1.118034
    """
    aggregations_to_add = aggs.copy()
    source = self.copy()
    # one compute object collects every requested aggregation so they are applied together
    compute = aggregations.compute(*args, **kwargs)
    for column_name, aggregation in aggregations_to_add.items():
        compute.add(column_name, aggregation)
    aggregated = compute.apply(source)
    aggregated._add_table()
    return aggregated
def sort_values(self, *args, **kwargs):
    """
    Alias of :meth:`Source.sort`; every argument is forwarded unchanged.

    See Also
    --------
    :meth:`Source.sort`
    """
    sort_method = self.sort
    return sort_method(*args, **kwargs)
@inplace_operation
def sort(self,
         by: Union[str, Collection[Union[str, 'onetick.py.Column']]],
         ascending=True,
         inplace=False) -> Optional['Source']:
    """ Sort ticks by columns.

    Parameters
    ----------
    by: str, Column or list (or tuple) of them
        Column(s) to sort by. It is possible to pass a list of columns, where the order is important:
        from the left to the right.
    ascending: bool or list (or tuple) of bool
        Order to sort by. If a list of columns is specified, then a list of ascending values per column
        is expected. A single value is applied to the first column only; columns without
        a corresponding value are sorted in ascending order.
        (the :class:`nan` is the smallest for ``float`` type fields)
    inplace: bool
        A flag controls whether operation should be applied inplace.
        If ``inplace=True``, then it returns nothing. Otherwise method
        returns a new modified object

    Returns
    -------
    :class:`Source`

    Raises
    ------
    TypeError
        If a column is neither a string nor a column object, or an ascending value is not bool/int.
    AttributeError
        If a column does not exist in the source.

    See also
    --------
    **ORDER_BY** OneTick event processor

    Examples
    --------
    Single column examples

    >>> data = otp.Ticks({'X':[ 94, 5, 34],
    ...                   'Y':[otp.nan, 3.1, -0.3]})
    >>> data = data.sort(data['X'])
    >>> otp.run(data)
    Time X Y
    0 2003-12-01 00:00:00.001 5 3.1
    1 2003-12-01 00:00:00.002 34 -0.3
    2 2003-12-01 00:00:00.000 94 NaN

    >>> data = otp.Ticks({'X':[ 94, 5, 34],
    ...                   'Y':[otp.nan, 3.1, -0.3]})
    >>> data = data.sort(data['Y'])
    >>> otp.run(data)
    Time X Y
    0 2003-12-01 00:00:00.000 94 NaN
    1 2003-12-01 00:00:00.002 34 -0.3
    2 2003-12-01 00:00:00.001 5 3.1

    Inplace

    >>> data = otp.Ticks({'X':[ 94, 5, 34],
    ...                   'Y':[otp.nan, 3.1, -0.3]})
    >>> data.sort(data['Y'], inplace=True)  # OTdirective: snippet-name: Arrange.sort.inplace;
    >>> otp.run(data)
    Time X Y
    0 2003-12-01 00:00:00.000 94 NaN
    1 2003-12-01 00:00:00.002 34 -0.3
    2 2003-12-01 00:00:00.001 5 3.1

    Multiple columns

    >>> data = otp.Ticks({'X':[ 5, 6, 3, 6],
    ...                   'Y':[1.4, 3.1, 9.1, 5.5]})
    >>> data = data.sort([data['X'], data['Y']])
    >>> otp.run(data)
    Time X Y
    0 2003-12-01 00:00:00.002 3 9.1
    1 2003-12-01 00:00:00.000 5 1.4
    2 2003-12-01 00:00:00.001 6 3.1
    3 2003-12-01 00:00:00.003 6 5.5

    Ascending/descending control

    >>> data = otp.Ticks({'X':[ 94, 5, 34],
    ...                   'Y':[otp.nan, 3.1, -0.3]})
    >>> data = data.sort(data['X'], ascending=False)
    >>> otp.run(data)
    Time X Y
    0 2003-12-01 00:00:00.000 94 NaN
    1 2003-12-01 00:00:00.002 34 -0.3
    2 2003-12-01 00:00:00.001 5 3.1

    >>> # OTdirective: snippet-name: Arrange.sort.sort;
    >>> data = otp.Ticks({'X':[ 5, 6, 3, 6],
    ...                   'Y':[1.4, 3.1, 9.1, 5.5]})
    >>> data = data.sort([data['X'], data['Y']], ascending=[True, False])
    >>> otp.run(data)
    Time X Y
    0 2003-12-01 00:00:00.002 3 9.1
    1 2003-12-01 00:00:00.000 5 1.4
    2 2003-12-01 00:00:00.003 6 5.5
    3 2003-12-01 00:00:00.001 6 3.1
    """
    # ``by`` and ``ascending`` may be single values or sequences; tuples are accepted as well
    # as lists, as promised by the ``Collection`` type hint (previously a tuple was treated
    # as a single, unsupported column)
    objs = list(by) if isinstance(by, (list, tuple)) else [by]
    asc_objs = list(ascending) if isinstance(ascending, (list, tuple)) else [ascending]

    # -------------------------------
    # Columns processing
    # -------------------------------
    items = []
    for obj in objs:
        if isinstance(obj, _Column):
            items.append(obj.name)
        elif isinstance(obj, str):
            items.append(obj)
        else:
            raise TypeError(f"It is not supported to order by '{obj}' of type '{type(obj)}'")

    # every name must refer to an existing column object of this source
    for item in items:
        if not isinstance(self.__dict__.get(item), _Column):
            raise AttributeError(f"There is no '{item}' column")

    # -------------------------------
    # Asc processing
    # -------------------------------
    order_by = []
    for inx, column_name in enumerate(items):
        # columns without an explicit ascending value are sorted ascending;
        # extra ascending values (beyond the number of columns) are ignored
        asc_obj = asc_objs[inx] if inx < len(asc_objs) else True
        # bool is a subclass of int, so both are accepted here
        if not isinstance(asc_obj, int):
            raise TypeError(f"asc can not be '{asc_obj}' of type '{type(asc_obj)}'")
        order_by.append(column_name + " " + ("ASC" if asc_obj else "DESC"))

    self.sink(otq.OrderByEp(order_by=",".join(order_by)))
    return self
def _hash(self):
return self.__hash
def _merge_tmp_otq(self, source):
self._tmp_otq.merge(source._tmp_otq)
def __prepare_graph(self, symbols=None, start=None, end=None, has_output=False):
    """
    Build a copy of this source ready to be turned into a graph.

    Returns
    -------
    tuple
        ``(obj, start, end, symbols)`` where ``obj`` is the prepared copy, ``adaptive``
        start/end are replaced with ``None`` and ``symbols`` is a list of symbol strings.
    """
    # We work on a copy because it is changed according to the passed symbols
    # and date ranges (e.g. a modify_query_times EP may be added).
    prepared = self.copy()
    if has_output:
        prepared.sink(otq.Passthrough())
    start, end, symbols = prepared._get_date_range(symbols, start, end)
    start = None if start is adaptive else start
    end = None if end is adaptive else end

    if isinstance(symbols, pd.DataFrame):
        symbols = utils.get_symbol_list_from_df(symbols)
    if symbols is None:
        symbols = []
    elif not isinstance(symbols, list):
        symbols = [symbols]

    converted = [
        self._convert_symbol_to_string(symbol, tmp_otq=prepared._tmp_otq, start=start, end=end)
        for symbol in symbols
    ]
    return prepared, start, end, converted
def to_otq(self, file_name=None, file_suffix=None, query_name=None, symbols=None, start=None, end=None,
           timezone=None, raw=None, add_passthrough=True):
    """
    Save data source to .otq file and return path to the saved file.

    Parameters
    ----------
    file_name: str
        Absolute or relative path to the saved file.
        If ``None``, create temporary file and name it randomly.
    file_suffix: str
        Suffix to add to the saved file name (including extension).
        Can be specified if ``file_name`` is ``None``
        to distinguish between different temporary files.
        Default: ".to_otq.otq"
    query_name: str
        Name of the main query in the created file.
        If ``None``, take name from this Source object.
        If that name is empty, set name to "query".
    symbols: str, list, :pandas:`DataFrame <pandas.DataFrame>`, :class:`Source`
        symbols to save query with
    start: datetime
        start time to save query with
    end: datetime
        end time to save query with
    timezone: str
        timezone to save query with; ``otp.config.tz`` is used if not set
    raw
        .. deprecated:: 1.4.17
    add_passthrough: bool
        will add :py:class:`onetick.query.Passthrough` event processor at the end of resulting graph

    Returns
    -------
    result: str
        Relative (if ``file_name`` is relative) or absolute path to the created query
        in the format ``file_name::query_name``
    """
    if raw is not None:
        # stacklevel=2 makes the warning point at the caller of to_otq()
        warnings.warn('The "raw" flag is deprecated and makes no effect', DeprecationWarning, stacklevel=2)
    if timezone is None:
        timezone = configuration.config.tz
    file_path = str(file_name) if file_name is not None else None
    if file_suffix is None:
        file_suffix = self._name_suffix('to_otq.otq')
    if query_name is None:
        # as documented, fall back to 'query' when the Source has no name *or* an empty one
        # (previously only ``None`` was replaced, so an empty name leaked into the file)
        query_name = self.get_name(remove_invalid_symbols=True) or 'query'
    obj, start, end, symbols = self.__prepare_graph(symbols, start, end)
    graph = obj._to_graph(add_passthrough=add_passthrough)
    graph.set_symbols(symbols)
    return obj._tmp_otq.save_to_file(query=graph, query_name=query_name, file_path=file_path,
                                     file_suffix=file_suffix, start=start, end=end, timezone=timezone)
def _store_in_tmp_otq(self, tmp_otq, operation_suffix="tmp_query", symbols=None, start=None, end=None,
                      raw=None, add_passthrough=True, name=None):
    """
    Adds this source to the tmp_otq storage

    Parameters
    ----------
    tmp_otq: TmpOtq
        Storage object; the temporary queries of this source are merged into it too.
    operation_suffix: str
        Suffix string to be added to the autogenerated graph name in the otq file
    symbols:
        Symbols to save the query with: a single symbol, a list of symbols or a
        :pandas:`DataFrame <pandas.DataFrame>` (converted with ``utils.get_symbol_list_from_df``).
    start, end:
        Start and end time to save the query with.
    raw:
        Deprecated, has no effect.
    add_passthrough: bool
        Add a Passthrough event processor at the end of the saved graph.
    name: str, optional
        If specified, this ``name`` will be used to save query
        and ``operation_suffix`` parameter will be ignored.

    Returns
    -------
    result: str
        String with the name of the saved graph (starting with THIS::)
    """
    if raw is not None:
        warnings.warn('The "raw" flag is deprecated and makes no effect', DeprecationWarning)
    # We copy object here, because we will change it according to passed
    # symbols and date ranges. For example, we can add modify_query_times EP
    # if it is necessary
    obj = self.copy()
    start, end, symbols = obj._get_date_range(symbols, start, end)
    tmp_otq.merge(obj._tmp_otq)
    # _get_date_range returns caller-passed symbols unchanged, so a DataFrame may arrive here;
    # its truth value is ambiguous (``if symbols`` would raise ValueError), so convert it first
    if isinstance(symbols, pd.DataFrame):
        symbols = utils.get_symbol_list_from_df(symbols)
    if symbols is None:
        symbols = []
    elif symbols and not isinstance(symbols, list):
        symbols = [symbols]
    _symbols = [self._convert_symbol_to_string(sym, tmp_otq) for sym in symbols]
    # OT save_to_file checks for the datetime type, so onetick.py datetimes are converted
    if isinstance(start, ott.dt):
        start = datetime.fromtimestamp(start.timestamp())
    if isinstance(end, ott.dt):
        end = datetime.fromtimestamp(end.timestamp())
    graph = obj._to_graph(add_passthrough=add_passthrough)
    graph.set_start_time(start)
    graph.set_end_time(end)
    graph.set_symbols(_symbols)
    suffix = self._name_suffix(suffix=operation_suffix, separator='__', remove_invalid_symbols=True)
    return tmp_otq.add_query(graph, suffix=suffix, name=name)
def _save_as_tmp_otq(self, start=None, end=None, symbols=None, suffix=""):
    """
    Save this source to a new temporary ``.otq`` file.

    Returns the string produced by :meth:`to_otq` (``file_name::query_name``).
    """
    # the object is a temporary *file*, not a TmpOtq storage
    tmp_file = utils.TmpFile(f"{suffix}.otq")
    return self.to_otq(tmp_file, symbols=symbols, start=start, end=end)
def plot(self, y, x='Time', kind='line', **kwargs):
    """
    Executes the query and plots the resulting dataframe.

    Uses the pandas dataframe plot method to plot data.
    Other parameters could be specified through the ``kwargs``.

    Parameters
    ----------
    y: str
        Column name for the Y axis
    x: str
        Column name for the X axis
    kind: str
        The kind of plot
    kwargs:
        passed to :pandas:`DataFrame.plot <pandas.DataFrame.plot>`

    Returns
    -------
    Whatever :pandas:`DataFrame.plot <pandas.DataFrame.plot>` returns.

    Examples
    --------
    >>> data = otp.Ticks(X=[1, 2, 3])
    >>> data.plot(y='X', kind='bar')  # doctest: +SKIP
    """
    selected = self.copy()[[y, x]]
    # run through otp.run(): calling the Source object directly (``source()``) is deprecated
    # and would emit a DeprecationWarning on every plot
    df = otp.run(selected)
    return df.plot(x=x, y=y, kind=kind, **kwargs)
def count(self, **kwargs):
    """
    Returns the number of ticks in the query.

    Adds an aggregation that calculates the total ticks count and executes the query.
    The result is a single value -- the number of ticks. A possible application is
    checking data presence in jupyter.

    Parameters
    ----------
    kwargs
        parameters that will be passed to :py:func:`otp.run <onetick.py.run>`

    Returns
    -------
    int

    See Also
    --------
    :py:func:`otp.run <onetick.py.run>`
    Source.head, Source.tail, :py:func:`onetick.py.agg.count`

    Examples
    --------
    >>> data = otp.Ticks(X=[1, 2, 3])
    >>> data.count()
    3
    >>> data = otp.Empty()
    >>> data.count()
    0
    """
    counted = self.copy().agg({'__num_rows': otp.agg.count()})
    frame = otp.run(counted, **kwargs)
    # an empty result means there were no ticks at all
    if frame.empty:
        return 0
    return int(frame['__num_rows'][0])
def head(self, n=5, **kwargs) -> 'Union[pd.DataFrame, Source]':
    """
    Executes the query and returns the first ``n`` ticks as a pandas dataframe.

    Handy in jupyter for a quick look at the beginning of the data.

    Parameters
    ----------
    n: int, default=5
        number of ticks to return
    kwargs:
        parameters will be passed to :py:func:`otp.run <onetick.py.run>`

    Returns
    -------
    :pandas:`DataFrame <pandas.DataFrame>`

    See Also
    --------
    :py:func:`otp.run <onetick.py.run>`
    Source.tail, Source.count

    Examples
    --------
    >>> data = otp.Ticks(X=list('abcdefgik'))
    >>> data.head()[['X']]
    X
    0 a
    1 b
    2 c
    3 d
    4 e
    """
    first_ticks = self.copy().first(n=n)  # pylint: disable=E1123
    return otp.run(first_ticks, **kwargs)
def tail(self, n=5, **kwargs) -> Union['pd.DataFrame', 'Source']:
    """
    Executes the query and returns the last ``n`` ticks as a pandas dataframe.

    Handy in jupyter for a quick look at the end of the data.

    Parameters
    ----------
    n: int
        number of ticks to return
    kwargs:
        parameters will be passed to :py:func:`otp.run <onetick.py.run>`

    Returns
    -------
    :pandas:`DataFrame <pandas.DataFrame>`

    See Also
    --------
    :py:func:`otp.run <onetick.py.run>`
    :meth:`Source.head`, :meth:`Source.count`

    Examples
    --------
    >>> data = otp.Ticks(X=list('abcdefgik'))
    >>> data.tail()[['X']]
    X
    0 e
    1 f
    2 g
    3 i
    4 k
    """
    last_ticks = self.copy().last(n=n)  # pylint: disable=E1123
    return otp.run(last_ticks, **kwargs)
def _rename_impl_(self, columns=None, use_regex=False, fields_to_skip=None):
    """
    Rename columns of this source in place and sink a RENAME_FIELDS event processor.

    Parameters
    ----------
    columns: dict, optional
        ``{<column or column name>: <new name>}``; ``None`` means nothing to rename.
        Names are stripped of surrounding whitespace.
    use_regex: bool
        Treat the keys/values of ``columns`` as regular expressions.
    fields_to_skip: list of str, optional
        Regular expressions of fields that must not be renamed (used with ``use_regex``).

    Raises
    ------
    TypeError
        If a key of ``columns`` is neither a column object nor a string.
    AttributeError
        If two columns are renamed into the same name, the new name contains a space,
        the source column does not exist or the target column already exists.
    """
    if columns is None:
        columns = {}
    fields_to_skip = fields_to_skip or []

    # prepare: normalize keys and targets to stripped names
    items = {}
    out_names = set()
    for in_obj, out_obj in columns.items():
        if isinstance(in_obj, _Column):
            in_name = in_obj.name.strip()
        elif isinstance(in_obj, str):
            in_name = in_obj.strip()
        else:
            raise TypeError(f"It is not supported to rename item '{in_obj}' of type '{type(in_obj)}'")
        out_name = out_obj.strip()
        # compare *stripped* targets: the stripped names are what actually get used,
        # so 'X' and ' X' must be detected as the same target
        if out_name in out_names:
            raise AttributeError(
                f"You want to rename '{in_obj}' into '{out_obj}', "
                f"but also want to rename another column into '{out_obj}'"
            )
        out_names.add(out_name)
        items[in_name] = out_name

    # with regexes the concrete old->new pairs for the Python-side columns cache are computed
    schema_update_dict = self._get_columns_names_renaming(items, fields_to_skip) if use_regex else items

    # validate
    for in_key, out_key in schema_update_dict.items():
        if " " in out_key:
            raise AttributeError(f"There is space in '{out_key}' column name")
        if not isinstance(self.__dict__.get(in_key), _Column):
            raise AttributeError(f"There is no '{in_key}' column to rename")
        if isinstance(self.__dict__.get(out_key), _Column):
            raise AttributeError(f"Column '{out_key}' is already exist")

    # apply to the Python-side column objects
    for in_key, out_key in schema_update_dict.items():
        column = self.__dict__[in_key]
        column.rename(out_key, update_parent_object=False)
        self.__dict__[out_key] = column
        del self.__dict__[in_key]

    # OneTick receives the original (possibly regex) rules
    rename_rules = [key + "=" + value for key, value in items.items()]
    kwargs = dict(rename_fields=",".join(rename_rules))
    if use_regex:
        kwargs['use_regex'] = True
    if fields_to_skip:
        kwargs['fields_to_skip'] = ",".join(fields_to_skip)
    self.sink(otq.RenameFieldsEp(**kwargs))
def _get_columns_names_renaming(self, rename: Dict[str, str], not_to_rename: List[str]) -> Dict[str, str]:
"""
We can't be sure python Source has consistent columns cache, because sinking complex event processors
can change columns unpredictable, so if user will specify regex as a param, we will pass regex
as an onetick's param, but rename all matched columns from python Source cache.
Parameters
----------
rename:
Dict old_name -> new_name. Some of the dictionary's items use regex.
Returns
-------
output_dict:
Dict old_name -> new_name used for renaming columns in Source. None of this dictionary's items use regex.
"""
output_dict = {}
for old_name, new_name in rename.items():
matching_columns = [col for col in self.schema
if re.match(old_name, col) and not self._is_excluded(col, not_to_rename)]
for col in matching_columns:
output_dict[col] = re.sub(old_name, new_name, col)
return output_dict
@staticmethod
def _is_excluded(s: str, not_to_rename: List[str]) -> bool:
for excluded in not_to_rename:
if re.match(excluded, s):
return True
return False
@inplace_operation
def rename(self, columns=None, use_regex=False, fields_to_skip=None, inplace=False) -> Optional['Source']:
    r"""
    Rename columns

    Parameters
    ----------
    columns : dict
        Renaming rules in the format {<column> : <new-column-name>},
        where <column> is either an existing column name (str) or a reference to a column,
        and <new-column-name> is the new column name (str).
    use_regex: bool
        If true, then old-name=new-name pairs in the `columns` parameter are treated as regular expressions.
        This allows bulk renaming of field names. Notice that regular expressions for old names are treated as
        if both their prefix and their suffix are .*, i.e. the prefix and suffix match any substring.
        As a result, old-name *XX* will match all of *aXX*, *aXXB*, and *XXb*, when `use_regex=true`.
        You can have old-name begin from ^ to indicate that .* prefix does not apply,
        and you can have old name end at $ to indicate that .* suffix does not apply.
        Default: false
    fields_to_skip: list of str
        A list of regular expressions for specifying fields that should be skipped
        (i.e., not be renamed). If a field is matched by one of the specified regular expressions,
        it won't be considered for renaming.
        Default: None
    inplace : bool
        The flag controls whether operation should be applied inplace or not.
        If ``inplace=True``, then it returns nothing. Otherwise, method returns a new modified
        object.

    Returns
    -------
    :class:`Source` or ``None``

    See also
    --------
    **RENAME** OneTick event processor

    Examples
    --------
    >>> data = otp.Ticks(X=[1], Y=[2])
    >>> data = data.rename({'X': 'XX',
    ...                     data['Y']: 'YY'})
    >>> otp.run(data)
    Time XX YY
    0 2003-12-01 1 2

    >>> data = otp.Tick(**{'X.X': 1, 'X.Y': 2})
    >>> data = data.rename({r'X\.(.*)': r'\1'}, use_regex=True)
    >>> otp.run(data)
    Time X Y
    0 2003-12-01 1 2

    >>> data = otp.Tick(**{'X.X': 1, 'X.Y': 2})
    >>> data = data.rename({r'X\.(.*)': r'\1'}, use_regex=True, fields_to_skip=['X.Y'])
    >>> otp.run(data)
    Time X X.Y
    0 2003-12-01 1 2
    """
    rules = {} if columns is None else columns
    self._rename_impl_(rules, use_regex, fields_to_skip)
    return self
@inplace_operation
def execute(self, *operations, inplace=False) -> Optional['Source']:
    """
    Execute operations without saving their results.

    Some operations in onetick.py modify the state of an object (mostly tick sequences);
    in that case the user may not want to store the operation's result in a column.

    Parameters
    ----------
    operations : list of :py:class:`~onetick.py.Operation`
        operations to execute (at least one is required).
    inplace : bool
        The flag controls whether operation should be applied inplace or not.
        If ``inplace=True``, then it returns nothing. Otherwise method returns a new modified
        object.

    Returns
    -------
    :class:`Source` or ``None``

    Raises
    ------
    ValueError
        If no operations are passed.

    See Also
    --------
    **EXECUTE_EXPRESSIONS** OneTick event processor

    Examples
    --------
    >>> data = otp.Tick(A=1)
    >>> data.state_vars['SET'] = otp.state.tick_set('oldest', 'A')
    >>> data = data.execute(data.state_vars['SET'].erase(A=1))
    """
    if len(operations) == 0:
        raise ValueError('At least one operation must be specified in execute() method')
    # the EP takes all expressions as a single ';'-separated string
    expressions = ';'.join(str(operation) for operation in operations)
    self.sink(otq.ExecuteExpressions(expressions))
    return self
def __refresh_hash(self):
    """
    Derive a new graph hash from the previous one.

    Called on every graph modification; only used by _ProxyNode, which tracks node changes.
    """
    previous = str(self.__hash)
    self.__hash = uuid.uuid3(uuid.NAMESPACE_DNS, previous)
def _prepare_for_execution(self, symbols=None, start=None, end=None, start_time_expression=None,
                           end_time_expression=None, timezone=None, has_output=None,
                           running_query_flag=None, require_dict=False, node_name=None):
    """
    Save this source as a query in a temporary .otq file and return the run arguments.

    Returns
    -------
    dict
        Keyword arguments for running the saved query. Symbols, start/end times and time
        expressions are already stored inside the query, so they are returned as ``None``.
    """
    if has_output is None:
        has_output = self.__source_has_output
    if timezone is None:
        timezone = configuration.config.tz
    prepared, start, end, symbols = self.__prepare_graph(symbols, start, end, has_output)
    require_dict = require_dict or _is_dict_required(symbols)
    if node_name is None:
        node_name = 'SOURCE_CALL_MAIN_OUT_NODE'
    prepared.node().node_name(node_name)

    graph = prepared._to_graph(add_passthrough=False)
    graph.set_symbols(symbols)

    tmp_file = utils.TmpFile(suffix=self._name_suffix('run.otq'))
    main_query_name = self.get_name(remove_invalid_symbols=True) or "main_query"
    saved_query = prepared._tmp_otq.save_to_file(query=graph,
                                                 query_name=main_query_name,
                                                 file_path=tmp_file.path,
                                                 start=start, end=end,
                                                 running_query_flag=running_query_flag,
                                                 start_time_expression=start_time_expression,
                                                 end_time_expression=end_time_expression,
                                                 timezone=timezone)
    return {
        'query': saved_query,
        'symbols': None,
        'start': None,
        'end': None,
        'start_time_expression': None,
        'end_time_expression': None,
        'require_dict': require_dict,
        'node_name': node_name,
        'time_as_nsec': True,
    }
def __call__(self, *args, **kwargs):
    """
    Run the query; a thin deprecated wrapper around :py:func:`otp.run <onetick.py.run>`.

    .. deprecated:: 1.48.3
        Use :py:func:`otp.run <onetick.py.run>` instead.
    """
    message = '__call__() method is deprecated, use otp.run() instead'
    warnings.warn(message, DeprecationWarning, stacklevel=2)
    return otp.run(self, *args, **kwargs)
def to_df(self, symbols=None, **kwargs):
    """
    Run the query and return the result; deprecated wrapper around :py:func:`otp.run <onetick.py.run>`.

    .. deprecated:: 1.48.3
        Use :py:func:`otp.run <onetick.py.run>` instead.
    """
    warnings.warn('to_df() method is deprecated, use otp.run() instead', DeprecationWarning, stacklevel=2)
    # Backward compatibility: ``symbols`` may be passed positionally here,
    # but otp.run() only accepts it as a keyword argument.
    run_kwargs = kwargs if symbols is None else {**kwargs, 'symbols': symbols}
    return otp.run(self, **run_kwargs)

to_dataframe = to_df
def print_api_graph(self):
    """Print the underlying onetick.query graph of this source (via ``copy_graph(print_out=True)``)."""
    root = self.node()
    root.copy_graph(print_out=True)
def _add_table(self, strict=False):
    """
    Sink a TABLE event processor declaring the current (non-meta) columns and their types.

    With ``strict=True`` input fields not listed in the table are not kept.
    """
    schema = self.columns(skip_meta_fields=True)
    field_specs = [ott.type2str(dtype) + " " + name for name, dtype in schema.items()]
    self.sink(otq.Table(fields=",".join(field_specs), keep_input_fields=not strict))
def _is_unbound_required(self):
    """ Check whether a graph needs unbound symbol or not """
    # any source whose symbol was left adaptive needs an unbound symbol at run time
    return any(
        symbol is adaptive or symbol is adaptive_to_default
        for symbol in self.__sources_symbols.values()
    )
def _get_widest_time_range(self):
    """
    Get minimum start time and maximum end time over all sources.

    ``adaptive`` bounds are ignored; if no concrete bound is found, ``None`` is returned for it.
    """
    date_ranges = list(self.__sources_keys_dates.values())
    known_starts = [range_start for range_start, _ in date_ranges if range_start is not adaptive]
    known_ends = [range_end for _, range_end in date_ranges if range_end is not adaptive]
    widest_start = min(known_starts) if known_starts else None
    widest_end = max(known_ends) if known_ends else None
    return widest_start, widest_end
def _get_date_range(self, symbols=None, start=None, end=None, default_start=None, default_end=None): #noqa
    """
    Resolve the query time range and common symbol, and glue the sources' base graphs into this graph.

    For every source key that has a base EP builder function, the source graph is built,
    optionally wrapped with a ModifyQueryTimes EP (when its own time range differs from the
    query one), optionally bound to a symbol, and then attached to this object's graph.
    Note that this method modifies ``self``.

    Parameters
    ----------
    symbols:
        Symbols passed by the caller. If ``None``, the symbols of the sources are inspected
        to find a common one.
    start, end:
        Query start/end time. If ``None``, the widest time range of the sources
        (or ``default_start``/``default_end``) is used.
    default_start, default_end:
        Fallback times; when ``None`` they are read from the ``default_start_time`` /
        ``default_end_time`` configuration options (``adaptive`` if not configured).

    Returns
    -------
    tuple
        ``(start, end, common_symbol)``. ``common_symbol`` is the passed ``symbols`` if given;
        otherwise it is ``None`` when the sources' symbols differ or the first one is manually bound.
    """
    if default_start is None:
        default_start = configuration.config.get('default_start_time', adaptive)
    if default_end is None:
        default_end = configuration.config.get('default_end_time', adaptive)
    # ------------ #
    # Find symbols
    need_to_bind_symbol = False
    common_symbol = None
    if symbols is None:
        # let's try to understand whether we could use common symbol for all sources
        # or we need to bound symbols instead
        first_symbol = None
        for symbol in self.__sources_symbols.values():
            if first_symbol is None:
                first_symbol = symbol
                if isinstance(first_symbol, _ManuallyBoundValue):
                    # Mark that we need to bound, but keep common_symbol equal to None.
                    # It is applicable for the bound symbols inside the merge with bound
                    # symbols, for example.
                    need_to_bind_symbol = True
                else:
                    common_symbol = symbol
                continue
            # a (truthy) symbol different from the first one means there is no single
            # common symbol: every source will be bound to its own symbol below
            if symbol and symbol != first_symbol:
                need_to_bind_symbol = True
                common_symbol = None
                break
        # symbol is specified nowhere - just set unbound to the default one
        if (first_symbol is adaptive or first_symbol is adaptive_to_default) and (
            common_symbol is adaptive or common_symbol is adaptive_to_default
        ):
            common_symbol = configuration.config.default_symbol
    else:
        # when unbound symbols passed
        common_symbol = symbols
        need_to_bind_symbol = True  # use to check all sources whether some has bound symbols
    # Find max and min for _source data ranges
    sources_start, sources_end = self._get_widest_time_range()
    sources_start = sources_start or default_start
    sources_end = sources_end or default_end
    common_format = "%Y-%m-%d %H:%M:%S"
    for key, date_range in self.__sources_keys_dates.items():
        # find a function that builds _source
        func = self.__sources_base_ep_func[key]
        if func:
            src = func()
            # --------------------------
            # determine whether we have to add modify_query_times to a src
            start_date, end_date = date_range
            # the flag is set below once the EP is added, so a source is wrapped only once
            if not self.__sources_modify_query_times[key]:
                if start_date is not adaptive or end_date is not adaptive:
                    # if some of the end is specified, then it means
                    # we need to check whether it is worth to wrap into the
                    # modify_query_times
                    if start_date is adaptive:
                        if start is None:
                            start_date = sources_start
                        else:
                            start_date = start
                    if end_date is adaptive:
                        if end is None:
                            end_date = sources_end
                        else:
                            end_date = end
                    if start_date is not adaptive and end_date is not adaptive:
                        # it might happen when either sources_start/end are adaptive
                        # or start/end are adaptive
                        # wrap only if this source's range differs from the query range
                        if (
                            (start is None and sources_start is not adaptive and start_date != sources_start)
                            or (start is not None and start_date != start)
                            or (end is None and sources_end is not adaptive and end_date != sources_end)
                            or (end is not None and end_date != end)
                        ):
                            self.__sources_modify_query_times[key] = True
                            # the source's own times are passed as parse_time() expressions
                            # evaluated in the query timezone (_TIMEZONE)
                            mqt = otq.ModifyQueryTimes(
                                start_time='parse_time(("'
                                + common_format
                                + '.%q"),"'
                                + start_date.strftime(common_format + ".%f")
                                + '", _TIMEZONE)',
                                end_time='parse_time(("'
                                + common_format
                                + '.%q"),"'
                                + end_date.strftime(common_format + ".%f")
                                + '", _TIMEZONE)',
                                output_timestamp="TIMESTAMP",
                            )
                            src.sink(mqt)
            if need_to_bind_symbol:
                bound = None
                if key in self.__sources_symbols:  # TODO: this is wrong, we need to know about symbols
                    # it happens when we do not copy symbols when apply
                    # merge with bound symbols.
                    # Wrong, in that case merge with bound symbol is
                    # non distinguishable from the manually passed None
                    # for external queries
                    bound = self.__sources_symbols[key]
                    if isinstance(bound, _ManuallyBoundValue):
                        bound = bound.value
                if bound and bound is not adaptive and bound is not adaptive_to_default:
                    src.__node.symbol(bound)
                else:
                    # if key is not in __sources_symbols, then
                    # it means that symbol was not specified, and
                    # therefore use unbound symbol
                    if common_symbol is None:
                        if bound is adaptive_to_default:
                            src.__node.symbol(configuration.config.default_symbol)
                        else:
                            pass
                            # # TODO: write test validated this
                            #
                            # # raise Exception("One of the branch does not have symbol specified")
            # --------------------------
            # glue _source with the main graph
            self.node().add_rules(src.node().copy_rules())
            self.source_by_key(src.node().copy_graph(), key)
            self._merge_tmp_otq(src)
        else:
            pass
    if start is None:
        start = sources_start
    if end is None:
        end = sources_end
    return start, end, common_symbol
def _to_graph(self, add_passthrough=True):
    """
    Construct the raw otq.GraphQuery. Only for internal usage.

    It is private because it assumes the graph is already fully defined; a Source becomes
    fully defined only once symbols and start/end datetimes are specified, so exposing this
    could confuse an end user.
    """
    graph_source = self.copy()
    if add_passthrough:
        # added for the case when the last EP has a pin output
        graph_source.sink(otq.Passthrough())
    root_node = graph_source.node().get()
    return otq.GraphQuery(root_node)
def to_graph(self, raw=None, symbols=None, start=None, end=None, *, add_passthrough=True):
    """
    Construct an :py:class:`onetick.query.GraphQuery` object.

    Parameters
    ----------
    raw:
        .. deprecated:: 1.4.17 has no effect
    symbols:
        symbols query to add to otq.GraphQuery
    start: datetime
        start time of a query
    end: datetime
        end time of a query
    add_passthrough: bool
        add additional :py:class:`onetick.query.Passthrough` event processor to the end of a resulted graph

    Returns
    -------
    otq.GraphQuery

    Notes
    -----
    If the source uses sub-queries, a DeprecationWarning is emitted: the query is saved to a
    temporary .otq file and the returned graph wraps it as a nested query
    (``otp.Query`` with the ``OUT_FOR_TO_GRAPH`` output pin).

    See Also
    --------
    :meth:`render`
    """
    if raw is not None:
        # same wording as in the other methods with the deprecated "raw" flag
        warnings.warn('The "raw" flag is deprecated and makes no effect', DeprecationWarning, stacklevel=2)
    _obj, _start, _end, _symbols = self.__prepare_graph(symbols, start, end)
    if not _obj._tmp_otq.queries:
        return _obj._to_graph(add_passthrough=add_passthrough)

    warnings.warn('Using .to_graph() for a Source object that uses sub-queries! '
                  'This operation is deprecated and is not guaranteed to work as expected. '
                  'Such a Source should be executed using otp.run() or saved to disk using to_otq()',
                  DeprecationWarning, stacklevel=2)
    # sub-queries cannot live in a plain GraphQuery: save everything to a file
    # and reference it as a nested query with an explicit output pin
    _obj.sink(otq.Passthrough().output_pin_name('OUT_FOR_TO_GRAPH'))
    _graph = _obj._to_graph(add_passthrough=False)
    _graph.set_start_time(_start)
    _graph.set_end_time(_end)
    _graph.set_symbols(_symbols)
    query = _obj._tmp_otq.save_to_file(
        query=_graph, file_suffix='_to_graph.otq')
    query_path, query_name = query.split('::')
    query_params = get_query_parameter_list(query_path, query_name)
    # forward every query parameter of the saved query as a parameter of the nested query
    source_with_nested_query = otp.Query(otp.query(query,
                                                   **{param: f'${param}' for param in query_params}),
                                         out_pin='OUT_FOR_TO_GRAPH')
    return source_with_nested_query.to_graph(
        symbols=_symbols, start=_start, end=_end,
        add_passthrough=add_passthrough)
def render(self, **kwargs):
    """
    Renders a calculation graph using the ``graphviz`` library.

    Every node is a onetick query language event processor.
    Nodes in nested queries, first stage queries and eval queries are not shown.
    Useful for debugging and, in jupyter, for learning the underlying graph.
    ``verbose`` defaults to ``True``; other ``kwargs`` are forwarded to the graph's ``render``.

    Note that it's required to have :graphviz:`graphviz <>` package installed.

    Examples
    --------
    >>> data = otp.Ticks(X=[1, 2, 3])
    >>> data1, data2 = data[(data['X'] > 2)]
    >>> data = otp.merge([data1, data2])
    >>> data.render()  # doctest: +SKIP

    .. graphviz:: ../../static/render_example.dot
    """
    options = {'verbose': True, **kwargs}
    self._to_graph().render(**options)
@inplace_operation
def write(self,
          db: Union[str, 'otp.DB'],
          symbol: Optional[Union[str, 'otp.Column']] = None,
          tick_type: Optional[Union[str, 'otp.Column']] = None,
          date: Optional[date] = adaptive,
          start: Optional[date] = None,
          end: Optional[date] = None,
          append: bool = True,
          keep_symbol_and_tick_type: bool = adaptive,
          propagate: bool = True,
          out_of_range_tick_action: Literal['exception', 'ignore', 'load'] = 'exception',
          timestamp: Optional['otp.Column'] = None,
          keep_timestamp: bool = True,
          correction_type: Optional['otp.Column'] = None,
          replace_existing_time_series: bool = False,
          allow_concurrent_write: bool = False,
          context: str = adaptive,
          use_context_of_query: bool = False,
          inplace: bool = False,
          **kwargs) -> Optional['Source']:
    """
    Saves data result to OneTick database.

    Note
    ----
    This method does not save anything. It adds instruction in query to save.
    Data will be saved when query will be executed.

    Using ``start``+``end`` parameters instead of single ``date`` has some limitations:

    * ``inplace`` is not supported
    * if ``DAY_BOUNDARY_TZ`` and ``DAY_BOUNDARY_OFFSET`` specified against
      individual locations of database, then day boundary could be calculated incorrectly.
    * ``out_of_range_tick_action`` could be only ``exception`` or ``ignore``

    Parameters
    ----------
    db: str or :py:class:`otp.DB <onetick.py.DB>`
        database name or object.
    symbol: str or Column
        resulting symbol name string or column to get symbol name from.
        If this parameter is not set, then ticks _SYMBOL_NAME pseudo-field is used.
        If it is empty, an attempt is made to retrieve
        the symbol name from the field named SYMBOL_NAME.
    tick_type: str or Column
        resulting tick type string or column to get tick type from.
        If this parameter is not set, the _TICK_TYPE pseudo-field is used.
        If it is empty, an attempt is made to retrieve
        the tick type from the field named TICK_TYPE.
    date: datetime or None
        date where to save data.
        Should be set to `None` if writing to accelerator or memory database.
        By default, it is set to `otp.config.default_date`.
    start: datetime or None
        Start date for data to save. It is inclusive.
        Cannot be used with ``date`` parameter.
        Also cannot be used with ``inplace`` set to ``True``.
        Should be set to `None` if writing to accelerator or memory database.
        By default, None.
    end: datetime or None
        End date for data to save. It is exclusive, so be sure to set
        it to the next day after the last day in data.
        Cannot be used with ``date`` parameter.
        Also cannot be used with ``inplace`` set to ``True``.
        Should be set to `None` if writing to accelerator or memory database.
        By default, None.
    append: bool
        If False - data will be rewritten for this ``date``
        or range of dates (from ``start`` to ``end``),
        otherwise data will be appended: new symbols are added,
        existing symbols can be modified (append new ticks, modify existing ticks).
        This option is not valid for accelerator databases.
    keep_symbol_and_tick_type: bool
        keep fields containing symbol name and tick type when writing ticks
        to the database or propagating them.
        By default, this parameter is adaptive.
        If ``symbol`` or ``tick_type`` are column objects, then it's set to True.
        Otherwise, it's set to False.
        When it is False, only the fields actually used for symbol name / tick type
        (i.e. the ones coming from a set ``symbol`` / ``tick_type``) are removed from the schema.
    propagate: bool
        Propagate ticks after that event processor or not.
    out_of_range_tick_action: str
        Action (case-insensitive) to be executed if tick's timestamp's date is not ``date``
        or is not between ``start`` and ``end``:

        * `exception`: runtime exception will be raised
        * `ignore`: tick will not be written to the database
        * `load`: writes tick to the database anyway.
          Can be used only with ``date``, not with ``start``+``end``.

        Default: `exception`
    timestamp: Column
        Field that contains the timestamp with which the ticks will be written to the database.
        By default, the TIMESTAMP pseudo-column is used.
    keep_timestamp: bool
        If ``timestamp`` parameter is set and this parameter is set to False,
        then timestamp column is removed.
    correction_type: Column
        The name of the column that contains the correction type.
        This column will be removed.
        If this parameter is not set, no corrections will be submitted.
    replace_existing_time_series: bool
        If ``append`` is set to True, setting this option to True instructs the loader
        to replace existing time series, instead of appending to them.
        Other time series will remain unchanged.
    allow_concurrent_write: bool
        Allows different queries running on the same server to load concurrently into the same database.
    context: str
        The server context used to look up the database.
        By default, `otp.config.context` is used if ``use_context_of_query`` is not set.
    use_context_of_query: bool
        If this parameter is set to True and the ``context`` parameter is not set,
        the context of the query is used instead of the default value of the ``context`` parameter.
    inplace: bool
        A flag controls whether operation should be applied inplace.
        If ``inplace=True``, then it returns nothing.
        Otherwise, method returns a new modified object.
        Cannot be ``True`` if ``start`` and ``end`` are set.
    kwargs:
        .. deprecated:: use named parameters instead.

    Returns
    -------
    :class:`Source` or None

    Raises
    ------
    TypeError
        if unknown keyword arguments are passed.
    ValueError
        if ``date`` is combined with ``start``/``end``, if only one of ``start``/``end`` is set,
        if ``inplace=True`` is combined with ``start``+``end``,
        if ``load`` action is combined with ``start``+``end``,
        or if ``out_of_range_tick_action`` is unknown.

    See also
    --------
    **WRITE_TO_ONETICK_DB** OneTick event processor

    Examples
    --------
    >>> data = otp.Ticks(X=[1, 2, 3])
    >>> data = data.write('SOME_DB', symbol='S_WRITE', tick_type='T_WRITE')
    >>> otp.run(data)
    Time X
    0 2003-12-01 00:00:00.000 1
    1 2003-12-01 00:00:00.001 2
    2 2003-12-01 00:00:00.002 3
    >>> data = otp.DataSource('SOME_DB', symbol='S_WRITE', tick_type='T_WRITE')
    >>> otp.run(data)
    Time X
    0 2003-12-01 00:00:00.000 1
    1 2003-12-01 00:00:00.001 2
    2 2003-12-01 00:00:00.002 3
    """
    # deprecated parameter names
    if 'append_mode' in kwargs:
        warnings.warn("Parameter 'append_mode' is deprecated, use 'append'", DeprecationWarning)
        append = kwargs.pop('append_mode')
    if 'timestamp_field' in kwargs:
        warnings.warn("Parameter 'timestamp_field' is deprecated, use 'timestamp'", DeprecationWarning)
        timestamp = kwargs.pop('timestamp_field')
    if 'keep_timestamp_field' in kwargs:
        warnings.warn("Parameter 'keep_timestamp_field' is deprecated, use 'keep_timestamp'", DeprecationWarning)
        keep_timestamp = kwargs.pop('keep_timestamp_field')
    if kwargs:
        raise TypeError(f'write() got unexpected arguments: {list(kwargs)}')

    # from here on ``kwargs`` accumulates parameters for WriteToOnetickDb
    kwargs = {}
    if date is not adaptive and (start or end):
        raise ValueError('date cannot be used with start+end')
    if date is adaptive and (start and end) and inplace:
        # join_with_query and merge are used for multiple dates, so inplace is not supported
        raise ValueError('cannot run on multiple dates if inplace is True,'
                         ' use one value for date instead of start+end')
    if (start and not end) or (not start and end):
        raise ValueError('start and end should be both specified or both None')
    if date is adaptive:
        date = configuration.config.default_date

    if symbol is not None:
        if isinstance(symbol, _Column):
            kwargs['symbol_name_field'] = str(symbol)
            if keep_symbol_and_tick_type is adaptive:
                keep_symbol_and_tick_type = True
        else:
            # a constant symbol is materialized into a temporary field
            kwargs.setdefault('symbol_name_field', '_SYMBOL_NAME_FIELD_')
            self[kwargs['symbol_name_field']] = symbol
    if tick_type is not None:
        if isinstance(tick_type, _Column):
            kwargs['tick_type_field'] = str(tick_type)
            if keep_symbol_and_tick_type is adaptive:
                keep_symbol_and_tick_type = True
        else:
            # a constant tick type is materialized into a temporary field
            kwargs.setdefault('tick_type_field', '_TICK_TYPE_FIELD_')
            self[kwargs['tick_type_field']] = tick_type
    if keep_symbol_and_tick_type is adaptive:
        keep_symbol_and_tick_type = False

    if timestamp is not None:
        kwargs['timestamp_field'] = str(timestamp)
    if correction_type is not None:
        kwargs['correction_type_field'] = str(correction_type)
    if context is not adaptive:
        kwargs['context'] = context
    elif not use_context_of_query:
        kwargs['context'] = otp.config.context

    if out_of_range_tick_action.upper() == 'IGNORE':
        pass
    elif out_of_range_tick_action.upper() == 'LOAD':
        if start and end:
            raise ValueError('LOAD out_of_range_tick_action cannot be used with start+end, use date instead')
    elif out_of_range_tick_action.upper() == 'EXCEPTION':
        if start and end:
            # WRITE_TO_ONETICK_DB use DAY_BOUNDARY_TZ and DAY_BOUNDARY_OFFSET
            # to check tick timestamp is out of range or not
            # so we mimic it here with THROW event processor
            src = otp.Source(otq.DbShowConfig(str(db), 'DB_TIME_INTERVALS'))
            src.table(inplace=True,
                      DAY_BOUNDARY_TZ=str,
                      DAY_BOUNDARY_OFFSET=int)
            # DAY_BOUNDARY_OFFSET offset are in seconds
            src['DAY_BOUNDARY_OFFSET'] = src['DAY_BOUNDARY_OFFSET'] * 1000
            src.rename({
                'DAY_BOUNDARY_TZ': '__DAY_BOUNDARY_TZ',
                'DAY_BOUNDARY_OFFSET': '__DAY_BOUNDARY_OFFSET'
            }, inplace=True)
            self = self.join_with_query(src,
                                        symbol=f"{str(db)}::DUMMY",
                                        caching='per_symbol')
            start_formatted = start.strftime('%Y-%m-%d')
            end_formatted = end.strftime('%Y-%m-%d')
            convert_timestamp = 'tostring(AS_YYYYMMDDHHMMSS(TIMESTAMP,__DAY_BOUNDARY_TZ))+"."+' + \
                'TIME_FORMAT("%J", TIMESTAMP, __DAY_BOUNDARY_TZ)'
            self.sink(otq.Throw(
                where=f'TIMESTAMP < PARSE_NSECTIME("%Y-%m-%d", "{start_formatted}"'
                      ', __DAY_BOUNDARY_TZ) + __DAY_BOUNDARY_OFFSET',
                message=f"'Timestamp '+{convert_timestamp}+' of a tick, "
                        f"visible or hidden, earlier then {start_formatted}"
                        " in timezone ' + __DAY_BOUNDARY_TZ",
                treat_message_as_per_tick_expr='true'
            ))
            self.sink(otq.Throw(
                where=f'TIMESTAMP >= PARSE_NSECTIME("%Y-%m-%d", "{end_formatted}"'
                      ', __DAY_BOUNDARY_TZ) + __DAY_BOUNDARY_OFFSET',
                message=f"'Timestamp '+{convert_timestamp}+' of a tick, "
                        f"visible or hidden, later then {end_formatted}"
                        " in timezone ' + __DAY_BOUNDARY_TZ",
                treat_message_as_per_tick_expr='true'
            ))
    else:
        raise ValueError(f'Unknown out_of_range_tick_action: {out_of_range_tick_action}.'
                         ' Possible values are: "ignore", "exception", "load"')

    branches = []
    if propagate:
        branches = [self]
    kwargs = dict(
        **kwargs,
        database=str(db),
        append_mode=append,
        keep_symbol_name_and_tick_type=keep_symbol_and_tick_type,
        keep_timestamp_field=keep_timestamp,
        replace_existing_time_series=replace_existing_time_series,
        allow_concurrent_write=allow_concurrent_write,
        use_context_of_query=use_context_of_query,
    )
    if start and end:
        # one non-propagating writer branch per day, merged with the propagated ticks (if any)
        # NOTE(review): ``end`` is documented as exclusive, yet ``+ 1`` also creates a writer
        # for the ``end`` day itself — confirm this is intended
        days = (end - start).days + 1
        for i in range(days):
            branch = self.copy()
            branch.sink(
                otq.WriteToOnetickDb(
                    date=(start + otp.Day(i)).strftime('%Y%m%d'),
                    propagate_ticks=False,
                    out_of_range_tick_action='IGNORE',
                    **kwargs
                )
            )
            branches.append(branch)
        self = otp.merge(branches)
    else:
        self.sink(
            otq.WriteToOnetickDb(
                date=date.strftime('%Y%m%d') if date else '',
                propagate_ticks=propagate,
                out_of_range_tick_action=out_of_range_tick_action.upper(),
                **kwargs
            )
        )

    # temporary fields created for constant symbol / tick type
    for col in ('_SYMBOL_NAME_FIELD_', '_TICK_TYPE_FIELD_'):
        if col in self.schema:
            self.drop(col, inplace=True)

    to_drop = set()
    if not keep_symbol_and_tick_type:
        # these keys exist only when ``symbol`` / ``tick_type`` were set;
        # indexing them unconditionally raised KeyError for e.g. ``write('DB')``
        for field_param in ('symbol_name_field', 'tick_type_field'):
            if field_param in kwargs:
                to_drop.add(kwargs[field_param])
    if not keep_timestamp and timestamp is not None and str(timestamp) not in {'Time', 'TIMESTAMP'}:
        to_drop.add(str(timestamp))
    if correction_type is not None:
        to_drop.add(str(correction_type))
    self.schema.set(**{
        k: v for k, v in self.schema.items()
        if k not in to_drop
    })
    return self
def copy(self, ep=None, columns=None, deep=False) -> 'Source':
    """
    Build an object with copied calculation graph.

    Every node of the resulting graph has the same id as in the original. It means that
    if the original and copied graphs are merged or joined together further then all common
    nodes (all that created before the .copy() method) will be glued.

    For example, let's imagine that you have the following calculation graph ``G``

    .. graphviz::

        digraph {
            rankdir="LR";
            A -> B;
        }

    where ``A`` is a source and ``B`` is some operation on it.

    Then we copy it to the ``G'`` and assign a new operation there

    .. graphviz::

        digraph {
            rankdir="LR";
            A -> B -> C;
        }

    After that we decided to merge ``G`` and ``G'``. The resulting calculation graph will be:

    .. graphviz::

        digraph {
            rankdir="LR";
            A -> B -> C -> MERGE;
            B -> MERGE;
        }

    Please use the :meth:`Source.deepcopy` if you want to get the following calculation graph after merges and joins

    .. graphviz::

        digraph {
            rankdir="LR";
            A -> B -> C -> MERGE;
            "A'" -> "B'" -> "C'" -> MERGE;
        }

    Parameters
    ----------
    ep: onetick.query event processor or None
        if set, the new object is built around ``ep`` and a copy of this
        object's graph is connected to it as a source.
    columns: dict or None
        column name -> type mapping for the new object, ``self.columns()`` by default.
    deep: bool
        if True, new ids are generated for all nodes and sources (see :meth:`Source.deepcopy`).

    Returns
    -------
    Source

    See Also
    --------
    Source.deepcopy
    """
    schema = self.columns() if columns is None else columns

    if not ep:
        duplicate = self.__class__(node=self.node(), **schema)
        duplicate.node().add_rules(self.node().copy_rules(deep=deep))
        duplicate._set_sources_dates(self)
    else:
        duplicate = self.__class__(node=ep, **schema)
        duplicate.source(self.node().copy_graph())
        # ep is not a _source, so the sources dates have to be cleaned
        duplicate._clean_sources_dates()

    if deep:
        # regenerate uuids for the node history and the sources
        # once everything has been initialized
        new_keys = defaultdict(uuid.uuid4)  # type: ignore
        duplicate.node().rebuild_graph(new_keys)
        duplicate._change_sources_keys(new_keys)

    # carry over state variables, temporary queries, name and custom properties
    duplicate._copy_state_vars_from(self)
    duplicate._tmp_otq = self._tmp_otq.copy()
    duplicate.__name = self.__name  # noqa
    duplicate._copy_properties_from(self)
    return duplicate
def deepcopy(self, ep=None, columns=None) -> 'onetick.py.Source':
    """
    Copy all graph and change ids for every node.

    Equivalent to ``self.copy(ep, columns, deep=True)``;
    more details could be found in :meth:`Source.copy`

    See Also
    --------
    Source.copy
    """
    make_copy = self.copy
    return make_copy(ep, columns, deep=True)
def _copy_properties_from(self, obj):
    """
    Copy the values of the properties declared by a subclass (on top of ``Source._PROPERTIES``)
    from ``obj`` to ``self``.

    Needed if we are doing copy of a child with custom properties.
    """
    subclass_only = set(self.__class__._PROPERTIES) - set(Source._PROPERTIES)
    for property_name in subclass_only:
        value = getattr(obj, property_name)
        setattr(self, property_name, value)
def _copy_state_vars_from(self, objs):
    """
    Build a new ``StateVars`` container for ``self`` from ``objs`` and store it as ``_state_vars``.
    """
    state_vars = StateVars(self, objs)
    # stored straight into the instance dict (presumably to bypass any custom
    # attribute assignment logic of the class — TODO confirm)
    self.__dict__["_state_vars"] = state_vars
def split(self, expr, cases, default=False) -> Tuple['Source', ...]:
    """
    The method splits data using passed expression ``expr`` for several
    outputs by passed ``cases``. :meth:`Source.switch` is an alias for this method.

    Parameters
    ----------
    expr : Operation
        column or column based expression
    cases : list
        list of values or :py:class:`onetick.py.range` objects to split by
    default : bool
        ``True`` adds the default output

    Returns
    -------
    Outputs according to passed cases, number of outputs is number of cases plus one if ``default=True``

    See also
    --------
    | :meth:`Source.switch`
    | :py:class:`onetick.py.range`
    | **SWITCH** OneTick event processor

    Examples
    --------
    >>> # OTdirective: snippet-name: Source operations.split;
    >>> data = otp.Ticks(X=[0.33, -5.1, otp.nan, 9.4])
    >>> r1, r2, r3 = data.split(data['X'], [otp.nan, otp.range(0, 100)], default=True)
    >>> otp.run(r1)
    Time X
    0 2003-12-01 00:00:00.002 NaN
    >>> otp.run(r2)
    Time X
    0 2003-12-01 00:00:00.000 0.33
    1 2003-12-01 00:00:00.003 9.40
    >>> otp.run(r3)
    Time X
    0 2003-12-01 00:00:00.001 -5.1
    """
    def to_str(value):
        # render one case value in SWITCH syntax
        if isinstance(value, onetick.py.utils.range):
            return "[" + str(value.start) + "," + str(value.stop) + "]"
        if isinstance(value, str):
            return '"' + value + '"'
        if isinstance(value, tuple):
            return ",".join(map(to_str, list(value)))
        return str(value)

    out_pins = [f"OUT{index}" for index in range(len(cases))]
    formatted_cases = [f"{to_str(case)}:{pin}" for case, pin in zip(cases, out_pins)]

    params = dict(switch=str(expr), cases=";".join(formatted_cases))
    if default:
        params["default_output"] = "DEF_OUT"
        out_pins.append("DEF_OUT")
    switch_src = self.copy(ep=otq.SwitchEp(**params))

    # one output source per pin, each ending with its own passthrough
    outputs = []
    for pin in out_pins:
        branch = switch_src.copy()
        branch.node().out_pin(pin)
        branch.sink(otq.Passthrough())
        outputs.append(branch)
    return tuple(outputs)
def switch(self, expr, cases, default=False) -> Tuple['Source', ...]:
    """
    The method splits data using passed expression for several
    outputs by passed cases. This method is an alias for
    :meth:`Source.split` method.

    Parameters
    ----------
    expr : Operation
        column or column based expression
    cases : list
        list of values or :py:class:`onetick.py.range` objects to split by
    default : bool
        ``True`` adds the default output

    Returns
    -------
    Outputs according to passed cases, number of outputs is number of cases plus one if ``default=True``

    See also
    --------
    | :meth:`Source.split`
    | :py:class:`onetick.py.range`
    | **SWITCH** OneTick event processor

    Examples
    --------
    >>> data = otp.Ticks(X=[0.33, -5.1, otp.nan, 9.4])
    >>> r1, r2, r3 = data.switch(data['X'], [otp.nan, otp.range(0, 100)], default=True)
    >>> otp.run(r1)
    Time X
    0 2003-12-01 00:00:00.002 NaN
    >>> otp.run(r2)
    Time X
    0 2003-12-01 00:00:00.000 0.33
    1 2003-12-01 00:00:00.003 9.40
    >>> otp.run(r3)
    Time X
    0 2003-12-01 00:00:00.001 -5.1
    """
    outputs = self.split(expr, cases, default)
    return outputs
def columns(self, skip_meta_fields=False):
    """
    Return columns in data source

    Parameters
    ----------
    skip_meta_fields: bool, default=False
        do not add meta fields

    Returns
    -------
    dict
        column name -> column dtype for every ``_Column`` stored on the instance,
        excluding class properties (and meta fields if requested).
    """
    cls = self.__class__
    return {
        value.name: value.dtype
        for key, value in self.__dict__.items()
        if not (skip_meta_fields and key in cls.meta_fields)
        and key not in cls._PROPERTIES
        and isinstance(value, _Column)
    }
def drop_columns(self):
    """
    Method removes all columns in the python representation, but don't
    drop columns on the data.

    It is used when external query is applied, because we don't know how
    data schema has changed. Meta fields and class properties are kept.
    """
    cls = self.__class__
    # collect first: the instance dict must not change while it is iterated
    column_keys = [
        key
        for key, value in self.__dict__.items()
        if key not in cls.meta_fields
        and key not in cls._PROPERTIES
        and isinstance(value, _Column)
    ]
    for key in column_keys:
        del self.__dict__[key]
def node(self):
    """Return the node object that this source appends event processors to."""
    current_node = self.__node
    return current_node
def tick_type(self, tt):
    """
    Pass tick type ``tt`` to the underlying node's ``tick_type`` method.

    Returns
    -------
    the same source object, to allow chaining
    """
    graph_node = self.__node
    graph_node.tick_type(tt)
    return self
def symbol(self, symbol):
    """
    Apply symbol to graph

    .. deprecated:: 1.3.31
        specify the symbol when the source is created instead.

    Returns
    -------
    the same source object
    """
    warnings.warn("symbol method is deprecated, please specify symbol during creation", DeprecationWarning)
    graph_node = self.__node
    graph_node.symbol(symbol)
    return self
def node_name(self, name=None, key=None):
    """Delegate to the underlying node's ``node_name(name, key)`` and return its result."""
    graph_node = self.__node
    return graph_node.node_name(name, key)
def __add__(self, other) -> 'Source':
    """``a + b`` is a shortcut for ``onetick.py.functions.merge([a, b])``."""
    sources_to_merge = [self, other]
    return onetick.py.functions.merge(sources_to_merge)
@inplace_operation
def table(self, inplace=False, strict: bool = True, **schema) -> Optional['Source']:
    """
    Set the OneTick and python schemas levels according to the ``schema``
    parameter. The ``schema`` should contain either (field_name -> type) pairs
    or (field_name -> default value) pairs; ``None`` means no specified type, and
    OneTick treats such a field as a double type.

    Resulting ticks have the same order as in the ``schema``. If only partial fields
    are specified (i.e. when the ``strict=False``) then fields from the ``schema`` have
    the most left position.

    Meta fields passed in ``schema`` are ignored with a warning.

    Parameters
    ----------
    inplace: bool
        The flag controls whether operations should be applied inplace
    strict: bool
        If set to ``False``, all fields present in an input tick will be present in the output tick.
        If ``True``, then only fields specified in the ``schema``.
    schema:
        field_name -> type or field_name -> default value pairs that should be applied on the source.

    Returns
    -------
    :class:`Source` or ``None``

    See Also
    --------
    | :attr:`Source.schema`
    | :meth:`__getitem__`: the table shortcut
    | **TABLE** OneTick event processor

    Examples
    --------
    Selection case

    >>> data = otp.Ticks(X1=[1, 2, 3],
    ...                  X2=[3, 2, 1],
    ...                  A1=["A", "A", "A"])
    >>> data = data.table(X2=int, A1=str) # OTdirective: snippet-name: Arrange.set schema;
    >>> otp.run(data)
    Time X2 A1
    0 2003-12-01 00:00:00.000 3 A
    1 2003-12-01 00:00:00.001 2 A
    2 2003-12-01 00:00:00.002 1 A

    Defining default values case (note the order)

    >>> data = otp.Ticks(X=[1, 2, 3])
    >>> data = data.table(Y=0.5, strict=False)
    >>> otp.run(data)
    Time Y X
    0 2003-12-01 00:00:00.000 0.5 1
    1 2003-12-01 00:00:00.001 0.5 2
    2 2003-12-01 00:00:00.002 0.5 3
    """
    def is_time_like(value):
        return ott.is_time_type(value) or isinstance(value, ott.nsectime)

    def field_declaration(name, value):
        # TABLE field spec: "<type> <name>" plus " (<default>)" for plain default values
        if value is None:
            return name
        declaration = f'{ott.type2str(ott.get_object_type(value))} {name}'
        if isinstance(value, Type) or is_time_like(value):
            return declaration
        return f'{declaration} ({ott.value2str(value)})'

    def python_type(value):
        return float if value is None else ott.get_object_type(value)

    meta_names = [name for name in schema if name in self.__class__.meta_fields]
    for name in meta_names:
        # meta fields should not be propagated to Table ep
        # otherwise new user-defined field with the same name will appear in schema
        # and using this field will raise an "ambiguous use" error in OneTick
        warnings.warn(f"Meta field '{name}' should not be used in .table() method.")
        schema.pop(name)
    if not schema:
        return self

    new_types = {name: python_type(value) for name, value in schema.items()}
    if not strict:
        self.schema.update(**new_types)
    else:
        self.schema.set(**new_types)

    table_fields = ','.join(field_declaration(name, value) for name, value in schema.items())
    self.sink(otq.Table(fields=table_fields, keep_input_fields=not strict))

    # datetime and nsetime values require onetick built-in functions to be initialized
    # but built-in functions can't be used in table ep so updating columns after the table
    for name, value in schema.items():
        if not is_time_like(value):
            continue
        self.update({name: value}, where=self[name] == 0, inplace=True)

    self._fix_varstrings()
    return self
def _fix_varstrings(self):
    """
    Re-assign every varstring column to itself.

    PY-556: converting to varstring results in string with null-characters;
    just updating the column removes them.
    """
    columns_to_refresh = {}
    for column_name, column_type in self.schema.items():
        if column_type is ott.varstring:
            columns_to_refresh[column_name] = self[column_name]
    if not columns_to_refresh:
        return
    self.update(columns_to_refresh, inplace=True)
@inplace_operation
def drop(self, columns: List[Any], inplace=False) -> Optional['Source']:
    r"""
    Remove a list of columns from the Source.

    If column with such name wasn't found the error will be raised, if the regex was specified and there aren't
    any matched columns, do nothing.

    Regex is any string containing any of characters ***+?\:[]{}()**,
    dot is a valid symbol for OneTick identifier,
    so **a.b** will be passed to OneTick as an identifier,
    if you want specify such regex use parenthesis - **(a.b)**

    Parameters
    ----------
    columns : str, Column or list of them
        Column(s) to remove. You could specify a regex or collection of regexes, in such case columns with
        match names will be deleted.
    inplace: bool
        A flag controls whether operation should be applied inplace.
        If inplace=True, then it returns nothing. Otherwise method
        returns a new modified object.

    Returns
    -------
    :class:`Source` or ``None``

    See Also
    --------
    **PASSTHROUGH** OneTick event processor

    Examples
    --------
    >>> data = otp.Ticks(X1=[1, 2, 3],
    ...                  X2=[3, 2, 1],
    ...                  A1=["A", "A", "A"])
    >>> data = data.drop("X1") # OTdirective: snippet-name: Arrange.drop.one field;
    >>> otp.run(data)
    Time X2 A1
    0 2003-12-01 00:00:00.000 3 A
    1 2003-12-01 00:00:00.001 2 A
    2 2003-12-01 00:00:00.002 1 A

    Regexes also could be specified in such case all matched columns will be deleted

    >>> data = otp.Ticks(X1=[1, 2, 3],
    ...                  X2=[3, 2, 1],
    ...                  A1=["A", "A", "A"])
    >>> data = data.drop(r"X\d+") # OTdirective: snippet-name: Arrange.drop.regex;
    >>> otp.run(data)
    Time A1
    0 2003-12-01 00:00:00.000 A
    1 2003-12-01 00:00:00.001 A
    2 2003-12-01 00:00:00.002 A

    Several parameters can be specified

    >>> # OTdirective: snippet-name: Arrange.drop.multiple;
    >>> data = otp.Ticks(X1=[1, 2, 3],
    ...                  X2=[3, 2, 1],
    ...                  Y1=[1, 2, 3],
    ...                  Y2=[3, 2, 1],
    ...                  YA=["a", "b", "c"],
    ...                  A1=["A", "A", "A"])
    >>> data = data.drop([r"X\d+", "Y1", data["Y2"]])
    >>> otp.run(data)
    Time YA A1
    0 2003-12-01 00:00:00.000 a A
    1 2003-12-01 00:00:00.001 b A
    2 2003-12-01 00:00:00.002 c A

    **a.b** will be passed to OneTick as an identifier, if you want specify such regex use parenthesis - **(a.b)**

    >>> data = otp.Ticks({"COLUMN.A": [1, 2, 3], "COLUMN1A": [3, 2, 1],
    ...                   "COLUMN1B": ["a", "b", "c"], "COLUMN2A": ["c", "b", "a"]})
    >>> data = data.drop("COLUMN.A") # OTdirective: skip-snippet:;
    >>> otp.run(data)
    Time COLUMN1A COLUMN1B COLUMN2A
    0 2003-12-01 00:00:00.000 3 a c
    1 2003-12-01 00:00:00.001 2 b b
    2 2003-12-01 00:00:00.002 1 c a
    >>> data = otp.Ticks({"COLUMN.A": [1, 2, 3], "COLUMN1A": [3, 2, 1],
    ...                   "COLUMN1B": ["a", "b", "c"], "COLUMN2A": ["c", "b", "a"]})
    >>> data = data.drop("(COLUMN.A)") # OTdirective: skip-snippet:;
    >>> otp.run(data)
    Time COLUMN1B
    0 2003-12-01 00:00:00.000 a
    1 2003-12-01 00:00:00.001 b
    2 2003-12-01 00:00:00.002 c
    """
    # the actual work (validation, cache cleanup, PASSTHROUGH) is done by __delitem__
    del self[columns]
    return self
@inplace_operation
def dropna(self,
           how: Literal["any", "all"] = "any",
           subset: Optional[List[Any]] = None,
           inplace=False) -> Optional['Source']:
    """
    Drops ticks that contain NaN values according to the policy in the ``how`` parameter

    Only float columns are checked. If no float column is checked (the source has no
    float columns, or ``subset`` is empty), no filtering is applied.

    Parameters
    ----------
    how: "any" or "all"
        ``any`` - filters out ticks if at least one field has NaN value
        ``all`` - filters out ticks if all fields have NaN values.
    subset: list of str
        list of columns to check for NaN values. If ``None`` then all columns are checked.
        Every column in the list must exist and have float type.
    inplace: bool
        the flag controls whether operation should be applied inplace.

    Returns
    -------
    :class:`Source` or ``None``

    Raises
    ------
    ValueError
        if ``how`` is not ``any`` or ``all``, or a ``subset`` column is missing or not float.

    Examples
    --------
    Drop ticks where **at least one** field has ``nan`` value.

    >>> data = otp.Ticks([[ 'X', 'Y'],
    ...                   [ 0.0, 1.0],
    ...                   [ otp.nan, 2.0],
    ...                   [ 4.0, otp.nan],
    ...                   [ otp.nan, otp.nan],
    ...                   [ 6.0, 7.0]])
    >>> data = data.dropna()
    >>> otp.run(data)[['X', 'Y']]
    X Y
    0 0.0 1.0
    1 6.0 7.0

    Drop ticks where **all** fields have ``nan`` values.

    >>> data = otp.Ticks([[ 'X', 'Y'],
    ...                   [ 0.0, 1.0],
    ...                   [ otp.nan, 2.0],
    ...                   [ 4.0, otp.nan],
    ...                   [ otp.nan, otp.nan],
    ...                   [ 6.0, 7.0]])
    >>> data = data.dropna(how='all')
    >>> otp.run(data)[['X', 'Y']]
    X Y
    0 0.0 1.0
    1 NaN 2.0
    2 4.0 NaN
    3 6.0 7.0

    Drop ticks where **all** fields in **subset** of columns have ``nan`` values.

    >>> data = otp.Ticks([[ 'X', 'Y', 'Z'],
    ...                   [ 0.0, 1.0, otp.nan],
    ...                   [ otp.nan, 2.0, otp.nan],
    ...                   [ 4.0, otp.nan, otp.nan],
    ...                   [ otp.nan, otp.nan, otp.nan],
    ...                   [ 6.0, 7.0, otp.nan]])
    >>> data = data.dropna(how='all', subset=['X', 'Y'])
    >>> otp.run(data)[['X', 'Y', 'Z']]
    X Y Z
    0 0.0 1.0 NaN
    1 NaN 2.0 NaN
    2 4.0 NaN NaN
    3 6.0 7.0 NaN
    """
    if how not in ["any", "all"]:
        raise ValueError(f"It is expected to see 'any' or 'all' values for 'how' parameter, but got '{how}'")
    columns = self.columns(skip_meta_fields=True)
    if subset is not None:
        for column_name in subset:
            if column_name not in columns:
                raise ValueError(f"There is no '{column_name}' column in the source")
            if columns[column_name] is not float:
                raise ValueError(f"Column '{column_name}' is not float type")

    # build the condition for ticks to KEEP:
    # "any" keeps ticks where every checked field is not NaN,
    # "all" keeps ticks where at least one checked field is not NaN
    condition = None
    for column_name, dtype in columns.items():
        if subset is not None and column_name not in subset:
            continue
        if dtype is not float:
            continue
        not_nan = self[column_name] != ott.nan
        if condition is None:
            condition = not_nan
        elif how == "any":
            condition &= not_nan
        else:
            condition |= not_nan

    if condition is None:
        # nothing to check: without this guard str(None) == "None" would be
        # passed to OneTick as the WHERE expression
        return self
    self.sink(otq.WhereClause(where=str(condition)))
    return self
def __delitem__(self, obj):
    """
    Drop column(s) ``obj`` (a name, regex, column or a list/tuple of them).

    The matching columns are removed from the python-side cache and a
    PASSTHROUGH with ``drop_fields=True`` is appended to the graph.
    """
    targets = obj if isinstance(obj, (list, tuple)) else (obj,)
    passthrough_fields, column_names, use_regex = self._get_columns_names_drop(targets)
    # raises AttributeError before anything is modified
    self._validate_columns_names(column_names)
    for name in column_names:
        del self.__dict__[name]
    self.sink(otq.Passthrough(drop_fields=True, fields=",".join(passthrough_fields), use_regex=use_regex))
def _validate_columns_names(self, names_of_columns):
    """
    Check that every name in ``names_of_columns`` is a column stored on the instance.

    Raises
    ------
    AttributeError
        for the first name that is missing or is not a ``_Column``.
    """
    for name in names_of_columns:
        if name not in self.__dict__ or not isinstance(self.__dict__[name], _Column):
            raise AttributeError(f"There is no '{name}' column")
def _get_columns_names_drop(self, objs: Tuple[Union[_Column, str]]) -> Tuple[List[str], List[str], bool]:
    """
    Resolve what to pass to PASSTHROUGH and which cached columns to delete.

    We can't be sure python Source has consistent columns cache, because sinking complex event processors
    can change columns unpredictable, so if user will specify regex as a param, we will pass regex
    as an onetick's param, but delete all matched columns from python Source cache.

    Parameters
    ----------
    objs:
        Tuple of _Columns or string to drop. String can be regex.

    Returns
    -------
    items_to_passthrough:
        Items to pass to Passthrough as `field` parameter.
    names_of_columns:
        Names of onetick.py Source columns to delete from Source instance
        (deduplicated, meta fields excluded).
    regex:
        Value to pass to Passthrough as `use_regex` parameter.

    Raises
    ------
    TypeError
        if an item is neither a column nor a string.
    """
    passthrough_items = []
    column_names = []
    use_regex = False
    for item in objs:
        if isinstance(item, _Column):
            name: str = item.name
            passthrough_items.append(name)
            column_names.append(name)
            continue
        if not isinstance(item, str):
            raise TypeError(f"It is not supported to delete item '{item}' of type '{type(item)}'")
        passthrough_items.append(item)
        if not any(char in "*+?\\:[]{}()" for char in item):
            column_names.append(item)
            continue
        # it is a possible regex: every cached column matching it is removed
        use_regex = True
        column_names.extend(col for col in self.columns() if re.match(item, col))
    unique_names = set(column_names) - set(self.__class__.meta_fields)
    return passthrough_items, list(unique_names), use_regex
def __from_ep_to_proxy(self, ep):
    """
    Convert an EP (possibly pinned) into the ``(ep, key, in_pin, out_pin)`` tuple
    expected by the node's ``sink``/``source`` methods; ``key`` is a fresh uuid4.
    """
    in_pin = out_pin = None
    if isinstance(ep, otq.graph_components.EpBase.PinnedEp):
        # a pinned EP carries either an output or an input pin name
        if hasattr(ep, "_output_name"):
            out_pin = ep._output_name
        else:
            in_pin = ep._input_name
        ep = ep._ep
    return ep, uuid.uuid4(), in_pin, out_pin
def sink(self, ep, out_pin=None, move_node=True):
    """
    Appends node inplace to the source.

    Connect `out_pin` of this source to `ep`.
    Can be used to connect onetick.query objects to onetick.py source.
    Data schema changes (added or deleted columns) will not be detected automatically
    after applying `sink` function, so the user must change the schema manually
    by updating `schema` property.

    Parameters
    ----------
    ep: otq.graph_components.EpBase, otq.graph_components.EpBase.PinnedEp, \
or Tuple[otq.graph_components.EpBase, uuid.uuid4, Optional[str], Optional[str]]
        onetick.query EP object to append to source.
    out_pin: Optional[str], default=None
        name of the out pin to connect to `ep`
    move_node: bool, default=True
        passed as is to the underlying node's ``sink`` method.

    Returns
    -------
    result: otq.graph_components.EpBase, otq.graph_components.EpBase.PinnedEp
        Last node of the source

    Raises
    ------
    Exception
        if ``ep`` is not an EP, a pinned EP or a tuple.

    See Also
    --------
    onetick.py.Source.schema
    onetick.py.core._source.schema.Schema

    Examples
    --------
    Adding column 'B' directly with onetick.query EP.

    >>> data = otp.Tick(A=1)
    >>> data.sink(otq.AddField(field='B', value=2)) # OTdirective: skip-snippet:;
    AddField(field='B',value=2)
    >>> otp.run(data) # OTdirective: skip-snippet:;
    Time A B
    0 2003-12-01 1 2

    But we can't use this column with onetick.py methods yet.

    >>> data['C'] = data['B'] # OTdirective: skip-snippet:; # doctest: +ELLIPSIS
    Traceback (most recent call last):
    ...
    KeyError: 'Column name B is not in the schema...

    We should manually change source's schema

    >>> data.schema.update(B=int) # OTdirective: skip-snippet:;
    >>> data['C'] = data['B']
    >>> otp.run(data)
    Time A B C
    0 2003-12-01 1 2 2
    """
    ep_type = type(ep)
    is_proxy_tuple = ep_type is tuple
    accepted = (
        issubclass(ep_type, otq.graph_components.EpBase)
        or issubclass(ep_type, otq.graph_components.EpBase.PinnedEp)
        or is_proxy_tuple
    )
    if not accepted:
        raise Exception("sinking is allowed only for EpBase instances")
    # a tuple is an already existing EP fetched from _ProxyNode and is passed as is
    proxy_args = ep if is_proxy_tuple else self.__from_ep_to_proxy(ep)
    return self.__node.sink(out_pin, *proxy_args, move_node)
def __rshift__(self, ep):
    """
    ``source >> ep``: like :meth:`sink`, but applied to a copy,
    which is returned; the original source is left untouched.
    """
    result = self.copy()
    result.sink(ep)
    return result
def __irshift__(self, ep):
    """
    ``source >>= ep``: like :meth:`sink`, but applied to a copy; the copy is
    returned, so the augmented assignment rebinds the name to the new object.
    """
    result = self.copy()
    result.sink(ep)
    return result
def source(self, ep, in_pin=None):
    """
    Add node as source to root node.

    ``ep`` may be an EP, a pinned EP or an already prepared proxy tuple;
    anything else raises ``Exception``.
    """
    ep_type = type(ep)
    allowed_bases = (otq.graph_components.EpBase, otq.graph_components.EpBase.PinnedEp)
    if ep_type is not tuple and not issubclass(ep_type, allowed_bases):
        raise Exception("sourcing is allowed only for EpBase instances")
    if ep_type is tuple:
        # for already existed EP fetched from _ProxyNode
        proxy_args = ep
    else:
        proxy_args = self.__from_ep_to_proxy(ep)
    return self.__node.source(in_pin, *proxy_args)
def source_by_key(self, ep, to_key):
    """
    Add node as source to graph node by key.

    ``ep`` may be an EP, a pinned EP or an already prepared proxy tuple;
    anything else raises ``Exception``.
    """
    ep_type = type(ep)
    allowed_bases = (otq.graph_components.EpBase, otq.graph_components.EpBase.PinnedEp)
    if ep_type is not tuple and not issubclass(ep_type, allowed_bases):
        raise Exception("sourcing is allowed only for EpBase instances")
    if ep_type is tuple:
        # for already existed EP fetched from _ProxyNode
        proxy_args = ep
    else:
        proxy_args = self.__from_ep_to_proxy(ep)
    return self.__node.source_by_key(to_key, *proxy_args)
def apply_query(self, query, in_pin="IN", out_pin="OUT", **params):
    """
    Apply data source to query

    .. deprecated:: 1.3.77

    Parameters
    ----------
    query:
        Query to apply; passed to ``onetick.py.functions.apply_query``.
    in_pin: str
        Name of the query input that this source is bound to.
    out_pin: str
        Name of the query output whose result is returned.
    params:
        Additional keyword arguments forwarded to ``onetick.py.functions.apply_query``.

    See Also
    --------
    Source.apply
    """
    warnings.warn(
        'The "apply_query" method is deprecated. Please, use the .apply method instead or '
        "call a reference queries directly.",
        DeprecationWarning,
        # Attribute the warning to the caller: the default filters only show
        # DeprecationWarning when it is triggered from __main__, never from library code.
        stacklevel=2,
    )
    res = onetick.py.functions.apply_query(query, {in_pin: self}, [out_pin], **params)
    res.node().out_pin(out_pin)
    return res
def apply(self, obj) -> Union['otp.Column', 'Source']:
    """
    Apply object to data source.

    Parameters
    ----------
    obj: onetick.py.query, Callable, type, onetick.query.GraphQuery

        - `onetick.py.query` allows to apply external nested query

        - python `Callable` allows to translate python code to similar OneTick's CASE expression.
          There are some limitations to which python operators can be used in this callable.
          See :ref:`Python callables parsing guide <python callable parser>` article for details.
          In :ref:`Remote OTP with Ray<ray-remote>` any `Callable` must be decorated with `@otp.remote` decorator,
          see :ref:`Ray usage examples<apply-remote-context>` for details.

        - `type` allows to apply default type conversion

        - `onetick.query.GraphQuery` allows to apply a built onetick.query.GraphQuery;
          the graph must have exactly one source and exactly one sink.

    Returns
    -------
    Column, Source

    Raises
    ------
    Exception
        If an `onetick.py.query` does not have exactly one input or has more than one output,
        or if a `onetick.query.GraphQuery` does not have exactly one source and one sink.

    Examples
    --------
    Apply external query to a tick flow. In this case it assumes that query has
    only one input and one output. Check the :class:`query` examples if you
    want to use a query with multiple inputs or outputs.

    >>> data = otp.Ticks(X=[1, 2, 3])
    >>> external_query = otp.query('update.otq')
    >>> data = data.apply(external_query)
    >>> otp.run(data)
    Time X
    0 2003-12-01 00:00:00.000 2
    1 2003-12-01 00:00:00.001 4
    2 2003-12-01 00:00:00.002 6

    Apply a predicate to a column / operation.
    In this case value passed to a predicate is column values.
    Result is a column.

    >>> data = otp.Ticks(X=[1, 2, 3])
    >>> data['Y'] = data['X'].apply(lambda x: x * 2)
    >>> otp.run(data)
    Time X Y
    0 2003-12-01 00:00:00.000 1 2
    1 2003-12-01 00:00:00.001 2 4
    2 2003-12-01 00:00:00.002 3 6

    Another example of applying a more sophisticated operation

    >>> data = otp.Ticks(X=[1, 2, 3])
    >>> data['Y'] = data['X'].apply(lambda x: 1 if x > 2 else 0)
    >>> otp.run(data)
    Time X Y
    0 2003-12-01 00:00:00.000 1 0
    1 2003-12-01 00:00:00.001 2 0
    2 2003-12-01 00:00:00.002 3 1

    Example of applying a predicate to a Source. In this case value passed
    to a predicate is a whole tick. Result is a column.

    >>> data = otp.Ticks(X=[1, 2, 3], Y=[.5, -0.4, .2])
    >>> data['Z'] = data.apply(lambda tick: 1 if abs(tick['X'] * tick['Y']) > 0.5 else 0)
    >>> otp.run(data)
    Time X Y Z
    0 2003-12-01 00:00:00.000 1 0.5 0
    1 2003-12-01 00:00:00.001 2 -0.4 1
    2 2003-12-01 00:00:00.002 3 0.2 1

    See Also
    --------
    :py:class:`onetick.py.query`
    :py:meth:`onetick.py.Source.script`
    :ref:`Python callables parsing guide <python callable parser>`
    """
    if isinstance(obj, onetick.py.sources.query):
        graph_info = obj.graph_info
        inputs = graph_info.nested_inputs
        outputs = graph_info.nested_outputs
        if len(inputs) != 1:
            raise Exception(
                f'It is expected the query "{obj.query_name}" to have one input, but it '
                f"has {len(inputs)}"
            )
        if len(outputs) > 1:
            raise Exception(
                f'It is expected the query "{obj.query_name}" to have one or no output, but it '
                f"has {len(outputs)}"
            )
        in_pin = inputs[0].NESTED_INPUT
        # a query without nested outputs is addressed through the ``None`` output pin
        out_pin = outputs[0].NESTED_OUTPUT if len(outputs) == 1 else None
        return obj(**{in_pin: self})[out_pin]

    if isinstance(obj, otq.GraphQuery):
        query_name = "graph_query_to_apply"
        tmp_file = utils.TmpFile(suffix=".otq")
        # start and end times don't matter for this query, so some constants are used
        obj.save_to_file(
            tmp_file.path,
            query_name,
            start=db_constants.DEFAULT_START_DATE,
            end=db_constants.DEFAULT_END_DATE,
        )
        query_info = get_query_info(tmp_file.path)
        graph_sources = query_info.sources
        graph_sinks = query_info.sinks
        if len(graph_sources) != 1:
            raise Exception(f"It is expected the query to have one input, but it has {len(graph_sources)}")
        if len(graph_sinks) != 1:
            raise Exception(f"It is expected the query to have one output, but it has {len(graph_sinks)}")
        # name the single source/sink "IN"/"OUT" so the saved query can be called below
        add_pins(tmp_file.path, query_name, [(graph_sources[0], 1, "IN"), (graph_sinks[0], 0, "OUT")])
        return onetick.py.sources.query(tmp_file.path + "::" + query_name)(IN=self)["OUT"]

    # anything else (callables, types) is translated by the lambda parser
    return apply_lambda(obj, _EmulateObject(self))
def dump(self,
         label: str = None,
         where: 'otp.Operation' = None,
         columns: Union[str, Tuple[str], List[str]] = None,
         callback: Callable = None):
    """
    Dumps the ``columns`` from ticks into std::out at runtime if they fit the ``where`` condition.

    Every dump has a corresponding header that always includes the TIMESTAMP field. Other fields
    can be configured using the ``columns`` parameter. A header can be augmented with a ``label`` parameter;
    this label is an additional column that helps to distinguish ticks
    from multiple dumps with the same schema, because ticks from different dumps could be mixed.
    This can happen because of OneTick multithreading and because there is an operating system
    buffer between OneTick and the actual output.
    If no ticks reach the dump, a single ``<no data>`` line is written instead.

    This method is helpful for debugging.

    Parameters
    ----------
    label: str
        A label for a dump. It adds a special column **_OUT_LABEL_** set to the specified
        value for all ticks. It helps to distinguish ticks from multiple dumps, because actual
        output could contain mixed ticks due to concurrency. ``None`` means no label.
    where: Operation
        A condition that allows to filter ticks to dump. ``None`` means no filtering.
    columns: str, tuple or list
        List of columns that should be in the output. ``None`` means dump all columns.
    callback : callable
        Callable which preprocesses the source before printing.

    Returns
    -------
    None
        The source is modified in place: the dumping branch is merged back into it,
        and the ticks it writes are not propagated to the query output.

    See also
    --------
    **WRITE_TEXT** OneTick event processor

    Examples
    --------
    >>> # OTdirective: skip-snippet:;
    >>> data.dump(label='Debug point', where=data['PRICE'] > 99.3, columns=['PRICE', 'QTY']) # doctest: +SKIP
    >>> data.dump(columns="X", callback=lambda x: x.first(), label="first") # doctest: +SKIP
    """
    debug_branch = self.copy()
    if callback:
        debug_branch = callback(debug_branch)
    # ``where`` is an _Operation that overrides __bool__, so it must be compared with None explicitly
    if where is not None:
        debug_branch, _ = debug_branch[(where)]
    if columns:
        wanted = columns if isinstance(columns, (list, tuple)) else [columns]
        debug_branch = debug_branch[wanted]
    if label:
        debug_branch['_OUT_LABEL_'] = label

    timestamp_format = 'TIMESTAMP=%|' + configuration.config.tz + '|%d-%m-%Y %H:%M:%S.%J'
    debug_branch.sink(otq.WriteText(formats_of_fields=timestamp_format,
                                    prepend_timestamp=False,
                                    prepend_symbol_name=False,
                                    propagate_ticks=True))

    # write a "<no data>" line when zero ticks were dumped
    debug_branch = debug_branch.agg({'COUNT': otp.agg.count()})
    debug_branch, _ = debug_branch[debug_branch['COUNT'] == 0]
    debug_branch['NO_DATA'] = '<no data>'
    debug_branch = debug_branch[['NO_DATA']]
    debug_branch.sink(otq.WriteText(output_headers=False,
                                    prepend_timestamp=False,
                                    prepend_symbol_name=False,
                                    propagate_ticks=True))

    # The dumped ticks only have to reach std::out; if they were propagated further they would
    # end up in the query output and corrupt it, so this always-false filter drops all of them.
    debug_branch, _ = debug_branch[debug_branch['Time'] != debug_branch['Time']]

    # The branch must still be merged into the main one (even though it produces no ticks):
    # a separate output point would be added by OneTick to the final output structure.
    self.sink(otq.Merge(identify_input_ts=False))
    self.source(debug_branch.node().copy_graph())
    self.node().add_rules(debug_branch.node().copy_rules())
    self._merge_tmp_otq(debug_branch)
def script(self, func: Union[Callable[['Source'], Optional[bool]], str], inplace=False) -> 'Source':
    """
    Implements a script for every tick.

    Allows to pass a ``func`` that will be applied to every tick.

    A ``func`` can be a python callable; in this case it will be translated to a per-tick script.
    In order to use it in Remote OTP with Ray, the function should be decorated with ``@otp.remote``,
    see :ref:`Ray usage examples<apply-remote-context>` for details.

    See :ref:`Per Tick Script Guide <python callable parser>` for a more detailed description
    of python to OneTick code translation and per-tick script features.

    A script written in the per-tick script language can also be passed, either as a string or as a path
    to a file with the code. onetick-py doesn't validate such a script, but configures the schema
    according to the field declarations it finds in it.

    Parameters
    ----------
    func: callable, str or path
        - a callable that takes only one parameter - actual tick that behaves like a `Source` instance

        - or the script in the per-tick script language

        - or a path to a file with a onetick script

    inplace: bool
        If ``True``, the script is added to this source and this source is returned;
        otherwise a modified copy is returned and this source is left unchanged.

    Returns
    -------
    :class:`Source`

    Raises
    ------
    ValueError
        If ``func`` is neither callable nor a string.

    See also
    --------
    **PER_TICK_SCRIPT** OneTick event processor

    Examples
    --------
    >>> t = otp.Ticks({'X': [1, 2, 3], 'Y': [4, 5, 6]})
    >>> def fun(tick):
    ...     tick['Z'] = 0
    ...     if tick['X'] + tick['Y'] == 5:
    ...         tick['Z'] = 1
    ...     elif tick['X'] + tick['Y'] * 2 == 15:
    ...         tick['Z'] = 2
    >>> t = t.script(fun)
    >>> otp.run(t)
    Time X Y Z
    0 2003-12-01 00:00:00.000 1 4 1
    1 2003-12-01 00:00:00.001 2 5 0
    2 2003-12-01 00:00:00.002 3 6 2

    See also
    --------
    :py:meth:`onetick.py.Source.apply`
    :ref:`Per-Tick Script Guide <python callable parser>`
    """
    # TODO: need to narrow Source object to get rid of undesired methods like aggregations
    target = self if inplace else self.copy()
    tick_list_schemas = {}

    if callable(func):
        declared_columns, script_text = apply_script(func, _EmulateObject(self))
        tick_list_schemas = _EmulateObject.get_changed_tick_lists()
    elif isinstance(func, str):
        if os.path.isfile(func):
            # the string is a path to a file with the script
            with open(func) as script_file:
                script_text = script_file.read()
        else:
            script_text = func
        declared_columns = self._extract_columns_from_script(script_text)
    else:
        raise ValueError("Wrong argument was specify, please use callable or string with either script on per tick "
                         "language or path to it")

    target.sink(otq.PerTickScript(script=script_text))
    target.schema.update(**declared_columns)
    # tick lists modified by a translated callable get their new schema
    for list_name, list_schema in tick_list_schemas.items():
        target.state_vars[list_name]._schema = list_schema
    return target
def _extract_columns_from_script(self, script):
    """
    Find the fields declared in a per-tick script so they can be added to the schema.

    The script is scanned for declarations of the form ``<type>[<length>] <NAME> = ...``,
    e.g. ``long X = 1`` or ``string[64] S = ""``. The scan is purely textual: every
    matching declaration anywhere in the script text is reported.

    Parameters
    ----------
    script: str
        Text of the per-tick script.

    Returns
    -------
    dict
        Mapping of declared field names to their onetick-py types. Declarations whose type
        has no onetick-py equivalent (currently ``matrix``) are skipped with a warning.
    """
    supported_types = {'byte', 'short', 'uint', 'long', 'ulong', 'int', 'float', 'double',
                       'string', 'time32', 'nsectime', 'msectime', 'varstring', 'decimal'}
    # The leading \b requires the type keyword to start a word. Without it, an identifier that
    # merely ends with a type name (e.g. ``my_long X = 1``) would be taken for a declaration.
    type_regex = (r"\b(?P<type>varstring|byte|short|uint|int|ulong|long|"
                  r"float|double|decimal|string|time32|msectime|nsectime|matrix)")
    length_regex = r"(\[(?P<length>\d+)\])?"
    name_regex = r"\s+(?P<name>\w+)\s*=\s*"
    pattern = re.compile(type_regex + length_regex + name_regex)

    result = {}
    for match in pattern.finditer(script):
        type_name = match.group("type")
        field_name = match.group("name")
        dtype = ott.str2type(type_name) if type_name in supported_types else None
        if not dtype:
            warnings.warn(f"{type_name} isn't supported for now, so field {field_name} won't "
                          f"be added to schema.")
            continue
        length = match.group("length")
        if length:
            length = int(length)
            # only strings with a non-default length need a dedicated string[N] type
            if dtype is str and length != ott.string.DEFAULT_LENGTH:
                dtype = ott.string[length]
        result[field_name] = dtype
    return result
def to_symbol_param(self):
    """
    Creates a read-only instance with the same columns except Time.

    The result is meant to be used together with a first stage query with symbol params:
    its columns stand for the symbol parameters of the same name.

    See also
    --------
    :ref:`Symbol Parameters Objects`
    :ref:`Symbol parameters`

    Examples
    --------
    >>> symbols = otp.Ticks({'SYMBOL_NAME': ['S1', 'S2'], 'PARAM': ['A', 'B']})
    >>> symbol_params = symbols.to_symbol_param()
    >>> t = otp.DataSource('SOME_DB', tick_type='TT')
    >>> t['S_PARAM'] = symbol_params['PARAM']
    >>> result = otp.run(t, symbols=symbols)
    >>> result['S1']
    Time X S_PARAM
    0 2003-12-01 00:00:00.000 1 A
    1 2003-12-01 00:00:00.001 2 A
    2 2003-12-01 00:00:00.002 3 A
    """
    current_columns = self.columns()
    return _SymbolParamSource(**current_columns)
@staticmethod
def _convert_symbol_to_string(symbol, tmp_otq=None, start=None, end=None, timezone=None):
    """
    Convert ``symbol`` to the form used as a symbol specification of a query.

    - a :class:`Source` is evaluated with ``otp.eval`` and converted to its eval string
      (``adaptive`` ``start``/``end`` are passed on as ``None``);
    - an ``onetick.py.query`` object is converted to its eval string;
    - anything else is returned unchanged.
    """
    start = None if start is adaptive else start
    end = None if end is adaptive else end

    if isinstance(symbol, Source):
        evaluated = otp.eval(symbol)
        symbol = evaluated.to_eval_string(tmp_otq=tmp_otq,
                                          start=start,
                                          end=end,
                                          timezone=timezone,
                                          operation_suffix='symbol',
                                          query_name=None,
                                          file_suffix=symbol._name_suffix('symbol.otq'))

    if not isinstance(symbol, onetick.py.sources.query):
        return symbol
    return symbol.to_eval_string()
@staticmethod
def _construct_multi_branch_graph(branches):
    """
    Combine several branches into one source.

    The result is a copy of the first branch with the graph rules and temporary
    queries of every other branch merged into it.
    """
    # TODO: add various checks, e.g. that branches have common parts
    combined = branches[0].copy()
    for other in branches[1:]:
        combined.node().add_rules(other.node().copy_rules())
        combined._merge_tmp_otq(other)
    return combined
def _apply_side_branches(self, side_branches):
    """
    Attach side branches to this source in place.

    For each branch, its graph rules and temporary queries are merged into ``self``.
    The ``__sources_keys_dates``, ``__sources_modify_query_times``, ``__sources_base_ep_func``
    and ``__sources_symbols`` collections of ``self`` are updated with the branch's ones.
    """
    for branch in side_branches:
        self.node().add_rules(branch.node().copy_rules())
        self._merge_tmp_otq(branch)
        # per-source bookkeeping
        self.__sources_keys_dates.update(branch.__sources_keys_dates)
        self.__sources_modify_query_times.update(branch.__sources_modify_query_times)
        self.__sources_base_ep_func.update(branch.__sources_base_ep_func)
        self.__sources_symbols.update(branch.__sources_symbols)
def symbols_for(self, func, *, symbol="SYMBOL_NAME", start="_PARAM_START_TIME_NANOS", end="_PARAM_END_TIME_NANOS"):
    """
    Use ticks of this source as the symbols for the query built by ``func``.

    Parameters
    ----------
    func: callable
        Builds the per-symbol query. It may take no parameters, or exactly one parameter that
        references the current symbol (the result of :meth:`to_symbol_param` on the symbol ticks).
    symbol: str
        Name of the column with symbols. Default is 'SYMBOL_NAME'.
        The column is mandatory.
    start: str
        Name of the column with the start time for a symbol. Default
        is '_PARAM_START_TIME_NANOS'. The column is optional.
    end: str
        Name of the column with the end time for a symbol. Default
        is '_PARAM_END_TIME_NANOS'. The column is optional.

    Returns
    -------
    A multi symbol ticks source. Its query interval spans the widest range known from either
    the query built by ``func`` or the symbol ticks; it is left unset if neither defines one.

    Raises
    ------
    Exception
        If there is no SYMBOL_NAME column.
    ValueError
        If ``func`` takes more than one parameter.
    """
    symbols = self.copy()
    # copy user-named columns into the column names expected for symbols and their time range
    for target_name, column in (("SYMBOL_NAME", symbol),
                                ("_PARAM_START_TIME_NANOS", start),
                                ("_PARAM_END_TIME_NANOS", end)):
        column_name = str(column)
        if column_name != target_name:
            symbols[target_name] = symbols[column_name]

    if "SYMBOL_NAME" not in symbols.columns():
        raise Exception("Ticks do not have the SYMBOL_NAME column, but this is mandatory column.")

    arity = len(inspect.signature(func).parameters)
    if arity == 0:
        logic = func()
    elif arity == 1:
        logic = func(symbols.to_symbol_param())
    else:
        # TODO: test this case
        raise ValueError(f"It is expected only one parameter from the callback, but {arity} passed")

    def _known_bounds(src):
        # Unknown (adaptive) bounds become values that never win the min/max below.
        # TODO: we don't need to get common symbol from _get_date_range,
        # refactor this function, so we don't get an error if default symbol is not set
        range_start, range_end, _ = src.copy()._get_date_range(default_start=adaptive, default_end=adaptive)
        if range_start is adaptive:
            range_start = onetick.py.utils.INF_TIME
        if range_end is adaptive:
            range_end = onetick.py.ZERO_TIME
        return range_start, range_end

    logic_start, logic_end = _known_bounds(logic)
    symbols_start, symbols_end = _known_bounds(symbols)

    # the query interval should be as wide as possible
    query_start = min(logic_start, symbols_start)
    query_end = max(logic_end, symbols_end)
    # if nothing is specified anywhere, fall back to the defaults
    if query_start == onetick.py.utils.INF_TIME:
        query_start = None
    if query_end == onetick.py.ZERO_TIME:
        query_end = None

    return _MultiSymbolsSource(symbols.copy(), logic, start=query_start, end=query_end)
@staticmethod
def _columns_to_params_for_joins(columns, query_params=False):
    """
    Build a OneTick expression that produces a parameters string from ``columns``.

    Each ``name -> value`` pair becomes an expression evaluating to ``name=<value>``, and the
    pieces are concatenated with ``','`` separators. This is mainly used by join_with_query
    and join_with_collection.

    ``query_params`` tells whether the result is used as query params (``True``) or as symbol
    params (``False``); some conversion rules differ between the two.

    Raises
    ------
    ValueError
        If ``_PARAM_SYMBOL_TIME`` (symbol params only) is neither a nanosecond time nor a string.
    """
    def _string_value(value):
        # string columns/operations are already expressions; python strings become quoted literals
        if are_strings(getattr(value, "dtype", None)):
            return str(value)
        return '"' + value + '"'

    def _nsectime_value(key, value):
        if key == '_SYMBOL_TIME' and query_params:
            # hack to support passing _SYMBOL_TIME to called query as a parameter
            warnings.warn('Query parameter _SYMBOL_TIME passed to join_with_query! '
                          'This is deprecated. Please use a dedicated `symbol_time` parameter of the '
                          'join_with_query function')
            return "tostring(GET_MSECS(" + str(value) + "))"
        if query_params:
            # this form works for query params, but cannot be used for symbol params
            return "'NSECTIME('+tostring(NSECTIME_TO_LONG(" + str(value) + "))+')'"
        # the common way onetick converts nanoseconds to symbol parameters
        return ("tostring(GET_MSECS(" + str(value) +
                "))+'.'+SUBSTR(NSECTIME_FORMAT('%J'," + str(value) + ",_TIMEZONE),3,6)")

    def _param_expression(key, value):
        dtype = ott.get_object_type(value)
        prefix = "'" + key + "=' + "
        if key == '_PARAM_SYMBOL_TIME' and not query_params:
            # OneTick treats this symbol parameter specially, so it has its own format
            if dtype is otp.nsectime:
                return prefix + f'NSECTIME_FORMAT("%Y%m%d%H%M%S.%J",{ott.value2str(value)},_TIMEZONE)'
            if dtype is str:
                return prefix + _string_value(value)
            raise ValueError('Parameter symbol_time has to be a datetime value!')
        if dtype is str:
            return prefix + _string_value(value)
        if dtype is otp.msectime:
            return prefix + "tostring(GET_MSECS(" + str(value) + "))"
        if dtype is otp.nsectime:
            return prefix + _nsectime_value(key, value)
        return prefix + "tostring(" + str(value) + ")"

    return "+','+".join(_param_expression(key, value) for key, value in columns.items())
@staticmethod
def _get_default_fields_for_outer_join_str(default_fields_for_outer_join, how, sub_source_schema):
    """
    Build the ``FIELD=expr,...`` definition of default fields for an outer join.

    Used by join_with_query() and join_with_collection(). Returns an empty string when
    ``default_fields_for_outer_join`` is empty or ``None``.

    Raises
    ------
    ValueError
        If defaults are given but ``how`` is not ``'outer'``.
    KeyError
        If a field with a default is missing from ``sub_source_schema``.
    """
    if not default_fields_for_outer_join:
        return ''
    if how != 'outer':
        raise ValueError('The `default_fields_for_outer_join` parameter can be used only for outer join')

    def _definition(field, expr):
        if field not in sub_source_schema.keys():
            raise KeyError(f'Field {field} is specified in `default_fields_for_outer_join` parameter, '
                           'but is not present in the joined source schema!')
        return f'{field}={ott.value2str(expr)}'

    return ','.join(_definition(field, expr) for field, expr in default_fields_for_outer_join.items())
def join_with_collection(
    self,
    collection_name,
    query_func=None,
    how="outer",
    params=None,
    start=None,
    end=None,
    prefix=None,
    caching=None,
    keep_time=None,
    default_fields_for_outer_join=None,
) -> 'Source':
    """
    For each tick uses ``query_func`` to join ticks from ``collection_name`` tick collection
    (tick set, unordered tick set, tick list, or tick deque).

    Parameters
    ----------
    collection_name: str
        Name of the collection state variable from which to join ticks. Collections are the following types:
        :py:class:`TickSet <onetick.py.core._internal._state_objects.TickSet>`,
        :py:class:`TickSetUnordered <onetick.py.core._internal._state_objects.TickSetUnordered>`,
        :py:class:`TickList <onetick.py.core._internal._state_objects.TickList>` and
        :py:class:`TickDeque <onetick.py.core._internal._state_objects.TickDeque>`.
    query_func: callable
        Callable ``query_func`` should return :class:`Source`. If passed, this query will be used on ticks
        from collection before joining them.
        In this case, ``query_func`` object will be evaluated by OneTick (not python)
        for every input tick. Note that python code will be executed only once,
        so all python's conditional expressions will be evaluated only once too.

        Callable should have ``source`` parameter. When callable is called, this parameter
        will have value of a :class:`Source` object representing ticks loaded directly from the collection.
        Any operation applied to this source will be applied to ticks from the collection
        before joining them.

        Also, callable should have the parameters with names
        from ``params`` if they are specified in this method.

        If ``query_func`` is not passed, then all ticks from the collection will be joined.
    how: 'inner', 'outer'
        Type of join. If **inner**, then output tick is propagated
        only if some ticks from the collection were joined to the input tick.
    params: dict
        Mapping of the parameters' names and their values for the ``query_func``.
        :py:class:`Columns <onetick.py.Column>` can be used as a value.
        The passed dictionary is not modified.
    start: datetime, Operation
        Start time to select ticks from collection.
        If specified, only ticks in collection that have higher or equal timestamp will be processed.
        If not passed, then there will be no lower time bound for the collection ticks.
        This means that even ticks with TIMESTAMP lower than _START_TIME of the main query will be joined.
    end: datetime, Operation
        End time to select ticks from collection.
        If specified, only ticks in collection that have lower timestamp will be processed.
        If not passed, then there will be no upper time bound for the collection ticks.
        This means that even ticks with TIMESTAMP higher than _END_TIME of the main query will be joined.
    prefix : str
        Prefix for the names of joined tick fields.
    caching : str
        If `None` caching is disabled. You can specify caching by using values:

            * 'per_symbol': cache is different for each symbol.
    keep_time : str
        Name for the joined timestamp column. `None` means no timestamp column will be joined.
    default_fields_for_outer_join : dict
        When you use outer join, all output ticks will have fields from the schema of the joined source.
        If nothing was joined to a particular output tick, these fields will have default values for their type.
        This parameter allows to override the values that would be added to ticks for which nothing was joined.
        Dictionary keys should be field names, and dictionary values should be constants
        or :class:`Operation` expressions

    Returns
    -------
    :class:`Source`
        Source with joined ticks from ``collection_name``

    Raises
    ------
    KeyError
        If there is no state variable named ``collection_name``.
    ValueError
        If the state variable is not a tick collection, or ``params`` uses a reserved name
        (``source``, ``__fixed_start_time``, ``__fixed_end_time``).

    See also
    --------
    **JOIN_WITH_COLLECTION_SUMMARY** OneTick event processor

    Examples
    --------
    >>> # OTdirective: snippet-name: Special functions.join with collection.without query;
    >>> src = otp.Tick(A=1)
    >>> src.state_vars['TICK_SET'] = otp.state.tick_set('LATEST_TICK', 'B', otp.eval(otp.Tick(B=1, C='STR')))
    >>> src = src.join_with_collection('TICK_SET')
    >>> otp.run(src)[["A", "B", "C"]]
    A B C
    0 1 1 STR

    >>> # OTdirective: snippet-name: Special functions.join with collection.with query and params;
    >>> src = otp.Ticks(A=[1, 2, 3, 4, 5],
    ...                 B=[2, 2, 3, 3, 3])
    >>> src.state_vars['TICK_LIST'] = otp.state.tick_list()
    >>> def fun(tick): tick.state_vars['TICK_LIST'].push_back(tick)
    >>> src = src.script(fun)
    >>>
    >>> def join_fun(source, param_b):
    ...     source = source.agg(dict(VALUE=otp.agg.sum(source['A'])))
    ...     source['VALUE'] = source['VALUE'] + param_b
    ...     return source
    >>>
    >>> src = src.join_with_collection('TICK_LIST', join_fun, params=dict(param_b=src['B']))
    >>> otp.run(src)[["A", "B", "VALUE"]]
    A B VALUE
    0 1 2 3
    1 2 2 5
    2 3 3 9
    3 4 3 13
    4 5 3 18

    Join last standing quote from each exchange to trades:

    >>> # OTdirective: snippet-name: Special functions.join with collection.standing quotes per exchange;
    >>> trd = otp.Ticks(offset=[1000, 2000, 3000, 4000, 5000],
    ...                 PRICE=[10.1, 10.2, 10.15, 10.23, 10.4],
    ...                 SIZE=[100, 50, 100, 60, 200])
    >>>
    >>> qte = otp.Ticks(offset=[500, 600, 1200, 2500, 3500, 3600, 4800],
    ...                 EXCHANGE=['N', 'C', 'Q', 'Q', 'C', 'N', 'C'],
    ...                 ASK_PRICE=[10.2, 10.18, 10.18, 10.15, 10.31, 10.32, 10.44],
    ...                 BID_PRICE=[10.1, 10.17, 10.17, 10.1, 10.23, 10.31, 10.4])
    >>>
    >>> trd['TICK_TYPE'] = 'TRD'
    >>> qte['TICK_TYPE'] = 'QTE'
    >>>
    >>> trd_qte = trd + qte
    >>> trd_qte.state_vars['LAST_QUOTE_PER_EXCHANGE'] = otp.state.tick_set('LATEST', 'EXCHANGE')
    >>> del trd_qte.state_vars['LAST_QUOTE_PER_EXCHANGE'].schema['TICK_TYPE']
    >>> del trd_qte.state_vars['LAST_QUOTE_PER_EXCHANGE'].schema['PRICE']
    >>> del trd_qte.state_vars['LAST_QUOTE_PER_EXCHANGE'].schema['SIZE']
    >>>
    >>> trd_qte = trd_qte.state_vars['LAST_QUOTE_PER_EXCHANGE'].update(where=trd_qte['TICK_TYPE'] == 'QTE',
    ...                                                                 value_fields=['ASK_PRICE', 'BID_PRICE'])
    >>> trd, _ = trd_qte[trd_qte['TICK_TYPE'] == 'TRD']
    >>> trd.drop(['ASK_PRICE', 'BID_PRICE', 'EXCHANGE'], inplace=True)
    >>> trd = trd.join_with_collection('LAST_QUOTE_PER_EXCHANGE')
    >>> otp.run(trd)[['PRICE', 'SIZE', 'EXCHANGE', 'ASK_PRICE', 'BID_PRICE']]
    PRICE SIZE EXCHANGE ASK_PRICE BID_PRICE
    0 10.10 100 N 10.20 10.10
    1 10.10 100 C 10.18 10.17
    2 10.20 50 N 10.20 10.10
    3 10.20 50 C 10.18 10.17
    4 10.20 50 Q 10.18 10.17
    5 10.15 100 N 10.20 10.10
    6 10.15 100 C 10.18 10.17
    7 10.15 100 Q 10.15 10.10
    8 10.23 60 N 10.32 10.31
    9 10.23 60 C 10.31 10.23
    10 10.23 60 Q 10.15 10.10
    11 10.40 200 N 10.32 10.31
    12 10.40 200 C 10.44 10.40
    13 10.40 200 Q 10.15 10.10
    """
    # check that passed collection is good
    if collection_name not in self.state_vars.names:
        raise KeyError(f'Collection with name {collection_name} is not in the list of available state variables')
    if not isinstance(self.state_vars[collection_name], _TickSequence):
        raise ValueError(f'State variable {collection_name} is not a tick collection! '
                         'Only TickSet, TickSetUnordered, TickList and TickDeque objects are supported '
                         'as data sources for join_with_collection')

    # Work on a shallow copy: the special time parameters are added to it below and must not
    # leak into the caller's dict (reusing that dict would otherwise fail the check right below).
    params = {} if params is None else dict(params)
    special_params = ('source', '__fixed_start_time', '__fixed_end_time')
    for sp_param in special_params:
        if sp_param in params.keys():
            raise ValueError(f'Parameter name "{sp_param}" is special and cannot be used for params '
                             'of join_with_collection function. Please, select a different name.')

    # JOIN_WITH_COLLECTION_SUMMARY has START_TIME and END_TIME parameters with the precision of millisecond.
    # So, here we add a workaround on onetick.py side to support nsectime precision
    # "start" and "end" parameters of the EP are kept as they may be necessary
    # for performance reasons
    if start is not None:
        params['__fixed_start_time'] = start
        start = start - otp.Milli(1)
    if end is not None:
        params['__fixed_end_time'] = end
        end = end + otp.Milli(1)

    # prepare temporary file
    # ------------------------------------ #
    # TODO: this should be a common code somewhere
    collection_schema = {
        key: value for key, value in self.state_vars[collection_name].schema.items()
        if key not in self.__class__.meta_fields and key not in self.__class__._PROPERTIES
    }
    join_source_root = otp.DataSource(db=otp.config.default_db,
                                      tick_type="ANY",
                                      schema_policy="manual",
                                      **collection_schema)
    if query_func is None:
        query_func = lambda source: source  # noqa

    converted_params = prepare_params(**params)
    # the exact time bounds are applied as filters inside the sub-query, not passed to query_func
    fixed_start_time = None
    fixed_end_time = None
    if '__fixed_start_time' in converted_params.keys():
        fixed_start_time = converted_params['__fixed_start_time']
        del converted_params['__fixed_start_time']
    if '__fixed_end_time' in converted_params.keys():
        fixed_end_time = converted_params['__fixed_end_time']
        del converted_params['__fixed_end_time']

    sub_source = query_func(source=join_source_root, **converted_params)
    if fixed_start_time is not None:
        sub_source = sub_source[sub_source['TIMESTAMP'] >= fixed_start_time][0]
    if fixed_end_time is not None:
        sub_source = sub_source[sub_source['TIMESTAMP'] < fixed_end_time][0]
    sub_source = self._process_keep_time_param(keep_time, sub_source)

    params_str = self._columns_to_params_for_joins(params, query_params=True)

    sub_source_schema = sub_source.schema.copy()

    columns = {}
    columns.update(self._get_columns_with_prefix(sub_source, prefix))
    columns.update(self.columns(skip_meta_fields=True))

    res = self.copy(columns=columns)

    res._merge_tmp_otq(sub_source)
    query_name = sub_source._store_in_tmp_otq(
        res._tmp_otq,
        symbols='_NON_EXISTING_SYMBOL_',
        operation_suffix="join_with_collection"
    )
    # ------------------------------------ #

    default_fields_for_outer_join_str = self._get_default_fields_for_outer_join_str(
        default_fields_for_outer_join, how, sub_source_schema
    )

    join_params = dict(
        collection_name=str(self.state_vars[collection_name]),
        otq_query=f'"THIS::{query_name}"',
        join_type=how.upper(),
        otq_query_params=params_str,
        default_fields_for_outer_join=default_fields_for_outer_join_str,
    )

    self._fill_aux_params_for_joins(
        join_params, caching, end, prefix, start,
        symbol_name=None, timezone=None, join_with_collection=True
    )
    res.sink(otq.JoinWithCollectionSummary(**join_params))
    res._add_table()
    res.sink(otq.Passthrough(fields="TIMESTAMP", drop_fields=True))

    return res
def join_with_query(
    self,
    query,
    how="outer",
    symbol=None,
    params=None,
    start=None,
    end=None,
    timezone=None,
    prefix=None,
    caching=None,
    keep_time=None,
    where=None,
    default_fields_for_outer_join=None,
    symbol_time=None,
    concurrency=None,
    **kwargs,
) -> 'Source':
    """
    For each tick executes ``query``.

    Parameters
    ----------
    query: callable, Source
        Callable ``query`` should return :class:`Source`. This object will be evaluated by OneTick (not python)
        for every tick. Note python code will be executed only once, so all python's conditional expressions
        will be evaluated only once too.
        Callable should have ``symbol`` parameter and the parameters with names
        from ``params`` if they are specified in this method.

        If ``query`` is a :class:`Source` object then it will be propagated as a query to OneTick.
    how: 'inner', 'outer'
        Type of join. If **inner**, then each tick is propagated
        only if its ``query`` execution has a non-empty result.
    params: dict
        Mapping of the parameters' names and their values for the ``query``.
        :py:class:`Columns <onetick.py.Column>` can be used as a value.
    symbol: str, Operation, dict, Source, or Tuple[Union[str, Operation], Union[dict, Source]]
        Symbol name to use in ``query``. In addition, symbol params can be passed along with symbol name.

        Symbol name can be passed as a string or as an :class:`Operation`.

        Symbol parameters can be passed as a dictionary. Also, the main :class:`Source` object,
        or the object containing a symbol parameter list, can be used as a list of symbol parameters.
        Special symbol parameters (`_PARAM_START_TIME_NANOS` and `_PARAM_END_TIME_NANOS`)
        will be ignored and will not be propagated to ``query``.

        ``symbol`` will be interpreted as a symbol name or as symbol parameters, depending on its type.
        You can pass both as a tuple.

        If symbol name is not passed, then symbol name from the main source is used.
    start: datetime, Operation
        Start time of ``query``.
        By default, start time of the main source is used.
    end: datetime, Operation
        End time of ``query``.
        By default, end time of the main source is used.
    start_time:
        .. deprecated:: 1.48.4

        The same as ``start``. Passing it emits a :class:`DeprecationWarning`.
    end_time:
        .. deprecated:: 1.48.4

        The same as ``end``. Passing it emits a :class:`DeprecationWarning`.
    timezone : Optional, str
        Timezone of ``query``.
        By default, timezone of the main source is used.
    prefix : str
        Prefix for the names of joined tick fields.
    caching : str
        If `None` caching is disabled. You can specify caching by using values:

        * 'cross_symbol': cache is the same for all symbols
        * 'per_symbol': cache is different for each symbol.
    keep_time : str
        Name for the joined timestamp column. `None` means no timestamp column will be joined.
    where : Operation
        Condition to filter ticks for which the result of the ``query`` will be joined.
        Can only be used with ``how='outer'``.
    default_fields_for_outer_join : dict
        When you use outer join, all output ticks will have fields from the schema of the joined source.
        If nothing was joined to a particular output tick, these fields will have default values for their type.
        This parameter allows to override the values that would be added to ticks for which nothing was joined.
        Dictionary keys should be field names, and dictionary values should be constants
        or :class:`Operation` expressions
    symbol_time : datetime, Operation
        Time that will be used by OneTick to map the symbol with which ``query`` is executed to the reference data.
        This parameter is only necessary if the query is expected to perform symbology conversions.
    concurrency : int
        Specifies number of threads for asynchronous processing of ``query`` per unbound symbol list.
        By default, the number of threads is 1.

    Returns
    -------
    :class:`Source`
        Source with joined ticks from ``query``

    Raises
    ------
    ValueError
        If ``where`` is used with a non-outer join, ``concurrency`` is not a positive integer,
        ``symbol`` (or the symbol name inside a ``symbol`` tuple) has a wrong format,
        or ``symbol_time`` has an unsupported type.
    AttributeError
        If ``params`` contains the reserved key ``symbol`` while ``query`` takes a ``symbol`` parameter.

    See also
    --------
    **JOIN_WITH_QUERY** OneTick event processor

    Examples
    --------

    >>> # OTdirective: snippet-name: Special functions.join with query.with an otp data source;
    >>> d = otp.Ticks(Y=[-1])
    >>> d = d.update(dict(Y=1), where=(d.Symbol.name == "a"))
    >>> data = otp.Ticks(X=[1, 2],
    ...                  S=["a", "b"])
    >>> res = data.join_with_query(d, how='inner', symbol=data['S'])
    >>> otp.run(res)[["X", "Y", "S"]]
    X Y S
    0 1 1 a
    1 2 -1 b

    >>> d = otp.Ticks(ADDED=[-1])
    >>> d = d.update(dict(ADDED=1), where=(d.Symbol.name == "3"))  # symbol name is always string
    >>> data = otp.Ticks(A=[1, 2], B=[2, 4])
    >>> res = data.join_with_query(d, how='inner', symbol=(data['A'] + data['B']))  # OTdirective: skip-snippet:;
    >>> df = otp.run(res)
    >>> df[["A", "B", "ADDED"]]
    A B ADDED
    0 1 2 1
    1 2 4 -1

    Constants as symbols are also supported

    >>> d = otp.Ticks(ADDED=[d.Symbol.name])
    >>> data = otp.Ticks(A=[1, 2], B=[2, 4])
    >>> res = data.join_with_query(d, how='inner', symbol=1)    # OTdirective: skip-snippet:;
    >>> df = otp.run(res)
    >>> df[["A", "B", "ADDED"]]
    A B ADDED
    0 1 2 1
    1 2 4 1

    Function object as query is also supported (Note it will be executed only once in python's code)

    >>> def func(symbol):
    ...    d = otp.Ticks(TYPE=["six"])
    ...    d = d.update(dict(TYPE="three"), where=(symbol.name == "3"))  # symbol is always converted to string
    ...    d["TYPE"] = symbol['PREF'] + d["TYPE"] + symbol['POST']
    ...    return d
    >>> # OTdirective: snippet-name: Special functions.join with query.with a function
    >>> data = otp.Ticks(A=[1, 2], B=[2, 4])
    >>> res = data.join_with_query(func, how='inner', symbol=(data['A'] + data['B'], dict(PREF="_", POST="$")))
    >>> df = otp.run(res)
    >>> df[["A", "B", "TYPE"]]
    A B TYPE
    0 1 2 _three$
    1 2 4 _six$

    It's possible to pass the source itself as a list of symbol parameters, which will make all of its fields
    accessible through the "symbol" object

    >>> def func(symbol):
    ...    d = otp.Ticks(TYPE=["six"])
    ...    d["TYPE"] = symbol['PREF'] + d["TYPE"] + symbol['POST']
    ...    return d
    >>> # OTdirective: snippet-name: Source operations.join with query.source as symbol;
    >>> data = otp.Ticks(A=[1, 2], B=[2, 4], PREF=["_", "$"], POST=["$", "_"])
    >>> res = data.join_with_query(func, how='inner', symbol=data)
    >>> df = otp.run(res)
    >>> df[["A", "B", "TYPE"]]
    A B TYPE
    0 1 2 _six$
    1 2 4 $six_

    The examples above can be rewritten by using onetick query parameters instead of symbol parameters.
    OTQ parameters are global for query, while symbol parameters can be redefined by bound symbols.

    >>> def func(symbol, pref, post):
    ...    d = otp.Ticks(TYPE=["six"])
    ...    d = d.update(dict(TYPE="three"), where=(symbol.name == "3"))  # symbol is always converted to string
    ...    d["TYPE"] = pref + d["TYPE"] + post
    ...    return d
    >>> # OTdirective: snippet-name: Special functions.join with query.with a function that takes params;
    >>> data = otp.Ticks(A=[1, 2], B=[2, 4])
    >>> res = data.join_with_query(func, how='inner', symbol=(data['A'] + data['B']),
    ...                            params=dict(pref="_", post="$"))
    >>> df = otp.run(res)
    >>> df[["A", "B", "TYPE"]]
    A B TYPE
    0 1 2 _three$
    1 2 4 _six$

    Some or all onetick query parameters can be column or expression also

    >>> def func(symbol, pref, post):
    ...    d = otp.Ticks(TYPE=["six"])
    ...    d = d.update(dict(TYPE="three"), where=(symbol.name == "3"))  # symbol is always converted to string
    ...    d["TYPE"] = pref + d["TYPE"] + post
    ...    return d
    >>> # OTdirective: snippet-name: Special functions.join with query.with a function that takes params from fields; # noqa
    >>> data = otp.Ticks(A=[1, 2], B=[2, 4], PREF=["^", "_"], POST=["!", "$"])
    >>> res = data.join_with_query(func, how='inner', symbol=(data['A'] + data['B']),
    ...                            params=dict(pref=data["PREF"] + ".", post=data["POST"]))
    >>> df = otp.run(res)
    >>> df[["A", "B", "TYPE"]]
    A B TYPE
    0 1 2 ^.three!
    1 2 4 _.six$

    You can specify start and end time of the query to select specific ticks from db

    >>> # OTdirective: snippet-name: Special functions.join with query.passing start/end times;
    >>> d = otp.Ticks(Y=[1, 2])
    >>> data = otp.Ticks(X=[1, 2])
    >>> start = datetime(2003, 12, 1, 0, 0, 0, 1000, tzinfo=pytz.timezone("EST5EDT"))
    >>> end = datetime(2003, 12, 1, 0, 0, 0, 3000, tzinfo=pytz.timezone("EST5EDT"))
    >>> res = data.join_with_query(d, how='inner', start=start, end=end)
    >>> otp.run(res)
    Time Y X
    0 2003-12-01 00:00:00.000 1 1
    1 2003-12-01 00:00:00.000 2 1
    2 2003-12-01 00:00:00.001 1 2
    3 2003-12-01 00:00:00.001 2 2

    Use keep_time param to keep or rename original timestamp column

    >>> # OTdirective: snippet-name: Special functions.join with query.keep the timestamps of the joined ticks;
    >>> d = otp.Ticks(Y=[1, 2])
    >>> data = otp.Ticks(X=[1, 2])
    >>> res = data.join_with_query(d, how='inner', keep_time="ORIG_TIME")
    >>> otp.run(res)
    Time Y ORIG_TIME X
    0 2003-12-01 00:00:00.000 1 2003-12-01 00:00:00.000 1
    1 2003-12-01 00:00:00.000 2 2003-12-01 00:00:00.001 1
    2 2003-12-01 00:00:00.001 1 2003-12-01 00:00:00.000 2
    3 2003-12-01 00:00:00.001 2 2003-12-01 00:00:00.001 2
    """
    # TODO: check if join_with_query checks schema of joined source against primary source,
    #       by itself or with process_by_group
    if params is None:
        params = {}

    # Validate arguments that do not depend on the joined query up front, so invalid input fails
    # before the user's ``query`` callable is invoked and before anything is stored in the temporary otq.
    if where is not None and how != 'outer':
        raise ValueError('The `where` parameter can be used only for outer join')
    if concurrency is not None and (type(concurrency) is not int or concurrency <= 0):
        raise ValueError('Wrong value of concurrency parameter passed! '
                         'concurrency should be a positive integer')
    # start_time / end_time are documented as deprecated aliases of start / end
    for deprecated_name, new_name in (('start_time', 'start'), ('end_time', 'end')):
        if deprecated_name in kwargs:
            warnings.warn(f'Parameter "{deprecated_name}" of join_with_query is deprecated, '
                          f'please use "{new_name}" instead',
                          DeprecationWarning, stacklevel=2)

    # "symbol" parameter can contain a symbol name (string, field, operation etc),
    # a symbol parameter list (dict, Source, _SymbolParamSource),
    # or both together as a tuple

    def _check_and_convert_symbol(symbol):
        """
        Returns a pair (is_valid_symbol_name, OneTick expression for the symbol name or None).
        """
        if isinstance(symbol, _Operation):  # TODO: PY-35
            return True, f"tostring({symbol})"
        elif isinstance(symbol, str):
            return True, f"'{symbol}'"
        elif type(symbol) in {int, float}:  # constant
            return True, f"tostring({symbol})"
        elif symbol is None:
            # this is necessary to distinguish None (which is valid value for symbol) from invalid values
            return True, None
        else:
            return False, None

    def _convert_symbol_param_and_columns(symbol_param):
        """
        We need to create two objects from a symbol param (a dict, a Source or a _SymbolParamSource):

        1. Dictionary of columns to generate list of symbol parameters for the JOIN_WITH_QUERY EP
        2. _SymbolParamSource object to pass to the source function if necessary

        Returns (None, None) if ``symbol_param`` is none of the supported types.
        """
        if isinstance(symbol_param, dict):
            converted_symbol_param_columns = symbol_param
            converted_symbol_param = _SymbolParamSource(**{key: ott.get_object_type(column)
                                                           for key, column in symbol_param.items()})
        elif isinstance(symbol_param, _Source):
            converted_symbol_param_columns = {field_name: symbol_param[field_name]
                                              for field_name in symbol_param.columns(skip_meta_fields=True).keys()}
            converted_symbol_param = symbol_param.to_symbol_param()
        elif isinstance(symbol_param, _SymbolParamSource):
            converted_symbol_param_columns = {field_name: symbol_param[field_name]
                                              for field_name in symbol_param.schema.keys()}
            converted_symbol_param = symbol_param
        else:
            return None, None
        # we want to pass all the fields to the joined query as symbol parameters,
        # except for some special fields that would override explicitly set parameters
        ignore_symbol_fields = [
            '_PARAM_START_TIME_NANOS',
            '_PARAM_END_TIME_NANOS',
        ]
        filtered_converted_symbol_param_columns = {}
        for field_name, field_value in converted_symbol_param_columns.items():
            if field_name in ignore_symbol_fields:
                # stacklevel=3: this helper -> join_with_query -> the user's call site
                warnings.warn(f'Special symbol parameter "{field_name}" was passed to the joined query! '
                              'This parameter would be ignored. Please, use parameters of the `join_with_query` '
                              'function itself to set it.', stacklevel=3)
            else:
                filtered_converted_symbol_param_columns[field_name] = field_value
        filtered_converted_symbol_param = _SymbolParamSource(
            **{field_name: field_value for field_name, field_value in converted_symbol_param.schema.items()
               if field_name not in ignore_symbol_fields}
        )
        return filtered_converted_symbol_param_columns, filtered_converted_symbol_param

    if isinstance(symbol, tuple) and len(symbol) == 2:
        # "symbol" is a (symbol name, symbol params) tuple: unpack it
        symbol_name, symbol_param = symbol
        is_symbol_name, converted_symbol_name = _check_and_convert_symbol(symbol_name)
        if not is_symbol_name:
            # previously an invalid name here was silently replaced with the default symbol name
            raise ValueError('The first element of the "symbol" tuple has a wrong format! '
                             'It should be a symbol name: a string, a number or an Operation')
    else:
        # see if "symbol" contains symbol name or symbol params
        is_symbol, converted_symbol_name = _check_and_convert_symbol(symbol)
        if is_symbol:
            symbol_param = {}
        else:
            converted_symbol_name = None
            symbol_param = symbol

    # default symbol name should be this: _SYMBOL_NAME if it is not empty else _NON_EXISTING_SYMBOL_
    # this way we will force JWQ to substitute symbol with any symbol parameters we may have passed
    # otherwise (if an empty symbol name is passed to JWQ), it will not substitute either symbol name
    # or symbol parameters, and so symbol parameters may get lost
    # see BDS-263
    if converted_symbol_name is None:
        converted_symbol_name = "CASE(_SYMBOL_NAME,'','_NON_EXISTING_SYMBOL',_SYMBOL_NAME)"

    converted_symbol_param_columns, converted_symbol_param = _convert_symbol_param_and_columns(symbol_param)
    if converted_symbol_param is None:
        # we couldn't interpret "symbols" as either symbol name or symbol parameters
        raise ValueError('"symbol" parameter has a wrong format! It should be a symbol name, a symbol parameter '
                         'object (dict or Source), or a tuple containing both')

    # adding symbol time
    if '_PARAM_SYMBOL_TIME' in converted_symbol_param_columns.keys():
        warnings.warn('"_PARAM_SYMBOL_TIME" explicitly passed among join_with_query symbol parameters! '
                      'This is deprecated - please use symbol_time parameter instead. '
                      'If you specify symbol_time parameter, it will override the explicitly passed value',
                      stacklevel=2)
    if symbol_time is not None:
        symbol_time_type = ott.get_object_type(symbol_time)
        if symbol_time_type is not otp.nsectime and symbol_time_type is not str:
            raise ValueError(f'Parameter of type {symbol_time_type} passed as symbol_time! '
                             'This parameter only supports datetime values or strings')
        # converted_symbol_param_columns is a fresh dict, so the caller's dict is not mutated here
        converted_symbol_param_columns['_PARAM_SYMBOL_TIME'] = symbol_time

    # prepare temporary file
    # ------------------------------------ #
    converted_params = prepare_params(**params)
    if isinstance(query, Source):
        sub_source = query
    else:
        # inspect function
        # -------
        sig = inspect.signature(query)
        if "symbol" in sig.parameters:
            if "symbol" in converted_params.keys():
                raise AttributeError('"params" contains key "symbol", which is reserved for symbol parameters. '
                                     'Please, rename this parameter to another name')
            converted_params["symbol"] = converted_symbol_param  # type: ignore
        sub_source = query(**converted_params)

    sub_source = self._process_keep_time_param(keep_time, sub_source)

    if not sub_source._is_unbound_required():
        sub_source += onetick.py.sources.Empty()

    params_str = self._columns_to_params_for_joins(params, query_params=True)
    symbol_params_str = self._columns_to_params_for_joins(converted_symbol_param_columns)

    sub_source_schema = sub_source.schema.copy()

    columns = {}
    columns.update(self._get_columns_with_prefix(sub_source, prefix))
    columns.update(self.columns(skip_meta_fields=True))

    res = self.copy(columns=columns)

    res._merge_tmp_otq(sub_source)
    query_name = sub_source._store_in_tmp_otq(
        res._tmp_otq,
        symbols='_NON_EXISTING_SYMBOL_',
        operation_suffix="join_with_query"
    )  # TODO: combine with _convert_symbol_to_string
    # ------------------------------------ #

    default_fields_for_outer_join_str = self._get_default_fields_for_outer_join_str(
        default_fields_for_outer_join, how, sub_source_schema
    )

    join_params = dict(
        otq_query=f'"THIS::{query_name}"',
        join_type=how.upper(),
        otq_query_params=params_str,
        symbol_params=symbol_params_str,
        where=str(where._make_python_way_bool_expression()) if where is not None else '',
        default_fields_for_outer_join=default_fields_for_outer_join_str,
    )

    if concurrency is not None:
        # already validated above
        join_params['shared_thread_count'] = concurrency

    # the deprecated aliases take precedence when they are passed explicitly
    start_time = kwargs.get('start_time', start)
    end_time = kwargs.get('end_time', end)
    self._fill_aux_params_for_joins(
        join_params, caching, end_time, prefix, start_time, converted_symbol_name, timezone
    )
    res.sink(otq.JoinWithQuery(**join_params))
    res._add_table()
    res.sink(otq.Passthrough(fields="TIMESTAMP", drop_fields=True))

    return res
@property
def state_vars(self) -> StateVars:
    """
    Access the state variables declared on this source.

    Returns
    -------
    State Variables: Dict[str, state variable]
        Mapping from a state variable's name to the variable itself.

    See Also
    --------
    | `State Variables \
    <../../static/getting_started/variables_and_data_structures.html#variables-and-data-structures>`_
    | **DECLARE_STATE_VARIABLES** OneTick event processor
    """
    # read straight from the instance dictionary, bypassing any attribute-access hooks on the class
    instance_attributes = vars(self)
    return instance_attributes['_state_vars']
__invalid_query_name_symbols_regex = re.compile('[^a-zA-Z0-9_]')

def __remove_invalid_symbols(self, s):
    """
    Return ``s`` with every character that is not allowed in a query name
    (anything other than ASCII letters, digits and '_') replaced by '_'.
    """
    return re.sub(self.__invalid_query_name_symbols_regex, '_', s)
def get_name(self, remove_invalid_symbols=False):
    """
    Return the name of this source.

    When ``remove_invalid_symbols`` is true and the source has a non-empty name,
    characters that are not allowed in query names are replaced with '_'.
    Otherwise the stored name (possibly None) is returned unchanged.
    """
    current = self.__name
    if not (remove_invalid_symbols and current):
        return current
    return self.__remove_invalid_symbols(current)
def set_name(self, new_name):
    """
    Sets source name.

    Source name cannot be an empty string but can be None.

    Raises
    ------
    AssertionError
        If ``new_name`` is neither a string nor None, or if it is an empty string.
    """
    # Explicit checks instead of ``assert`` statements, which are stripped under ``python -O``.
    # AssertionError is still raised so existing callers that expect it keep working.
    if not (isinstance(new_name, str) or new_name is None):
        raise AssertionError("Source name must be a string or None.")
    if new_name == '':
        raise AssertionError("Source name must be a non-empty string.")
    self.__name = new_name
def _name_suffix(self, suffix, separator='.', remove_invalid_symbols=False):
    """
    Build a suffix of the form ``<sep><name><sep><suffix>``, or ``<sep><suffix>`` if the source has no name.

    With ``remove_invalid_symbols`` set, the suffix, the separator and the name are all passed
    through the invalid-symbol filter first, so the result can be used inside query names.
    """
    if not remove_invalid_symbols:
        name = self.__name
    else:
        suffix = self.__remove_invalid_symbols(suffix)
        separator = self.__remove_invalid_symbols(separator)
        name = self.get_name(remove_invalid_symbols=True)
    if not name:
        return f'{separator}{suffix}'
    return f'{separator}{name}{separator}{suffix}'
@property
def schema(self) -> Schema:
    """
    The python-side data schema of the source, as a column-name -> type mapping.

    It can be used, for example, after :meth:`Source.sink` to adjust the schema.

    Returns
    -------
    Schema

    See Also
    --------
    Source.sink

    Examples
    --------
    >>> data = otp.Ticks([['X', 'Y',   'Z'],
    ...                   [ 1,  0.5,  'abc']])
    >>> data['T'] = data['Time']
    >>> data.schema
    {'X': <class 'int'>, 'Y': <class 'float'>, 'Z': <class 'str'>, 'T': <class 'onetick.py.types.nsectime'>}

    >>> data.schema['X']
    <class 'int'>

    >>> data.schema['X'] = float
    >>> data.schema['X']
    <class 'float'>

    >>> 'W' in data.schema
    False
    >>> data.schema['W'] = otp.nsectime
    >>> 'W' in data.schema
    True
    >>> data.schema['W']
    <class 'onetick.py.types.nsectime'>
    """
    visible_columns = self.columns(skip_meta_fields=True)
    # 'Time' and 'TIMESTAMP' are known to the schema but hidden from its user-visible columns
    return Schema(
        _base_source=self,
        _hidden_columns={'Time': ott.nsectime, 'TIMESTAMP': ott.nsectime},
        **visible_columns,
    )
def set_schema(self, **kwargs):
    """
    Set the schema of the source.

    This method affects the python side only and does not make any database queries.
    It is used to set the schema after reading from a database or after a complex query.

    .. note::
        This method is deprecated, please use :py:attr:`Source.schema` to access and adjust the schema.

    Parameters
    ----------
    kwargs
        schema in the column_name=type format

    Examples
    --------
    Python can't follow low-level changes of columns, e.g. when a complex query or a per-tick script is sunk.

    >>> data = otp.Ticks(dict(A=[1, 2], B=["a", "b"]))
    >>> data.sink(otq.AddField(field='Z', value='5'))  # doctest: +SKIP
    >>> data.columns(skip_meta_fields=True)
    {'A': <class 'int'>, 'B': <class 'str'>}
    >>> # OTdirective: snippet-name: Arrange.schema.set;
    >>> data.set_schema(A=int, B=str, Z=int)
    >>> data.columns(skip_meta_fields=True)
    {'A': <class 'int'>, 'B': <class 'str'>, 'Z': <class 'int'>}
    """
    # start from an empty schema, then register every requested column
    self.drop_columns()
    for column_name, column_type in kwargs.items():
        # indexing with a (name, type) tuple registers the column with that type
        self[(column_name, ott.get_source_base_type(column_type))]
def _process_keep_time_param(self, keep_time, sub_source):
    """
    Return a copy of ``sub_source``. If ``keep_time`` is set, the copy also gets a ``keep_time``
    column holding its original timestamp.

    Raises
    ------
    ValueError
        If ``keep_time`` is "TIMESTAMP" or already names a column of this source.
    """
    reserved_name = "TIMESTAMP"
    if keep_time == reserved_name:
        raise ValueError("TIMESTAMP is reserved OneTick name, please, specify another one.")
    existing_columns = self.columns()
    if keep_time in existing_columns:
        raise ValueError(f"{keep_time} column is already presented.")
    result = sub_source.copy()
    if not keep_time:
        return result
    result[keep_time] = result["Time"]
    return result
def _get_columns_with_prefix(self, sub_source, prefix) -> dict:
    """
    Return the schema of ``sub_source`` with ``prefix`` prepended to every column name.

    Parameters
    ----------
    sub_source
        Source whose ``schema`` items are renamed.
    prefix : str or None
        Prefix to prepend; None means no prefix.

    Raises
    ------
    ValueError
        If ``prefix`` is not a string (or None), or if a prefixed name collides with a column of this source.
    """
    if prefix is None:
        prefix = ""
    if not isinstance(prefix, str):
        raise ValueError("Only string constants are supported for now.")
    new_columns = {prefix + name: dtype for name, dtype in sub_source.schema.items()}
    same_names = set(new_columns) & set(self.schema)
    if same_names:
        # sorted so that the error message does not depend on set iteration order
        raise ValueError(f"After applying prefix some columns aren't unique: {', '.join(sorted(same_names))}.")
    return new_columns
def _fill_aux_params_for_joins(
    self, join_params, caching, end_time, prefix, start_time, symbol_name, timezone, join_with_collection=False
):
    """
    Add the optional join parameters (symbol name, output prefix, caching scope, time range and timezone)
    to the ``join_params`` dict in place.

    For ``join_with_collection`` joins the symbol name is never set, only 'per_symbol' caching is allowed,
    and the 'timezone' entry added by the time-range helper is removed again.

    Raises
    ------
    ValueError
        If ``caching`` is set to a value that is not supported for this kind of join.
    """
    if symbol_name and not join_with_collection:
        join_params["symbol_name"] = symbol_name
    if prefix is not None:
        join_params["prefix_for_output_ticks"] = str(prefix)
    if caching:
        supported = ("per_symbol",) if join_with_collection else ("cross_symbol", "per_symbol")
        if caching not in supported:
            raise ValueError(f"Unknown value for caching param, please use None or any of {supported}.")
        join_params["caching_scope"] = caching
    self._fill_time_param_for_jwq(join_params, start_time, end_time, timezone)
    if join_with_collection:
        join_params.pop('timezone') if False else None
        del join_params['timezone']
def _fill_time_param_for_jwq(self, join_params, start_time, end_time, timezone):
    """
    Put the start/end timestamps and the timezone of the joined query into ``join_params`` in place.

    A given ``timezone`` is quoted as a string constant; otherwise the ``_TIMEZONE`` expression is used.
    """
    for bound, param_name in ((start_time, "start_timestamp"), (end_time, "end_timestamp")):
        self._process_start_or_end_of_jwq(join_params, bound, param_name)
    # NOTE: the "_TIMEZONE" fallback was marked "may break something, need to test" by the original author
    join_params["timezone"] = f"'{timezone}'" if timezone else "_TIMEZONE"
def _process_start_or_end_of_jwq(self, join_params, time, param_name):
    """
    Store ``time`` under ``param_name`` in ``join_params`` (in place); None leaves ``join_params`` untouched.

    A datetime is stored as milliseconds since the epoch, and a OneTick expression as its string form.

    Raises
    ------
    ValueError
        If ``time`` is neither a datetime nor an :class:`Operation`.
    """
    if time is None:
        return
    if isinstance(time, (datetime, otp.dt)):
        # NOTE(review): datetime.timestamp() interprets naive datetimes in the machine's local timezone
        join_params[param_name] = time.timestamp() * 1000
    elif isinstance(time, _Operation):
        join_params[param_name] = str(time)
    else:
        raise ValueError(f"{param_name} should be datetime.datetime instance or OneTick expression")
@inplace_operation
def transpose(
    self,
    inplace: bool = False,
    direction: Literal['rows', 'columns'] = 'rows',
    n: Optional[int] = None,
) -> Optional['Source']:
    """
    Transpose the data: join several ticks into one, or split one tick into several.

    Parameters
    ----------
    inplace: bool, default=False
        if `True` method will modify current object,
        otherwise it will return modified copy of the object.
    direction: 'rows', 'columns', default='rows'
        - `rows`: join certain input ticks (depending on other parameters) with preceding ones.
          Fields of each tick will be added to the output tick and their names will be suffixed
          with **_K** where **K** is the positional number of tick (starting from 1) in reverse order.
          So fields of current tick will be suffixed with **_1**, fields of previous tick will be
          suffixed with **_2** and so on.
        - `columns`: the operation is opposite to `rows`. It splits each input tick to several
          output ticks. Each input tick must have fields with names suffixed with **_K**
          where **K** is the positional number of tick (starting from 1) in reverse order.
    n: Optional[int], default=None
        must be specified only if ``direction`` is 'rows'.
        Joins every **n** number of ticks with **n-1** preceding ticks.

    Returns
    -------
    If ``inplace`` parameter is `True` method will return `None`,
    otherwise it will return modified copy of the object.

    See also
    --------
    **TRANSPOSE** OneTick event processor

    Examples
    --------
    Merging two ticks into one.

    >>> data = otp.Ticks(dict(A=[1, 2],
    ...                       B=[3, 4]))
    >>> data = data.transpose(direction='rows', n=2) # OTdirective: skip-snippet:;
    >>> otp.run(data)
    Time TIMESTAMP_1 A_1 B_1 TIMESTAMP_2 A_2 B_2
    0 2003-12-01 00:00:00.001 2003-12-01 00:00:00.001 2 4 2003-12-01 1 3

    And splitting them back into two.

    >>> data = data.transpose(direction='columns') # OTdirective: skip-snippet:;
    >>> otp.run(data)
    Time A B
    0 2003-12-01 00:00:00.000 1 3
    1 2003-12-01 00:00:00.001 2 4
    """
    key_constraint_values = n if n is not None else ''
    # an unknown direction raises KeyError here
    ep_direction = {'rows': 'ROWS_TO_COLUMNS', 'columns': 'COLUMNS_TO_ROWS'}[direction]
    self.sink(otq.Transpose(direction=ep_direction, key_constraint_values=key_constraint_values))
    # TODO: we should change source's schema after transposing
    return self
@inplace_operation
def process_by_group(self, process_source_func, group_by=None, source_name=None,
                     num_threads=None, inplace=False) -> Union['Source', Tuple['Source', ...], None]:
    """
    Groups data by ``group_by``, runs ``process_source_func`` for each group and merges the outputs of all groups.

    Note that ``process_source_func`` will be converted to a OneTick object and passed to the query,
    which means the python callable will be called only once.

    Parameters
    ----------
    process_source_func: callable
        ``process_source_func`` should take a :class:`Source`, apply the necessary logic and return it,
        or return a tuple of :class:`Source` objects. In the latter case all of them should have a common
        root that is the input :class:`Source`.
        The number of sources returned by this method is the same as the number of sources
        returned by ``process_source_func``.
    group_by: list or str
        A list of field names to group input ticks by (a single field name may be passed as a string).

        If group_by is None then no group_by fields are defined
        and logic of ``process_source_func`` is applied to all input ticks
        at once
    source_name: str
        A name for the source that represents all of group_by sources. Can be passed here or as a name
        of the inner sources; if passed by both ways, should be consistent
    num_threads: int
        If specified and not zero, turns on asynchronous processing mode
        and specifies number of threads to be used for processing input ticks.
        If this parameter is not specified or zero, then input ticks are processed synchronously.
    inplace: bool
        If True - nothing will be returned and changes will be applied to the current query,
        otherwise the modified query will be returned.
        An error is raised if ``inplace`` is set to True
        and multiple sources are returned by ``process_source_func``.

    Returns
    -------
    :class:`Source`, Tuple[:class:`Source`] or None:

    Raises
    ------
    ValueError
        If ``num_threads`` is negative, a ``group_by`` field is not in the input schema,
        or ``inplace=True`` is used with a multi-source ``process_source_func``.

    See also
    --------
    **GROUP_BY** OneTick event processor

    Examples
    --------

    >>> # OTdirective: snippet-name: Arrange.group.single output;
    >>> d = otp.Ticks(X=[1, 1, 2, 2],
    ...               Y=[1, 2, 3, 4])
    >>>
    >>> def func(source):
    ...     return source.first()
    >>>
    >>> res = d.process_by_group(func, group_by=['X'])
    >>> otp.run(res)[["X", "Y"]]
    X Y
    0 1 1
    1 2 3

    Set asynchronous processing:

    >>> res = d.process_by_group(func, group_by=['X'], num_threads=2)
    >>> otp.run(res)[['X', 'Y']]
    X Y
    0 1 1
    1 2 3

    Return multiple outputs, each with unique grouping logic:

    >>> d = otp.Ticks(X=[1, 1, 2, 2],
    ...               Y=[1, 2, 1, 3])
    >>>
    >>> def func(source):
    ...     source['Z'] = source['X']
    ...     source2 = source.copy()
    ...     source = source.first()
    ...     source2 = source2.last()
    ...     return source, source2
    >>> # OTdirective: snippet-name: Arrange.group.multiple output;
    >>> res1, res2 = d.process_by_group(func, group_by=['Y'])
    >>> df1 = otp.run(res1)
    >>> df2 = otp.run(res2)
    >>> df1[['X', 'Y', 'Z']]
    X Y Z
    0 1 1 1
    1 1 2 1
    2 2 3 2
    >>> df2[['X', 'Y', 'Z']] # OTdirective: skip-snippet:;
    X Y Z
    0 1 2 1
    1 2 1 2
    2 2 3 2
    """
    if group_by is None:
        group_by = []
    elif isinstance(group_by, str):
        # a single field name: without wrapping, the string would be iterated character by character
        # and joined as "X,Y,..." into the GROUP_BY key fields
        group_by = [group_by]

    # validate before anything is built, so that with inplace=True the current source is not left half-modified
    if num_threads is not None and num_threads < 0:
        raise ValueError("Parameter 'num_threads' can't be negative.")

    main_source = self if inplace else self.copy()

    input_schema = main_source.columns(skip_meta_fields=True)
    for field in group_by:
        if field not in input_schema:
            raise ValueError(f"Group by field name {field} not present in input source schema")

    process_source_root = onetick.py.sources.Custom(tick_type="ANY", schema_policy="manual", **input_schema)
    if source_name:
        process_source_root.set_name(source_name)
    process_sources = process_source_func(process_source_root)

    if isinstance(process_sources, Source):
        # returned one source
        process_sources = [process_sources]
    elif len(process_sources) == 1:
        # returned one source as an iterable
        pass
    else:
        # returned multiple sources
        if inplace:
            raise ValueError("Cannot use inplace=True with multi-source processing function!")

    for num_source, process_source in enumerate(process_sources):
        output_schema = process_source.columns(skip_meta_fields=True)

        if process_source.get_name():
            if not process_source_root.get_name():
                process_source_root.set_name(process_source.get_name())
            if process_source_root.get_name() != process_source.get_name():
                warnings.warn("Different strings passed as names for the root source used in "
                              f"process_by_group: '{process_source.get_name()}' "
                              f"and '{process_source_root.get_name()}'")

        # removing key fields from output schema since they will be
        # added by the GROUP_BY EP
        process_source.drop([field for field in group_by if field in output_schema], inplace=True)
        process_source.sink(otq.Passthrough().node_name(f"OUT_{num_source}"))
        process_source_root.node().add_rules(process_source.node().copy_rules())
        main_source._merge_tmp_otq(process_source)

    query_name = process_source_root._store_in_tmp_otq(
        main_source._tmp_otq, operation_suffix="group_by", add_passthrough=False
    )
    process_path = f'THIS::{query_name}'
    num_outputs = len(process_sources)

    # we shouldn't set named outputs if GROUP_BY EP has only one output due to onetick behaviour
    if num_outputs == 1:
        outputs = ""
    else:
        outputs = ",".join([f"OUT_{i}" for i in range(0, num_outputs)])

    group_by_kwargs = {}
    if num_threads is not None:
        group_by_kwargs['num_threads'] = num_threads

    main_source.sink(
        otq.GroupBy(key_fields=",".join(group_by), query_name=process_path, outputs=outputs, **group_by_kwargs)
    )

    output_sources = []
    for num_output in range(0, num_outputs):
        if num_outputs == 1 and inplace:
            output_source = main_source
        else:
            output_source = main_source.copy()

        if num_outputs > 1:
            output_source.node().out_pin(f"OUT_{num_output}")

        # setting schema after processing: the processed fields plus the group_by key fields
        output_schema = process_sources[num_output].columns(skip_meta_fields=True)
        for field in group_by:
            output_schema[field] = input_schema[field]
        for field, field_type in output_schema.items():
            output_source.schema[field] = field_type
        output_source = output_source[[field for field in output_schema]]
        output_source._merge_tmp_otq(main_source)
        output_sources.append(output_source)

    if num_outputs == 1:
        return output_sources[0]
    else:
        return tuple(output_sources)
def unite_columns(self, sep="", *, apply_str=False):
    """
    Join values of all columns into one string.

    The method unites all fields into one string, just like python's ``str.join`` method.
    All fields should be strings, otherwise an error is raised. To change this behavior,
    specify ``apply_str=True``; in this case all fields are converted to the string type before joining.

    Parameters
    ----------
    sep: str
        Separator between values, empty string by default.
    apply_str: bool
        If set, every column will be converted to string during the operation. False by default.

    Returns
    -------
    result: column
        Column with str type

    Raises
    ------
    ValueError
        If the source has no columns, or if ``apply_str`` is False and some columns are not strings.

    Examples
    --------

    >>> # OTdirective: snippet-name: Arrange.join columns as strings;
    >>> data = otp.Ticks(X=[1, 2, 3], A=["A", "A", "A"], B=["A", "B", "C"])
    >>> data["S_ALL"] = data.unite_columns(sep=",", apply_str=True)
    >>> data["S"] = data[["A", "B"]].unite_columns()
    >>> otp.run(data)[["S", "S_ALL"]]
    S S_ALL
    0 AA 1,A,A
    1 AB 2,A,B
    2 AC 3,A,C
    """
    column_names = list(self.schema)
    if not column_names:
        # functools.reduce would otherwise fail with an opaque TypeError on an empty sequence
        raise ValueError("There are no columns to unite")
    if apply_str:
        cols = [self[col].apply(str) for col in column_names]
    else:
        not_str = [name for name, t in self.schema.items() if not are_strings(t)]
        if not_str:
            raise ValueError(f"All joining columns should be strings, while {', '.join(not_str)} "
                             f"are not. Specify `apply_str=True` for automatic type conversion")
        cols = [self[col] for col in column_names]
    return functools.reduce(lambda left, right: left + sep + right, cols)
# Aggregation method copies.
# These methods exist to store and collect the documentation of the aggregations.
# The copy_method decorator will:
#   - set the docstring (comparing the donor function's docstring with the method's docstring)
#   - apply the donor function's signature, with `self` prepended
#   - with mimic=True, apply the aggregation function as is
@copy_method(high_tick)
def high(self):
    """
    Examples
    --------
    >>> data = otp.Ticks(X=[1, 2, 3, 4], offset=[0, 1000, 1500, 3000])
    >>> data = data.high(['X'], 2)     # OTdirective: snippet-name: Aggregations.high tick;
    >>> otp.run(data)
    Time X
    0 2003-12-01 00:00:01.500 3
    1 2003-12-01 00:00:03.000 4
    """
    # Docstring-and-signature carrier: copy_method merges this docstring with high_tick's.
    # The empty body presumably means copy_method applies high_tick itself (mimic default — TODO confirm).
    pass
@copy_method(low_tick)
def low(self):
    """
    Examples
    --------
    >>> data = otp.Ticks(X=[1, 2, 3, 4], offset=[0, 1000, 1500, 3000])
    >>> data = data.low(['X'],2)     # OTdirective: snippet-name: Aggregations.low tick;
    >>> otp.run(data)
    Time X
    0 2003-12-01 00:00:00 1
    1 2003-12-01 00:00:01 2
    """
    # Docstring-and-signature carrier: copy_method merges this docstring with low_tick's.
    # The empty body presumably means copy_method applies low_tick itself (mimic default — TODO confirm).
    pass
@copy_method(first_tick)
def first(self):
    """
    Examples
    --------
    >>> data = otp.Ticks(X=[1, 2, 3, 4])
    >>> data = data.first()     # OTdirective: snippet-name: Aggregations.first;
    >>> otp.run(data)
    Time X
    0 2003-12-01 1
    """
    # Docstring-and-signature carrier: copy_method merges this docstring with first_tick's.
    # The empty body presumably means copy_method applies first_tick itself (mimic default — TODO confirm).
    pass
@copy_method(last_tick)
def last(self):
    """
    Examples
    --------
    >>> data = otp.Ticks(X=[1, 2, 3, 4], offset=[0, 1000, 1500, 3000])
    >>> data = data.last()     # OTdirective: snippet-name: Aggregations.last;
    >>> otp.run(data)
    Time X
    0 2003-12-01 00:00:03 4
    """
    # Docstring-and-signature carrier: copy_method merges this docstring with last_tick's.
    # The empty body presumably means copy_method applies last_tick itself (mimic default — TODO confirm).
    pass
@copy_method(distinct, mimic=False)
def distinct(self, *args, **kwargs):
    """
    Examples
    --------
    >>> data = otp.Ticks(dict(x=[1, 3, 1, 5, 3]))
    >>> data = data.distinct('x')     # OTdirective: snippet-name: Aggregations.distinct;
    >>> otp.run(data)
    Time x
    0 2003-12-04 1
    1 2003-12-04 3
    2 2003-12-04 5
    """
    # ``bucket_interval_units`` is accepted as an alternative name for the aggregation's ``bucket_units``.
    if 'bucket_interval_units' in kwargs:
        if 'bucket_units' in kwargs:
            # previously bucket_interval_units silently overwrote an explicitly passed bucket_units
            raise TypeError("distinct() got values for both 'bucket_interval_units' and 'bucket_units', "
                            "please pass only one of them")
        kwargs['bucket_units'] = kwargs.pop('bucket_interval_units')
    # inside the method body ``distinct`` resolves to the module-level aggregation, not to this method
    agg = distinct(*args, **kwargs)
    return agg.apply(self)
@copy_method(high_time, mimic=False)  # mimic=False for backward compatibility
def high_time(self, *args, **kwargs):
    """
    Returns timestamp of tick with the highest value of input field

    .. deprecated:: 1.14.5
       Use :py:func:`.high_time` instead

    See Also
    --------
    :py:func:`.high_time`
    """
    # The method name is spelled out instead of being read via inspect.currentframe(),
    # which may return None on Python implementations without stack frame support.
    warnings.warn(f"{self.__class__.__name__}.high_time deprecated. "
                  f"Use otp.agg.high_time instead",
                  DeprecationWarning, stacklevel=2)
    agg = high_time(*args, **kwargs)
    return agg.apply(self, 'VALUE')
@copy_method(low_time, mimic=False)  # mimic=False for backward compatibility
def low_time(self, *args, **kwargs):
    """
    Returns timestamp of tick with the lowest value of input field

    .. deprecated:: 1.14.5
        Use :py:func:`.low_time` instead

    See Also
    --------
    :py:func:`.low_time`
    """
    # The method name is spelled out rather than read from
    # ``inspect.currentframe().f_code.co_name``: ``inspect.currentframe()``
    # may return None on Python implementations without stack frame support.
    method_name = 'low_time'
    warnings.warn(f"{self.__class__.__name__}.{method_name} deprecated. "
                  f"Use otp.agg.{method_name} instead",
                  DeprecationWarning, stacklevel=2)
    # ``low_time`` here is the module-level aggregation, not this method.
    agg = low_time(*args, **kwargs)
    # NOTE(review): 'VALUE' looks like the output column name -- confirm against the aggregation's ``apply``.
    return agg.apply(self, 'VALUE')
[docs] @copy_method(ob_snapshot)
def ob_snapshot(self):
    """
    Examples
    --------

    >>> data = otp.DataSource(db='SOME_DB', tick_type='PRL', symbols='AA') # doctest: +SKIP
    >>> data = data.ob_snapshot(max_levels=1) # doctest: +SKIP
    >>> otp.run(data) # doctest: +SKIP
    Time PRICE UPDATE_TIME SIZE LEVEL BUY_SELL_FLAG
    0 2003-12-04 2.0 2003-12-01 00:00:00.003 6 1 1
    1 2003-12-04 5.0 2003-12-01 00:00:00.004 7 1 0
    """
    # Intentionally empty: ``copy_method`` gives this method the signature of the
    # ``ob_snapshot`` aggregation (plus ``self``) and sets its docstring, comparing it
    # with the aggregation's. No ``mimic`` argument is passed here; presumably the
    # default is mimic=True, i.e. the aggregation is applied as is -- confirm in copy_method.
    pass
[docs] @copy_method(ob_snapshot_wide)
def ob_snapshot_wide(self):
    """
    Examples
    --------

    >>> data = otp.DataSource(db='SOME_DB', tick_type='PRL', symbols='AA') # doctest: +SKIP
    >>> data = data.ob_snapshot_wide(max_levels=1) # doctest: +SKIP
    >>> otp.run(data) # doctest: +SKIP
    Time BID_PRICE BID_UPDATE_TIME BID_SIZE ASK_PRICE ASK_UPDATE_TIME ASK_SIZE LEVEL
    0 2003-12-03 5.0 2003-12-01 00:00:00.004 7 2.0 2003-12-01 00:00:00.003 6 1
    """
    # Intentionally empty: ``copy_method`` gives this method the signature of the
    # ``ob_snapshot_wide`` aggregation (plus ``self``) and sets its docstring, comparing
    # it with the aggregation's. No ``mimic`` argument is passed here; presumably the
    # default is mimic=True, i.e. the aggregation is applied as is -- confirm in copy_method.
    pass
[docs] @copy_method(ob_snapshot_flat)
def ob_snapshot_flat(self):
    """
    Examples
    --------

    >>> data = otp.DataSource(db='SOME_DB', tick_type='PRL', symbols='AA') # doctest: +SKIP
    >>> data = data.ob_snapshot_flat(max_levels=1) # doctest: +SKIP
    >>> otp.run(data) # doctest: +SKIP
    Time BID_PRICE1 BID_UPDATE_TIME1 BID_SIZE1 ASK_PRICE1 ASK_UPDATE_TIME1 ASK_SIZE1
    0 2003-12-03 5.0 2003-12-01 00:00:00.004 7 2.0 2003-12-01 00:00:00.003 6
    """
    # Intentionally empty: ``copy_method`` gives this method the signature of the
    # ``ob_snapshot_flat`` aggregation (plus ``self``) and sets its docstring, comparing
    # it with the aggregation's. No ``mimic`` argument is passed here; presumably the
    # default is mimic=True, i.e. the aggregation is applied as is -- confirm in copy_method.
    pass
@inplace_operation
def add_prefix(self, prefix, inplace=False, columns=None, ignore_columns=None) -> Optional['Source']:
    """
    Prepend a prefix to the names of all columns (or of a selected subset).

    Parameters
    ----------
    prefix : str
        String prefix to add to column names. Must not contain spaces.
    inplace : bool
        The flag controls whether operation should be applied inplace or not.
        If ``inplace=True``, then it returns nothing. Otherwise method returns a new modified
        object.
    columns: List[str], optional
        If set, only selected columns will be updated with prefix. Can't be used with ``ignore_columns`` parameter.
    ignore_columns: List[str], optional
        If set, selected columns won't be updated with prefix. Can't be used with ``columns`` parameter.

    Returns
    -------
    :class:`Source` or ``None``

    Examples
    --------

    >>> data = otp.DataSource(db='SOME_DB', tick_type='TT', symbols='S1')
    >>> data = data.add_prefix('test_')
    >>> otp.run(data)
    Time test_X
    0 2003-12-01 00:00:00.000 1
    1 2003-12-01 00:00:00.001 2
    2 2003-12-01 00:00:00.002 3
    >>> data.schema
    {'test_X': <class 'int'>}

    >>> data = otp.DataSource(db='NYSE_TAQ', tick_type='TRD', symbols='AAPL')
    >>> data = data.add_prefix('test_')
    >>> otp.run(data, start=otp.dt(2022, 3, 1), end=otp.dt(2022, 3, 2))
    Time test_PRICE test_SIZE
    0 2022-03-01 00:00:00.000 1.3 100
    1 2022-03-01 00:00:00.001 1.4 10
    2 2022-03-01 00:00:00.002 1.4 50
    >>> data.schema
    {'test_PRICE': <class 'float'>}

    >>> data = otp.Tick(X=1, XX=2)
    >>> data.add_prefix('X')
    Traceback (most recent call last):
    ...
    AttributeError: Column XX already exists, please, use another prefix

    Parameter ``columns`` specifies columns to be updated with prefix:

    >>> data = otp.Tick(A=1, B=2, C=3, D=4, E=5)
    >>> data = data.add_prefix('test_', columns=['A', 'B', 'C'])
    >>> otp.run(data)
    Time test_A test_B test_C D E
    0 2003-12-01 1 2 3 4 5

    Parameter ``ignore_columns`` specifies columns to ignore:

    >>> data = otp.Tick(A=1, B=2, C=3, D=4, E=5)
    >>> data = data.add_prefix('test_', ignore_columns=['A', 'B', 'C'])
    >>> otp.run(data)
    Time A B C test_D test_E
    0 2003-12-01 1 2 3 4 5

    Parameters ``columns`` and ``ignore_columns`` can't be used at the same time:

    >>> data = otp.Tick(A=1, B=2, C=3, D=4, E=5)
    >>> data.add_prefix('test_', columns=['B', 'C'], ignore_columns=['A'])
    Traceback (most recent call last):
    ...
    AttributeError: It is allowed to use only one of `columns` or `ignore_columns` parameters at a time
    """
    # ``inplace`` is not read here; presumably the ``inplace_operation``
    # decorator handles it -- confirm in the decorator.
    selection = {'columns': columns, 'ignore_columns': ignore_columns}
    return self._add_prefix_and_suffix(prefix=prefix, **selection)
@inplace_operation
def add_suffix(self, suffix, inplace=False, columns=None, ignore_columns=None) -> Optional['Source']:
    """
    Adds suffix to all column names (or to the selected ones).

    Parameters
    ----------
    suffix : str
        String suffix to add to column names. Must not contain spaces.
    inplace : bool
        The flag controls whether operation should be applied inplace or not.
        If ``inplace=True``, then it returns nothing. Otherwise method returns a new modified
        object.
    columns: List[str], optional
        If set, only selected columns will be updated with suffix. Can't be used with ``ignore_columns`` parameter.
    ignore_columns: List[str], optional
        If set, selected columns won't be updated with suffix. Can't be used with ``columns`` parameter.

    Returns
    -------
    :class:`Source` or ``None``

    Examples
    --------

    >>> data = otp.DataSource(db='SOME_DB', tick_type='TT', symbols='S1')
    >>> data = data.add_suffix('_test')
    >>> otp.run(data)
    Time X_test
    0 2003-12-01 00:00:00.000 1
    1 2003-12-01 00:00:00.001 2
    2 2003-12-01 00:00:00.002 3
    >>> data.schema
    {'X_test': <class 'int'>}

    >>> data = otp.DataSource(db='NYSE_TAQ', tick_type='TRD', symbols='AAPL')
    >>> data = data.add_suffix('_test')
    >>> otp.run(data, start=otp.dt(2022, 3, 1), end=otp.dt(2022, 3, 2))
    Time PRICE_test SIZE_test
    0 2022-03-01 00:00:00.000 1.3 100
    1 2022-03-01 00:00:00.001 1.4 10
    2 2022-03-01 00:00:00.002 1.4 50
    >>> data.schema
    {'PRICE_test': <class 'float'>}

    >>> data = otp.Tick(X=1, XX=2)
    >>> data.add_suffix('X')
    Traceback (most recent call last):
    ...
    AttributeError: Column XX already exists, please, use another suffix

    Parameter ``columns`` specifies columns to be updated with suffix:

    >>> data = otp.Tick(A=1, B=2, C=3, D=4, E=5)
    >>> data = data.add_suffix('_test', columns=['A', 'B', 'C'])
    >>> otp.run(data)
    Time A_test B_test C_test D E
    0 2003-12-01 1 2 3 4 5

    Parameter ``ignore_columns`` specifies columns to ignore:

    >>> data = otp.Tick(A=1, B=2, C=3, D=4, E=5)
    >>> data = data.add_suffix('_test', ignore_columns=['A', 'B', 'C'])
    >>> otp.run(data)
    Time A B C D_test E_test
    0 2003-12-01 1 2 3 4 5

    Parameters ``columns`` and ``ignore_columns`` can't be used at the same time:

    >>> data = otp.Tick(A=1, B=2, C=3, D=4, E=5)
    >>> data.add_suffix('_test', columns=['B', 'C'], ignore_columns=['A'])
    Traceback (most recent call last):
    ...
    AttributeError: It is allowed to use only one of `columns` or `ignore_columns` parameters at a time
    """
    # ``inplace`` is not read here; presumably the ``inplace_operation``
    # decorator handles it -- confirm in the decorator.
    return self._add_prefix_and_suffix(
        suffix=suffix,
        columns=columns,
        ignore_columns=ignore_columns,
    )
def _add_prefix_and_suffix(
    self,
    prefix='',
    suffix='',
    columns=None,
    ignore_columns=None,
) -> Optional['Source']:
    """
    Rename columns by adding ``prefix`` and/or ``suffix`` to their names.

    Shared implementation of ``add_prefix`` and ``add_suffix``: renames the
    column objects stored in this object's ``__dict__`` and sinks a
    **RENAME_FIELDS** event processor doing the same rename on ticks.

    Parameters
    ----------
    prefix : str
        String prepended to column names. Must not contain spaces.
    suffix : str
        String appended to column names. Must not contain spaces.
    columns : list of str, optional
        If set, only these columns are renamed; each must exist in the schema.
    ignore_columns : list of str, optional
        If set, these columns are left unchanged.

    Returns
    -------
    :class:`Source`
        This object, modified in place.

    Raises
    ------
    AttributeError
        If both ``prefix`` and ``suffix`` are empty, if either contains a space,
        if both ``columns`` and ``ignore_columns`` are set, if a column from
        ``columns`` is not in the schema, or if a new name clashes with an
        existing attribute of this object.
    """
    if not prefix and not suffix:
        raise AttributeError('Both suffix and prefix are empty')
    if ' ' in prefix:
        raise AttributeError(f'There is space in prefix: {prefix}')
    if ' ' in suffix:
        # report the offending suffix (the prefix used to be reported here by mistake)
        raise AttributeError(f'There is space in suffix: {suffix}')
    columns = columns or []
    ignore_columns = ignore_columns or []
    if columns and ignore_columns:
        raise AttributeError('It is allowed to use only one of `columns` or `ignore_columns` parameters at a time')
    schema = self.schema
    for column in columns:
        if column not in schema:
            raise AttributeError(f'Column `{column}` does not exist in the schema')

    # Columns that will actually be renamed, computed once so that the clash check,
    # the renaming and the RENAME_FIELDS specification all use the same names.
    to_rename = [name for name in (columns or schema) if name not in ignore_columns]

    # Check every new name before renaming anything, so a clash leaves the object untouched.
    for column_name in to_rename:
        new_column_name = f'{prefix}{column_name}{suffix}'
        if new_column_name in self.__dict__:
            if prefix:
                raise AttributeError(f'Column {new_column_name} already exists, please, use another prefix')
            raise AttributeError(f'Column {new_column_name} already exists, please, use another suffix')

    for column_name in to_rename:
        new_column_name = f'{prefix}{column_name}{suffix}'
        column = self.__dict__[column_name]
        column.rename(new_column_name, update_parent_object=False)
        self.__dict__[new_column_name] = column
        del self.__dict__[column_name]

    if not columns and not ignore_columns:
        # every column is renamed, so a single regex rule covers them all
        self.sink(otq.RenameFieldsEp(rename_fields=f'(.*)={prefix}\\1{suffix}', use_regex=True))
    else:
        renames = [f'{column}={prefix}{column}{suffix}' for column in to_rename]
        self.sink(otq.RenameFieldsEp(rename_fields=','.join(renames)))
    return self
@inplace_operation
def time_filter(self,
                discard_on_match: bool = False,
                start_time: Union[str, int, time] = 0,
                end_time: Union[str, int, time] = 0,
                day_patterns: Union[str, List[str]] = "",
                timezone=utils.default,  # type: ignore
                end_time_tick_matches: bool = False,
                inplace=False) -> Optional['Source']:
    """
    Filters ticks by time.

    Parameters
    ----------
    discard_on_match : bool, optional
        If ``True``, then ticks that match the filter will be discarded.
        Otherwise, only ticks that match the filter will be passed.
    start_time : str or int or datetime.time, optional
        Start time of the filter, string must be in the format ``HHMMSSmmm``.
        :py:class:`datetime.time` values are converted to this format
        (truncated to milliseconds).
        Default value is 0.
    end_time : str or int or datetime.time, optional
        End time of the filter, string must be in the format ``HHMMSSmmm``.
        :py:class:`datetime.time` values are converted to this format
        (truncated to milliseconds).
        To filter ticks for an entire day, this parameter should be set to 240000000.
        Default value is 0.
    day_patterns : list or str
        Pattern or list of patterns that determines days for which the ticks can be propagated.
        A tick can be propagated if its date matches one or more of the patterns.
        Three supported pattern formats are:

        1. ``month.week.weekdays``, 0 month means any month, 0 week means any week,
           6 week means the last week of the month for a given weekday(s),
           weekdays are digits for each day, 0 being Sunday.
        2. ``month/day``, 0 month means any month.
        3. ``year/month/day``, 0 year means any year, 0 month means any month.
    timezone : str, optional
        Timezone of the filter.
        Default value is ``configuration.config.tz``
        or timezone set in the parameter of :py:func:`onetick.py.run`.
    end_time_tick_matches : bool, optional
        If ``True``, then the end time is inclusive.
        Otherwise, the end time is exclusive.
    inplace : bool, optional
        The flag controls whether operation should be applied inplace or not.
        If ``inplace=True``, then it returns nothing. Otherwise method returns a new modified
        object. Default value is ``False``.

    Returns
    -------
    :class:`Source` or ``None``
        Returns ``None`` if ``inplace=True``.

    Raises
    ------
    ValueError
        If any of ``day_patterns`` doesn't match one of the supported formats.

    See also
    --------
    **TIME_FILTER** OneTick event processor

    Examples
    --------

    >>> data = otp.DataSource(db='NYSE_TAQ', tick_type='TRD', symbols='AAPL')
    >>> data = data.time_filter(start_time='000000001', end_time='000000003')
    >>> otp.run(data, start=otp.dt(2022, 3, 1), end=otp.dt(2022, 3, 2))
    Time PRICE SIZE
    0 2022-03-01 00:00:00.001 1.4 10
    1 2022-03-01 00:00:00.002 1.4 50
    """
    if timezone is utils.default:
        # doesn't work without expr for some reason
        timezone = 'expr(_TIMEZONE)'

    if isinstance(day_patterns, str):
        day_patterns = [day_patterns] if day_patterns else []
    # ``month.week.weekdays`` | ``month/day`` | ``year/month/day``.
    # The dots are escaped and the weekdays part takes any number of digits
    # (one per day, e.g. ``0.0.12345``); fullmatch rejects trailing newlines.
    for day_pattern in day_patterns:
        if not re.fullmatch(r"\d\d?\.[0-6]\.\d+|\d\d?/\d\d?|\d{1,4}/\d\d?/\d\d?", day_pattern):
            raise ValueError(f"Invalid day pattern: {day_pattern}")

    # datetime.time -> ``HHMMSSmmm`` (microseconds truncated to milliseconds)
    if isinstance(start_time, time):
        start_time = start_time.strftime('%H%M%S%f')[:-3]
    if isinstance(end_time, time):
        end_time = end_time.strftime('%H%M%S%f')[:-3]

    self.sink(otq.TimeFilter(discard_on_match=discard_on_match,
                             start_time=start_time,
                             end_time=end_time,
                             timezone=timezone,
                             day_patterns=",".join(day_patterns),
                             end_time_tick_matches=end_time_tick_matches,
                             ))
    return self
[docs] @copy_method(ranking)
def ranking(self, *args, **kwargs):
    # Intentionally empty: ``copy_method`` gives this method the signature of the
    # ``ranking`` aggregation (plus ``self``) and sets its docstring, comparing it with
    # the aggregation's. No ``mimic`` argument is passed here; presumably the default
    # is mimic=True, i.e. the aggregation is applied as is -- confirm in copy_method.
    pass
@inplace_operation
def insert_tick(self,
                fields=None,
                where=None,
                preserve_input_ticks=True,
                num_ticks_to_insert=1,
                insert_before=True,
                inplace=False) -> Optional['Source']:
    """
    Insert tick.

    Parameters
    ----------
    fields: dict of str to :py:class:`onetick.py.Operation`
        Mapping of field names to some expressions or values.
        These fields in inserted ticks will be set to corresponding values or results of expressions.
        If field is presented in input tick, but not set in ``fields`` dict,
        then the value of the field will be copied from input tick to inserted tick.
        If parameter ``fields`` is not set at all, then values for inserted ticks' fields
        will be default values for fields' types from input ticks (0 for integers etc.).
        If a value is a type, the default value of that type is used.
        The passed dict is not modified.
    where: :py:class:`onetick.py.Operation`
        Expression to select ticks near which the new ticks will be inserted.
        By default, all ticks are selected.
    preserve_input_ticks: bool
        A switch controlling whether input ticks have to be preserved in output time series or not.
        While the former case results in fields of input ticks to be present in the output time series
        together with those defined by the ``fields`` parameter,
        the latter case results in only defined fields to be present.
        If a field of the input time series is defined in the ``fields`` parameter,
        the defined value takes precedence.
    num_ticks_to_insert: int
        Number of ticks to insert.
    insert_before: bool
        Insert tick before each input tick or after.
    inplace: bool
        The flag controls whether operation should be applied inplace or not.
        If ``inplace=True``, then it returns nothing.
        Otherwise method returns a new modified object.

    Raises
    ------
    ValueError
        If ``num_ticks_to_insert`` is not a positive integer, if ``fields`` is empty while
        ``preserve_input_ticks`` is False, or if a field's value type differs from its
        type in the schema.

    See also
    --------
    **INSERT_TICK** OneTick event processor

    Returns
    -------
    :class:`Source` or ``None``

    Examples
    --------

    Insert tick before each tick with default type values.

    >>> data = otp.Tick(A=1)
    >>> data = data.insert_tick()
    >>> otp.run(data)
    Time A
    0 2003-12-01 0
    1 2003-12-01 1

    Insert tick before each tick with field `A` copied from input tick
    and field `B` set to specified value.

    >>> data = otp.Tick(A=1)
    >>> data = data.insert_tick(fields={'B': 'b'})
    >>> otp.run(data)
    Time B A
    0 2003-12-01 b 1
    1 2003-12-01 1

    Insert two ticks only after first tick.

    >>> data = otp.Ticks(A=[1, 2, 3])
    >>> data = data.insert_tick(where=data['A'] == 1,
    ...                         insert_before=False,
    ...                         num_ticks_to_insert=2)
    >>> otp.run(data)
    Time A
    0 2003-12-01 00:00:00.000 1
    1 2003-12-01 00:00:00.000 0
    2 2003-12-01 00:00:00.000 0
    3 2003-12-01 00:00:00.001 2
    4 2003-12-01 00:00:00.002 3
    """
    if not isinstance(num_ticks_to_insert, int) or num_ticks_to_insert <= 0:
        raise ValueError("Parameter 'num_ticks_to_insert' must be a positive integer")
    if not preserve_input_ticks and not fields:
        raise ValueError("Parameter 'fields' must be set if 'preserve_input_ticks' is False")

    where = '' if where is None else str(where)
    fields = fields or {}

    # types of all fields set by this EP, and the subset of them new to the schema
    fields_schema = {}
    new_fields_schema = {}
    # ``name type[=value]`` items; built separately so the caller's dict is never mutated
    field_specs = []
    for field, value in fields.items():
        dtype = ott.get_object_type(value)
        if field not in self.schema:
            new_fields_schema[field] = dtype
        elif dtype is not self.schema[field]:
            raise ValueError(f"Incompatible types for field '{field}': {self.schema[field]} --> {dtype}")
        fields_schema[field] = dtype
        if isinstance(value, type):
            # a bare type means "the default value of this type"
            value = ott.default_by_type(value)
        str_dtype = ott.type2str(dtype)
        str_value = ott.value2str(value)
        field_specs.append(f'{field} {str_dtype}={str_value}' if str_value else f'{field} {str_dtype}')

    self.sink(
        otq.InsertTick(
            fields=','.join(field_specs),
            where=where,
            preserve_input_ticks=preserve_input_ticks,
            num_ticks_to_insert=num_ticks_to_insert,
            insert_before=insert_before,
        )
    )
    if preserve_input_ticks:
        # input fields stay in the schema; only the new ones need declaring
        self.table(inplace=True, strict=False, **new_fields_schema)
    else:
        # only the fields set here are present, including ones that already
        # existed in the schema (they used to be dropped from the schema)
        self.table(inplace=True, strict=True, **fields_schema)
    return self
@inplace_operation
def modify_query_times(self,
                       start=None,
                       end=None,
                       output_timestamp=None,
                       propagate_heartbeats=True,
                       inplace=False):
    """
    Change the ``start`` and ``end`` time of the query.

    * the new query times apply to every operation located **before** this
      method, all the way up to the source of the graph.
    * every tick produced by this method **must** have a timestamp inside the
      original time range of the query.
      Use the ``output_timestamp`` parameter to move ticks' timestamps
      back inside the original time range.

    Note
    ----
    Due to how OneTick works internally, tick generators
    :py:class:`otp.Tick <onetick.py.Tick>` and :py:func:`otp.Ticks <onetick.py.Ticks>`
    are not affected by this method.

    Parameters
    ----------
    start: :py:class:`onetick.py.datetime` or \
           :py:class:`~onetick.py.core.source.MetaFields` or :py:class:`~onetick.py.Operation`
        Expression to replace query start time.
        By default, start time is not changed.
        Note that expression in this parameter can't depend on ticks, thus only
        :py:class:`~onetick.py.core.source.MetaFields` and constants can be used.
    end: :py:class:`onetick.py.datetime` or \
         :py:class:`~onetick.py.core.source.MetaFields` or :py:class:`~onetick.py.Operation`
        Expression to replace query end time.
        By default, end time is not changed.
        Note that expression in this parameter can't depend on ticks, thus only
        :py:class:`~onetick.py.core.source.MetaFields` and constants can be used.
    output_timestamp: :py:class:`onetick.py.Operation`
        Expression that produces timestamp for each tick.
        By default, the following expression is used: ``orig_start + orig_timestamp - start``
        This expression covers cases when start time of the query is changed and keeps
        timestamp inside original time range.
        Note that it doesn't cover cases, for example, if end time was increased,
        you have to handle such cases yourself.
    propagate_heartbeats: bool
        Controls heartbeat propagation.
    inplace: bool
        The flag controls whether operation should be applied inplace or not.
        If ``inplace=True``, then it returns nothing.
        Otherwise method returns a new modified object.

    See also
    --------
    | **MODIFY_QUERY_TIMES** OneTick event processor
    | :py:meth:`onetick.py.Source.time_interval_shift`

    Returns
    -------
    :class:`Source` or ``None``

    Examples
    --------

    >>> start = otp.dt(2022, 3, 2)
    >>> end = otp.dt(2022, 3, 2) + otp.Milli(3)
    >>> data = otp.DataSource('NYSE_TAQ', symbols='AAPL', tick_type='TRD')

    By default, method does nothing:

    >>> t = data.modify_query_times()
    >>> otp.run(t, start=start, end=end)
    Time PRICE SIZE
    0 2022-03-02 00:00:00.000 1.0 100
    1 2022-03-02 00:00:00.001 1.1 101
    2 2022-03-02 00:00:00.002 1.2 102

    See how ``_START_TIME`` and ``_END_TIME`` meta fields are changed.
    They are changed *before* ``modify_query_times``:

    >>> t = data.copy()
    >>> t['S_BEFORE'] = t['_START_TIME']
    >>> t['E_BEFORE'] = t['_END_TIME']
    >>> t = t.modify_query_times(start=t['_START_TIME'] + otp.Milli(1),
    ...                          end=t['_END_TIME'] - otp.Milli(1))
    >>> t['S_AFTER'] = t['_START_TIME']
    >>> t['E_AFTER'] = t['_END_TIME']
    >>> otp.run(t, start=start, end=end)
    Time PRICE SIZE S_BEFORE E_BEFORE S_AFTER E_AFTER
    0 2022-03-02 1.1 101 2022-03-02 00:00:00.001 2022-03-02 00:00:00.002 2022-03-02 2022-03-02 00:00:00.003

    You can decrease time interval without problems:

    >>> t = data.modify_query_times(start=data['_START_TIME'] + otp.Milli(1),
    ...                             end=data['_END_TIME'] - otp.Milli(1))
    >>> otp.run(t, start=start, end=end)
    Time PRICE SIZE
    0 2022-03-02 1.1 101

    Note that the timestamp of the tick was changed with default expression.
    In this case we can output original timestamps,
    because they fall into original time range:

    >>> t = data.modify_query_times(start=data['_START_TIME'] + otp.Milli(1),
    ...                             end=data['_END_TIME'] - otp.Milli(1),
    ...                             output_timestamp=data['TIMESTAMP'])
    >>> otp.run(t, start=start, end=end)
    Time PRICE SIZE
    0 2022-03-02 00:00:00.001 1.1 101

    But it will not work if new time range is wider than original:

    >>> t = data.modify_query_times(start=data['_START_TIME'] - otp.Milli(1),
    ...                             output_timestamp=data['TIMESTAMP'])
    >>> otp.run(t, start=start + otp.Milli(1), end=end + otp.Milli(1))  # doctest: +ELLIPSIS
    Traceback (most recent call last):
    Exception...timestamp is falling out of initial start/end time range...

    In this case default ``output_timestamp`` expression would work just fine:

    >>> t = data.modify_query_times(start=data['_START_TIME'] - otp.Milli(1))
    >>> otp.run(t, start=start + otp.Milli(1), end=end + otp.Milli(1))
    Time PRICE SIZE
    0 2022-03-02 00:00:00.001 1.0 100
    1 2022-03-02 00:00:00.002 1.1 101
    2 2022-03-02 00:00:00.003 1.2 102

    But it doesn't work, for example, if end time has crossed the borders of original time range.
    In this case other ``output_timestamp`` expression must be specified:

    >>> t = data.modify_query_times(
    ...     start=data['_START_TIME'] - otp.Milli(2),
    ...     output_timestamp=otp.math.min(data['TIMESTAMP'] + otp.Milli(2), data['_END_TIME'])
    ... )
    >>> otp.run(t, start=start + otp.Milli(2), end=end)
    Time PRICE SIZE
    0 2022-03-02 00:00:00.002 1.0 100
    1 2022-03-02 00:00:00.003 1.1 101
    2 2022-03-02 00:00:00.003 1.2 102

    Remember that ``start`` and ``end`` parameters can't depend on ticks:

    >>> t = data.copy()
    >>> t['X'] = 12345
    >>> t = t.modify_query_times(start=t['_START_TIME'] + t['X'] - t['X'],
    ...                          end=t['_END_TIME'] - otp.Milli(1))
    >>> otp.run(t, start=start, end=end)  # doctest: +ELLIPSIS
    Traceback (most recent call last):
    Exception...parameter must not depend on ticks...

    Constant datetime values can be used as parameters too:

    >>> t = data.modify_query_times(start=start + otp.Milli(1),
    ...                             end=end - otp.Milli(1))
    >>> otp.run(t, start=start, end=end)
    Time PRICE SIZE
    0 2022-03-02 1.1 101

    Note that some graph patterns are not allowed when using this method.
    For example, modifying query times for a branch that will be merged later:

    >>> t1, t2 = data[data['PRICE'] > 1.3]
    >>> t2 = t2.modify_query_times(start=start + otp.Milli(1))
    >>> t = otp.merge([t1, t2])
    >>> otp.run(t, start=start, end=end)  # doctest: +ELLIPSIS
    Traceback (most recent call last):
    Exception...Invalid graph...time bound to a node...an intermediate node in one of the cycles in graph...
    """
    def _as_ep_param(value):
        # parameters that were not set are passed to the EP as empty strings
        return '' if value is None else ott.value2str(value)

    self.sink(
        otq.ModifyQueryTimes(
            start_time=_as_ep_param(start),
            end_time=_as_ep_param(end),
            output_timestamp=_as_ep_param(output_timestamp),
            propagate_heartbeats=propagate_heartbeats,
        )
    )
    return self
def time_interval_shift(self, shift, inplace=False):
    """
    Shift the time interval of a source.

    The whole data flow is shifted all the way up to the source of the graph:
    the query start and end times are changed for all operations before this
    method and stay the same after it.

    WARNING: ticks' timestamps *are changed* automatically so that they fit into
    the original time range. You get a different set of ticks from the database,
    and their timestamps differ from the ones stored in the database.
    See details in :py:meth:`onetick.py.Source.modify_query_times`.

    Parameters
    ----------
    shift: int or :ref:`datetime offset<offsets>`
        Offset to shift the whole time interval.
        Can be positive or negative.
        Positive value moves time interval into the future, negative -- to the past.
        int values are interpreted as milliseconds.
        Timestamps of the ticks will be changed so they fit into the original query time range
        by subtracting ``shift`` from each timestamp.
    inplace: bool
        The flag controls whether operation should be applied inplace or not.
        If ``inplace=True``, then it returns nothing.
        Otherwise method returns a new modified object.

    Returns
    -------
    :class:`Source` or ``None``

    See also
    --------
    | :py:meth:`onetick.py.Source.modify_query_times`
    | :py:meth:`onetick.py.Source.time_interval_change`

    Examples
    --------

    See also the use case that applies :py:meth:`time_interval_shift` to calculate
    `Markouts <../../static/getting_started/use_cases.html#point-in-time-benchmarks-bbo-at-different-markouts>`_

    >>> start = otp.dt(2022, 3, 2)
    >>> end = otp.dt(2022, 3, 2) + otp.Milli(3)
    >>> data = otp.DataSource('NYSE_TAQ', symbols='AAPL', tick_type='TRD')

    Default data:

    >>> otp.run(data, start=start, end=end)
    Time PRICE SIZE
    0 2022-03-02 00:00:00.000 1.0 100
    1 2022-03-02 00:00:00.001 1.1 101
    2 2022-03-02 00:00:00.002 1.2 102

    Get window for a third tick:

    >>> otp.run(data, start=start + otp.Milli(2), end=start + otp.Milli(3))
    Time PRICE SIZE
    0 2022-03-02 00:00:00.002 1.2 102

    Shifting time window will result in different set of ticks,
    but the ticks will have their timestamps changed to fit into original time range.
    Let's shift time 2 milliseconds back and thus get the first tick:

    >>> t = data.time_interval_shift(shift=-otp.Milli(2))
    >>> otp.run(t, start=start + otp.Milli(2), end=start + otp.Milli(3))
    Time PRICE SIZE
    0 2022-03-02 00:00:00.002 1.0 100

    Here we are querying empty time interval, but shifting one second back to get ticks.

    >>> t = data.time_interval_shift(shift=-otp.Second(1))
    >>> otp.run(t, start=start + otp.Second(1), end=end + otp.Second(1))
    Time PRICE SIZE
    0 2022-03-02 00:00:01.000 1.0 100
    1 2022-03-02 00:00:01.001 1.1 101
    2 2022-03-02 00:00:01.002 1.2 102

    Note that tick generators
    :py:class:`otp.Tick <onetick.py.Tick>` and :py:func:`otp.Ticks <onetick.py.Ticks>`
    are not really affected by this method, they will have the same timestamps:

    >>> t = otp.Tick(A=1)
    >>> otp.run(t)
    Time A
    0 2003-12-01 1

    >>> t = t.time_interval_shift(shift=otp.Second(1))
    >>> otp.run(t)
    Time A
    0 2003-12-01 1
    """
    # Both query borders move by ``shift``; every tick is moved back by the
    # same amount so that it lands inside the original query interval again.
    return self.modify_query_times(
        start=self['_START_TIME'] + shift,
        end=self['_END_TIME'] + shift,
        output_timestamp=self['TIMESTAMP'] - shift,
        inplace=inplace,
    )
def time_interval_change(self,
                         start_change=0,
                         end_change=0,
                         inplace=False):
    """
    Make the query time interval bigger or smaller.

    Ticks whose timestamps cross the border of the original time range get
    their timestamp set to the original start time or end time, whichever
    border they crossed.

    Parameters
    ----------
    start_change: int or :ref:`datetime offset<offsets>`
        Offset to shift start time.
        Can be positive or negative.
        Positive value moves start time into the future, negative -- to the past.
        int values are interpreted as milliseconds.
    end_change: int or :ref:`datetime offset<offsets>`
        Offset to shift end time.
        Can be positive or negative.
        Positive value moves end time into the future, negative -- to the past.
        int values are interpreted as milliseconds.
    inplace: bool
        The flag controls whether operation should be applied inplace or not.
        If ``inplace=True``, then it returns nothing.
        Otherwise method returns a new modified object.

    Returns
    -------
    :class:`Source` or ``None``

    See also
    --------
    | :py:meth:`onetick.py.Source.modify_query_times`
    | :py:meth:`onetick.py.Source.time_interval_shift`

    Examples
    --------

    >>> start = otp.dt(2022, 3, 2)
    >>> end = otp.dt(2022, 3, 2) + otp.Milli(3)
    >>> data = otp.DataSource('NYSE_TAQ', symbols='AAPL', tick_type='TRD')

    By default, ``time_interval_change()`` does nothing:

    >>> t = data.time_interval_change()
    >>> otp.run(t, start=start, end=end)
    Time PRICE SIZE
    0 2022-03-02 00:00:00.000 1.0 100
    1 2022-03-02 00:00:00.001 1.1 101
    2 2022-03-02 00:00:00.002 1.2 102

    Decreasing time range will not change ticks' timestamps:

    >>> t = data.time_interval_change(start_change=otp.Milli(1), end_change=-otp.Milli(1))
    >>> otp.run(t, start=start, end=end)
    Time PRICE SIZE
    0 2022-03-02 00:00:00.001 1.1 101

    Increasing time range will change timestamps of the ticks that crossed the border.
    In this case first tick's timestamp will be set to original start time,
    and third tick's to original end time.

    >>> t = data.time_interval_change(start_change=-otp.Milli(1), end_change=otp.Milli(1))
    >>> otp.run(t, start=start + otp.Milli(1), end=start + otp.Milli(2))
    Time PRICE SIZE
    0 2022-03-02 00:00:00.001 1.0 100
    1 2022-03-02 00:00:00.001 1.1 101
    2 2022-03-02 00:00:00.002 1.2 102

    Here we are querying empty time interval, but changing start time one second back to get ticks.

    >>> t = data.time_interval_change(start_change=-otp.Second(1))
    >>> otp.run(t, start=start + otp.Second(1), end=end + otp.Second(1))
    Time PRICE SIZE
    0 2022-03-02 00:00:01 1.0 100
    1 2022-03-02 00:00:01 1.1 101
    2 2022-03-02 00:00:01 1.2 102
    """
    new_start = self['_START_TIME'] + start_change
    new_end = self['_END_TIME'] + end_change
    # Clamp each timestamp into the original range: first cap it at the
    # original end time, then raise it to at least the original start time.
    # Ticks already inside the range keep their timestamps.
    clamped_timestamp = otp.math.max(
        otp.math.min(self['TIMESTAMP'], self['_END_TIME']),
        self['_START_TIME'],
    )
    return self.modify_query_times(start=new_start,
                                   end=new_end,
                                   output_timestamp=clamped_timestamp,
                                   inplace=inplace)
def mean(self, *columns):
    """
    Get average value of the specified columns.

    Do not confuse it with :py:func:`otp.agg.average <onetick.py.agg.average>`,
    this method gets average of the columns' values from single row, not doing
    aggregation of all rows.

    Parameters
    ----------
    columns: str, :class:`Column`
        Columns names or columns objects.
        All columns must have compatible types.
        At least one column must be specified.

    Returns
    -------
    :class:`~onetick.py.Operation`

    Raises
    ------
    ValueError
        If no columns are specified, if a column is not in the schema,
        or if the columns are not all numeric or all datetime.

    Examples
    --------

    Integers and floating point values can be mixed:

    >>> t = otp.Tick(A=1, B=3.3)
    >>> t['AVG'] = t.mean('A', 'B')
    >>> otp.run(t)
    Time A B AVG
    0 2003-12-01 1 3.3 2.15

    You can get the average for datetime values too:

    >>> t = otp.Tick(START_TIME=otp.dt(2022, 1, 1), END_TIME=otp.dt(2023, 1, 1))
    >>> t['MID_TIME'] = t.mean('START_TIME', 'END_TIME')
    >>> otp.run(t, timezone='GMT')
    Time START_TIME END_TIME MID_TIME
    0 2003-12-01 2022-01-01 2023-01-01 2022-07-02 12:00:00

    Note that things may get confusing for datetimes when
    the timezone with daylight saving time is used:

    >>> t = otp.Tick(START_TIME=otp.dt(2022, 1, 1), END_TIME=otp.dt(2023, 1, 1))
    >>> t['MID_TIME'] = t.mean('START_TIME', 'END_TIME')
    >>> otp.run(t, timezone='EST5EDT')
    Time START_TIME END_TIME MID_TIME
    0 2003-12-01 2022-01-01 2023-01-01 2022-07-02 13:00:00
    """
    if not columns:
        # without this check the result would be an opaque ``None / 0`` TypeError
        raise ValueError('At least one column must be specified')
    column_names = [str(column) for column in columns]

    dtypes = set()
    for column_name in column_names:
        if column_name not in self.schema:
            raise ValueError(f"There is no '{column_name}' column in the schema")
        dtypes.add(self.schema[column_name])
    if not are_numerics(*dtypes) and not are_time(*dtypes):
        raise ValueError('Only int, float and datetime columns are supported. '
                         'Numeric and datetime columns should not be mixed.')

    addends = []
    for column_name in column_names:
        column = self[column_name]
        dtype = self.schema[column_name]
        # bring msectime to nsectime precision when both are mixed
        if dtype is otp.msectime and otp.nsectime in dtypes:
            column = column.astype(otp.nsectime)
        # datetimes are averaged as integer timestamps
        if are_time(dtype):
            column = column.astype(int)
        addends.append(column)

    # plain ``+`` (not ``+=``), so the column object taken from the source is
    # never the target of an in-place operator
    op = functools.reduce(lambda x, y: x + y, addends)
    op = op / len(column_names)

    dtype = ott.get_type_by_objects(dtypes)
    if are_time(dtype):
        # can't convert float to datetime, converting to int first
        op = op.astype(int)
    op = op.astype(dtype)
    return op
@inplace_operation
def show_symbol_name_in_db(self, inplace=False):
    """
    Add the **SYMBOL_NAME_IN_DB** field to input ticks: the name under which
    each tick's symbol is stored in the database.

    Parameters
    ----------
    inplace: bool
        The flag controls whether operation should be applied inplace or not.
        If ``inplace=True``, then it returns nothing.
        Otherwise method returns a new modified object.

    Raises
    ------
    ValueError
        If the schema already has a **SYMBOL_NAME_IN_DB** column.

    See also
    --------
    **SHOW_SYMBOL_NAME_IN_DB** OneTick event processor

    Returns
    -------
    :class:`Source` or ``None``

    Examples
    --------

    For example, it can be used to display
    the actual symbol name for the contract (e.g., **ESM23**)
    instead of the artificial continuous name **ES_r_tdi**.
    Notice how actual symbol name can be different for each tick,
    e.g. in this case it is different for each month.

    >>> data = otp.DataSource('TDI_FUT', tick_type='TRD')  # doctest: +SKIP
    >>> data = data[['PRICE']]  # doctest: +SKIP
    >>> data = data.first(bucket_interval=31, bucket_units='days')  # doctest: +SKIP
    >>> data['SYMBOL_NAME'] = data.Symbol.name  # doctest: +SKIP
    >>> data = data.show_symbol_name_in_db()  # doctest: +SKIP
    >>> otp.run(data,  # doctest: +SKIP
    ...         symbols='ES_r_tdi', symbol_date=otp.dt(2023, 3, 1),
    ...         start=otp.dt(2023, 3, 1), end=otp.dt(2023, 5, 1))
    Time PRICE SYMBOL_NAME SYMBOL_NAME_IN_DB
    0 2023-03-01 00:00:00.549 3976.75 ES_r_tdi ESH23
    1 2023-04-02 18:00:00.039 4127.00 ES_r_tdi ESM23
    """
    added_field = 'SYMBOL_NAME_IN_DB'
    # refuse to silently overwrite an existing column
    if added_field in self.schema:
        raise ValueError("Column 'SYMBOL_NAME_IN_DB' already exists.")
    self.sink(otq.ShowSymbolNameInDb())
    # the EP appends a string field; reflect it in the schema
    self.schema[added_field] = str
    return self
# Backward-compatible alias: presumably older code refers to this class by its
# private name ``_Source``, so that name is kept pointing at ``Source``.
_Source = Source # Backward compatibility