import copy
import functools
import inspect
import os
import re
import uuid
import warnings
from collections import defaultdict
from datetime import datetime, date, time
import pandas as pd
import numpy as np
from typing import Callable, Optional, List, Any, Tuple, Union, Type, Dict, Collection, Set
from onetick.py.backports import Literal
import onetick.query as otq
from onetick.lib.instance import OneTickLib
import onetick.py.functions
import onetick.py.sources
from onetick import py as otp
from onetick.py import aggregations
from onetick.py import types as ott
from onetick.py import utils, configuration
from onetick.py.core._internal._manually_bound_value import _ManuallyBoundValue
from onetick.py.core._internal._multi_symbols_source import _MultiSymbolsSource
from onetick.py.core._internal._proxy_node import _ProxyNode
from onetick.py.core._internal._state_objects import _StateColumn, _TickSequence
from onetick.py.core._internal._state_vars import StateVars
from onetick.py.core._source._symbol_param import _SymbolParamSource
from onetick.py.core._source.schema import Schema
from onetick.py.core._source.symbol import Symbol
from onetick.py.core._source.tmp_otq import TmpOtq
from onetick.py.core.column import _Column, _LagOperator, _ColumnAggregation
from onetick.py.core.column_operations._methods.methods import is_arithmetical, is_compare
from onetick.py.core.column_operations._methods.op_types import are_strings, _replace_parameters, are_numerics, are_time
from onetick.py.core.column_operations.base import _Operation
from onetick.py.core.lambda_object import _LambdaIfElse, apply_lambda, apply_script, _EmulateObject
from onetick.py.core.query_inspector import get_query_info, add_pins, get_query_parameter_list
from onetick.py.core.eval_query import _QueryEvalWrapper, prepare_params
from onetick.py.core.cut_builder import _BaseCutBuilder
from onetick.py.core import db_constants
from onetick.py.utils import adaptive, adaptive_to_default
from onetick.py.aggregations._docs import (copy_method,
                                           _running_doc, _all_fields_with_policy_doc, _bucket_interval_doc,
                                           _bucket_time_doc, _bucket_units_doc, _bucket_end_condition_doc,
                                           _end_condition_per_group_doc, _boundary_tick_bucket_doc, _group_by_doc)
from onetick.py.docs.utils import docstring, param_doc
from onetick.py.aggregations.functions import (high_tick, low_tick,
                                               high_time, low_time,
                                               first_tick, last_tick,
                                               distinct, ranking,
                                               ob_snapshot, ob_snapshot_wide, ob_snapshot_flat)
def inplace_operation(method):
    """ Decorator that adds the `inplace` parameter and logic according to this
    flag. inplace=True means that method modifies an object, otherwise it copies
    the object firstly, modifies copy and returns the copy.
    """
    @functools.wraps(method)
    def _inner(self, *args, inplace=False, **kwargs):
        kwargs['inplace'] = inplace
        if inplace:
            method(self, *args, **kwargs)
        else:
            obj = self.copy()
            return method(obj, *args, **kwargs)
    return _inner
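# A minimal usage sketch for the decorator above ("MySource" and its "shift"
# method are hypothetical, for illustration only):
#
#     class MySource(Source):
#         @inplace_operation
#         def shift(self, offset, inplace=False):
#             self['X'] += offset  # mutate self; the decorator handles copying
#             return self
#
#     data2 = data.shift(1)            # returns a modified copy of `data`
#     data.shift(1, inplace=True)      # modifies `data` in place, returns None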
def _is_dict_required(symbols):
    """
    Depending on symbols, determine if output of otp.run() or Source.__call__() should always be a dictionary
    of {symbol: dataframe} even if only one symbol is present in the results
    """
    if isinstance(symbols, (list, tuple)):
        if len(symbols) == 0:
            return False
        elif len(symbols) > 1:
            return True
        else:
            symbols = symbols[0]
    if isinstance(symbols, otp.Source):
        return True
    if isinstance(symbols, otq.Symbol):
        symbols = symbols.name
    if isinstance(symbols, str) and 'eval' in symbols:
        return True
    return False
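# Illustrative examples of the rules above (symbol values are hypothetical):
#   _is_dict_required(['AAPL', 'MSFT'])     -> True   (more than one symbol)
#   _is_dict_required(['AAPL'])             -> False  (single plain symbol name)
#   _is_dict_required('eval(symbol_query)') -> True   (symbols come from a query)
#   _is_dict_required(some_otp_source)      -> True   (Source used as symbol list)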
_agg_doc = param_doc(name='aggs',
                     annotation=Dict,
                     str_annotation='dict of aggregations',
                     desc="""
                      aggregation dict: key - output column name; value - aggregation""")
class Source:
    """
    Base class representing a OneTick execution graph.
    All :ref:`onetick-py sources <sources>` are derived from this class
    and have access to all its methods.
    Examples
    --------
    >>> data = otp.Tick(A=1)
    >>> isinstance(data, otp.Source)
    True
    Also this class can be used to initialize a raw source
    with the help of ``onetick.query`` classes, but
    it should be done with caution, as the user is required to set
    properties such as symbol name and tick type manually.
    >>> data = otp.Source(otq.TickGenerator(bucket_interval=0, fields='long A = 123').tick_type('TT'))
    >>> otp.run(data, symbols='LOCAL::')
            Time    A
    0 2003-12-04  123
    """
    # TODO: need to support transactions for every _source
    # a transaction is the set of calls between _source creation and a call, or between two calls
    # if transactions have the same operations, then it seems we should add only one set of operations
    _PROPERTIES = [
        "__node",
        "__hash",
        "__use_name_for_column_prefix",
        "__sources_keys_dates",
        "__sources_modify_query_times",
        "__sources_base_ep_func",
        "__sources_symbols",
        "__source_has_output",
        "__name",
        "_tmp_otq"
    ]
    _OT_META_FIELDS = ["_START_TIME", "_END_TIME", "_SYMBOL_NAME", "_DBNAME", "_TICK_TYPE", '_TIMEZONE']
    meta_fields = MetaFields()
    Symbol = Symbol
    def __init__(
        self,
        node=None,
        _symbols=None,
        _start=adaptive,
        _end=adaptive,
        _base_ep_func=None,
        _has_output=True,
        **kwargs
    ):
        self._tmp_otq = TmpOtq()
        self.__name = None
        if "Time" in kwargs:
            # TODO: add tests
            raise ValueError(
                "It is not allowed to have 'Time' or 'TIMESTAMP' columns, because they are the key columns"
            )
        if "TIMESTAMP" not in kwargs:
            kwargs["TIMESTAMP"] = ott.nsectime
            kwargs["Time"] = ott.nsectime
        kwargs["_START_TIME"] = ott.nsectime
        kwargs["_END_TIME"] = ott.nsectime
        kwargs["_SYMBOL_NAME"] = str
        kwargs["_DBNAME"] = str
        kwargs["_TICK_TYPE"] = str
        kwargs["_TIMEZONE"] = str
        for key, value in kwargs.items():
            # calculate value type
            value_type = ott.get_source_base_type(value)
            self.__dict__[key] = _Column(name=key, dtype=value_type, obj_ref=self)
        # 'Time' is just an alias for the 'TIMESTAMP' column
        self.__dict__['Time'] = self.__dict__['TIMESTAMP']
        self.__dict__['_state_vars'] = StateVars(self)
        if node is None:
            node = otq.Passthrough()
        self.__hash = uuid.uuid4()
        self.__sources_keys_dates = {}
        self.__sources_modify_query_times = {}
        self.__sources_base_ep_func = {}
        self.__sources_symbols = {}
        self.__source_has_output = _has_output
        if isinstance(node, _ProxyNode):
            self.__node = _ProxyNode(*node.copy_graph(), refresh_func=self.__refresh_hash)
        else:
            self.__node = _ProxyNode(*self.__from_ep_to_proxy(node), refresh_func=self.__refresh_hash)
            self.__sources_keys_dates[self.__node.key()] = (_start, _end)
            self.__sources_modify_query_times[self.__node.key()] = False
            self.__sources_base_ep_func[self.__node.key()] = _base_ep_func
            self.__sources_symbols[self.__node.key()] = _symbols
        # flag controls whether we have to add node_name prefix when convert
        # columns into string
        self.__use_name_for_column_prefix = False
    def _try_default_constructor(self, *args, node=None, **kwargs):
        if node is not None:
            # Source.copy() method will use this way
            # all info from original source will be copied by copy() method after
            Source.__init__(self, *args, node=node, **kwargs)
            return True
        return False
    def base_ep(self, **kwargs):
        # default implementation
        # real implementation should return a Source object
        return None
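    # A hedged sketch of how a subclass might override base_ep (the 'TT' tick
    # type and the field definition are illustrative):
    #
    #     def base_ep(self, **kwargs):
    #         return Source(otq.TickGenerator(fields='long A = 1').tick_type('TT'))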
    def _clean_sources_dates(self):
        self.__sources_keys_dates = {}
        self.__sources_modify_query_times = {}
        self.__sources_base_ep_func = {}
        self.__sources_symbols = {}
    def _set_sources_dates(self, other, copy_symbols=True):
        self.__sources_keys_dates.update(other._get_sources_dates())
        self.__sources_modify_query_times.update(other._get_sources_modify_query_times())
        self.__sources_base_ep_func.update(other._get_sources_base_ep_func())
        if copy_symbols:
            self.__sources_symbols.update(other._get_sources_symbols())
        else:
            # this branch is applicable for the bound symbols with callbacks,
            # where we drop all adaptive symbols and keep only manually specified
            # symbols
            manually_bound = {
                key: _ManuallyBoundValue(value)
                for key, value in other._get_sources_symbols().items()
                if value is not adaptive and value is not adaptive_to_default
            }
            self.__sources_symbols.update(manually_bound)
        self.__source_has_output = other._get_source_has_output()
    def _change_sources_keys(self, keys: dict):
        """
        Change keys in sources dictionaries.
        Need to do it, for example, after rebuilding the node history with new keys.
        Parameters
        ----------
        keys: dict
            Mapping from old key to new key
        """
        sources = (self.__sources_keys_dates,
                   self.__sources_modify_query_times,
                   self.__sources_base_ep_func,
                   self.__sources_symbols)
        for dictionary in sources:
            for key in list(dictionary):
                dictionary[keys[key]] = dictionary.pop(key)
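        # e.g. with keys={'old_node_key': 'new_node_key'} every per-source
        # dictionary above is re-keyed in place (a descriptive sketch)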
    def _get_source_has_output(self):
        return self.__source_has_output
    def _get_sources_dates(self):
        return self.__sources_keys_dates
    def _get_sources_modify_query_times(self):
        return self.__sources_modify_query_times
    def _get_sources_base_ep_func(self):
        return self.__sources_base_ep_func
    def _get_sources_symbols(self):
        return self.__sources_symbols
    def use_name_for_column_prefix(self, flag=None):
        if flag is not None:
            self.__use_name_for_column_prefix = flag
        return self.__use_name_for_column_prefix
    def _check_key_in_properties(self, key: str) -> bool:
        if key in self.__class__._PROPERTIES:
            return True
        if key.replace('_' + Source.__name__.lstrip('_'), "") in self.__class__._PROPERTIES:
            return True
        if key.replace(self.__class__.__name__, "") in self.__class__._PROPERTIES:
            return True
        return False
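    # e.g. for the name-mangled key '_Source__node' the second check above strips
    # the '_Source' prefix back to '__node', which is listed in _PROPERTIES; the
    # third check attempts the same for subclass names (a descriptive sketch)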
    def __setattr__(self, key, value):
        if self._check_key_in_properties(key):
            self.__dict__[key] = value
            return
        if isinstance(value, _BaseCutBuilder):
            value(key)
            return
        value = self.__validate_before_setting(key, value)
        if key in self.__dict__:
            field = self.__dict__[key]
            if issubclass(type(field), _Column):
                self.__update_field(field, value)
            else:
                raise AttributeError(f'Column "{key}" not found')
        else:
            assert not (
                isinstance(value, _StateColumn) and value.obj_ref is None
            ), "State variables should be in `state` field"
            self.__add_field(key, value)
    def __validate_before_setting(self, key, value):
        if key in ["Symbol", "_SYMBOL_NAME"]:
            raise ValueError("Symbol setting is supported during creation only")
        if key == "_state_vars":
            raise ValueError("state field is necessary for keeping state variables and can't be rewritten")
        if isinstance(value, ott.ExpressionDefinedTimeOffset):
            value = value.n
        if isinstance(value, np.generic):
            value = value.item()
        if not (ott.is_type_supported(ott.get_object_type(value))
                or isinstance(value, _Operation)
                or type(value) is tuple
                or isinstance(value, _ColumnAggregation)):
            raise TypeError(f'It is not allowed to set objects of "{type(value)}" type')
        return value
    def _update_field(self, field, value):
        self.__update_field(field, value)
    def __update_field(self, field, value):
        if isinstance(value, _ColumnAggregation):
            value.apply(self, str(field))
            return
        def _replace_positive_lag_operator_with_tmp_column(operation):
            if isinstance(operation, _LagOperator) and operation.index > 0:
                column = operation._op_params[0]
                name = column.name
                if name.startswith("__"):
                    raise ValueError("Column name started with two underscores should be used by system only, "
                                     "please do not use such names.")
                name = f"__{name}_{operation.index}_NEW__"
                return _Column(name, column.dtype, column.obj_ref, precision=getattr(column, "_precision", None))
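        # e.g. a positive lag reference like column A at index 1 inside `value` is
        # redirected to a temporary column '__A_1_NEW__' (built by the helper above),
        # which is added before the update and dropped right after it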
        new_names, old_names = self.__get_old_and_new_names(_replace_positive_lag_operator_with_tmp_column, value)
        if old_names:
            self.sink(otq.AddFields(", ".join(f"{new} = {arg}" for arg, new in zip(old_names, new_names))))
        if type(value) is tuple:
            # tuples of (value, dtype) are accepted for compatibility with adding fields,
            # but the passed type is ignored here because updating a field does not support it
            value, _ = value
        convert_to_type = None
        str_value = ott.value2str(value)
        value_dtype = ott.get_object_type(value)
        base_type = ott.get_base_type(value_dtype)
        if base_type is bool:
            # bool is represented as float, according to OneTick
            base_type = float
        type_changes = False  # because mantis 0021194
        if base_type is str:
            # updating a non-string field with a string value (of any length)
            # changes the field type to the default string
            if not issubclass(field.dtype, str):
                field._dtype = str
                type_changes = True
        else:
            if (
                (issubclass(field.dtype, int) or
                 issubclass(field.dtype, float) or
                 issubclass(field.dtype, str))
                and
                (issubclass(value_dtype, ott.msectime) or
                 issubclass(value_dtype, ott.nsectime))
                and
                (str(field) != 'TIMESTAMP')
                and
                (not isinstance(field, _StateColumn))
            ):
                # in OneTick, after updating fields with functions that return datetime values,
                # the type of the column does not change for long and double columns,
                # and changes to long (or double in older versions) when updating a string column
                # (see BDS-267).
                # That's why we are explicitly setting the type of the returned value
                convert_to_type = value_dtype
                if issubclass(field.dtype, str):
                    # using update_field only for string because update_fields preserves type
                    # by default and raises exception if it can't be done
                    type_changes = True
            elif issubclass(field.dtype, float) and base_type is int and not isinstance(field, _StateColumn):
                # PY-574: if the field was float and the value is int, then
                # it is still float in OneTick, so no need to change the type at the otp level
                convert_to_type = int
            elif (issubclass(field.dtype, ott.msectime) or issubclass(field.dtype, ott.nsectime)) and base_type is int:
                # if the field was of a time type and we add something to it,
                # then there is no need to change the type
                pass
            elif issubclass(field.dtype, str) and base_type is int:
                type_changes = True
                convert_to_type = int
            else:
                if issubclass(value_dtype, bool):
                    value_dtype = float
                if isinstance(field, _StateColumn):
                    pass  # do nothing
                else:
                    field._dtype = value_dtype
                    type_changes = True
        # handle aliases, e.g. TIMESTAMP ~ Time
        key = str(field)
        if key == "TIMESTAMP":
            self.__update_timestamp(key, value, str_value)
        elif type_changes:
            if (value_dtype in [otp.nsectime, int] and issubclass(field.dtype, str)):
                # PY-416 string field changing the type from str to datetime leads to losing nanoseconds
                # BE-142 similar issue with int from string: OneTick convert str to float, and then to int
                # so we lose some precision for big integers
                # work around is: make a new column first, delete accessor column
                # and then recreate it with value from temp column
                self.sink(otq.AddField(field=f"_TMP_{key}", value=str_value))
                self.sink(otq.Passthrough(fields=key, drop_fields=True))
                self.sink(otq.AddField(field=f"{key}", value=f"_TMP_{key}"))
                self.sink(otq.Passthrough(fields=f"_TMP_{key}", drop_fields=True))
            else:
                self.sink(otq.UpdateField(field=key, value=str_value))
        else:
            self.sink(otq.UpdateFields(set=key + "=" + str_value))
        if new_names:
            self.sink(otq.Passthrough(drop_fields=True, fields=",".join(new_names)))
        if convert_to_type:
            # manual type conversion after update fields for some cases
            self.table(**{key: convert_to_type}, inplace=True, strict=False)
    def __update_timestamp(self, key, value, str_value):
        if not hasattr(value, "dtype"):
            # A constant value: no need to pre- or post-sort
            self.sink(otq.UpdateField(field=key, value=str_value))
        elif (
            isinstance(value, _Column)
            and not isinstance(value, _StateColumn)
            and hasattr(value, "name")
            and value.name in self.__dict__
        ):
            # An existing column already present in the schema: no need to create a temporary one. See PY-253
            need_to_sort = str_value not in ("_START_TIME", "_END_TIME")
            if need_to_sort:
                self.sort(value, inplace=True)
            self.sink(otq.UpdateField(field=key, value=str_value))
            if need_to_sort:
                self.sort(self.Time, inplace=True)
        elif (not is_compare(value) and isinstance(value, (_Operation, _LambdaIfElse))
              or is_arithmetical(value)):
            # An expression or a statevar: create a temp column with its value, pre- and post- sort.
            self.sink(otq.AddField(field="__TEMP_TIMESTAMP__", value=str_value))
            self.sink(otq.OrderByEp(order_by="__TEMP_TIMESTAMP__ ASC"))
            self.sink(otq.UpdateField(field=key, value="__TEMP_TIMESTAMP__"))
            self.sink(otq.Passthrough(fields="__TEMP_TIMESTAMP__", drop_fields=True))
            self.sort(self.Time, inplace=True)
        else:
            raise Exception(f"Illegal type for timestamp assignment: {value.__class__}")
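    # A hedged sketch of the expression branch above: an assignment like
    #   src['TIMESTAMP'] = src['T1'] + 1
    # adds '__TEMP_TIMESTAMP__', orders ticks by it, copies it into TIMESTAMP,
    # drops the temporary column and finally re-sorts by Time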
    def __get_old_and_new_names(self, _replace_positive_lag_operator_with_tmp_column, value):
        old_args, new_args = _replace_parameters(value, _replace_positive_lag_operator_with_tmp_column)
        old_args = {str(old): (old, new) for old, new in zip(old_args, new_args)}
        old_names = []
        new_names = []
        for op_str, (_, new) in old_args.items():
            old_names.append(op_str)
            new_names.append(str(new))
        return new_names, old_names
    def append(self, other) -> 'Source':
        """
        Merge data source with `other`
        Parameters
        ----------
        other: List, Source
            data source to merge
        Returns
        -------
        Source
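        Examples
        --------
        A minimal sketch (equivalent to merging the two sources with
        :func:`otp.merge <onetick.py.functions.merge>`; output omitted):
        >>> data = otp.Ticks(X=[1, 2])
        >>> other = otp.Ticks(X=[3, 4])
        >>> data = data.append(other)  # doctest: +SKIP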
        """
        if isinstance(other, list):
            return onetick.py.functions.merge(other + [self])
        else:
            return onetick.py.functions.merge([self, other])
    def __add_field(self, key, value):
        # -------------
        # ADD_FIELDS
        # -------------
        # TODO: merge together several add fields in one add_fields
        if isinstance(value, _ColumnAggregation):
            value.apply(self, key)
            return
        if type(value) is tuple:
            value, dtype = value
        else:
            dtype = ott.get_object_type(value)
            if type(value) is str:
                if len(value) > ott.string.DEFAULT_LENGTH:
                    dtype = ott.string[len(value)]
        if issubclass(dtype, bool):
            # according to OneTick transformations
            dtype = float
        if issubclass(dtype, (ott.datetime, ott.date)):
            # according to OneTick transformations
            dtype = ott.nsectime
        # TODO: shouldn't all such logic be in ott.type2str?
        if np.issubdtype(dtype, np.integer):
            dtype = int
        if np.issubdtype(dtype, np.floating):
            dtype = float
        type_str = ott.type2str(dtype)
        # format value
        str_value = ott.value2str(value)
        self.sink(otq.AddField(field=f'{type_str} {key}', value=str_value))
        self.__dict__[key] = _Column(key, dtype, self)
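    # e.g. `src['NEW'] = 1` roughly sinks otq.AddField(field='long NEW', value='1')
    # (a sketch; the exact type and value strings come from ott.type2str and
    # ott.value2str above)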
    def __getitem__(self, item):
        """
        Allows expressing multiple things:
        - access a field by name
        - filter ticks by condition
        - select subset of fields
        - set order of fields
        Parameters
        ----------
        item: str, :class:`Operation`, :func:`eval`, list of str
            - ``str`` is to access column by name or columns specified by regex.
            - ``Operation`` to express filter condition.
            - ``otp.eval`` to express filter condition based on external query
            - ``List[str]`` select subset of specified columns or columns specified in regexes.
            - ``slice[List[str]::]`` set order of columns
            - ``slice[Tuple[str, Type]::]`` type defaulting
            - ``slice[:]`` alias to :meth:`Source.copy()`
            - ``slice[int:int:int]`` select ticks the same way as elements in python lists
        Returns
        -------
        Column, Source or tuple of Sources
            - Column if a column name was specified.
            - Two sources if a filtering expression or eval was provided: the first one is for ticks that pass
              the condition and the second one for ticks that do not.
        Examples
        --------
        Access to the `X` column: add `Y` based on `X`
        >>> data = otp.Ticks(X=[1, 2, 3])
        >>> data['Y'] = data['X'] * 2
        >>> otp.run(data)
                             Time  X  Y
        0 2003-12-01 00:00:00.000  1  2
        1 2003-12-01 00:00:00.001  2  4
        2 2003-12-01 00:00:00.002  3  6
        Filtering based on expression
        >>> data = otp.Ticks(X=[1, 2, 3])
        >>> data_more, data_less = data[(data['X'] > 2)]
        >>> otp.run(data_more)
                             Time  X
        0 2003-12-01 00:00:00.002  3
        >>> otp.run(data_less)
                             Time  X
        0 2003-12-01 00:00:00.000  1
        1 2003-12-01 00:00:00.001  2
        Filtering based on the result of another query. The other query should
        have only one tick as a result with only one field (whatever its name is).
        >>> exp_to_select = otp.Ticks(WHERE=['X > 2'])
        >>> data = otp.Ticks(X=[1, 2, 3], Y=['a', 'b', 'c'], Z=[.4, .3, .1])
        >>> data, _ = data[otp.eval(exp_to_select)]
        >>> otp.run(data)
                             Time  X  Y    Z
        0 2003-12-01 00:00:00.002  3  c  0.1
        Select subset of specified columns
        >>> data = otp.Ticks(X=[1, 2, 3], Y=['a', 'b', 'c'], Z=[.4, .3, .1])
        >>> data = data[['X', 'Z']]
        >>> otp.run(data)
                             Time  X    Z
        0 2003-12-01 00:00:00.000  1  0.4
        1 2003-12-01 00:00:00.001  2  0.3
        2 2003-12-01 00:00:00.002  3  0.1
        A slice with a list keeps all columns but changes their order:
        >>> data=otp.Tick(Y=1, X=2, Z=3)
        >>> otp.run(data)
                Time  Y  X  Z
        0 2003-12-01  1  2  3
        >>> data = data[['X', 'Y']:]
        >>> otp.run(data)
                Time  X  Y  Z
        0 2003-12-01  2  1  3
        A slice can be used as a shortcut for :meth:`Source.copy`:
        >>> data[:] # doctest: +ELLIPSIS
        <onetick.py.sources.Tick object at ...>
        Slices can use integers.
        In this case ticks are selected the same way as elements in python lists.
        >>> data = otp.Ticks({'A': [1, 2, 3, 4, 5]})
        Select first 3 ticks:
        >>> otp.run(data[:3])
                             Time  A
        0 2003-12-01 00:00:00.000  1
        1 2003-12-01 00:00:00.001  2
        2 2003-12-01 00:00:00.002  3
        Skip first 3 ticks:
        >>> otp.run(data[3:])
                             Time  A
        0 2003-12-01 00:00:00.003  4
        1 2003-12-01 00:00:00.004  5
        Select last 3 ticks:
        >>> otp.run(data[-3:])
                             Time  A
        0 2003-12-01 00:00:00.002  3
        1 2003-12-01 00:00:00.003  4
        2 2003-12-01 00:00:00.004  5
        Skip last 3 ticks:
        >>> otp.run(data[:-3])
                             Time  A
        0 2003-12-01 00:00:00.000  1
        1 2003-12-01 00:00:00.001  2
        Skip first and last tick:
        >>> otp.run(data[1:-1])
                             Time  A
        0 2003-12-01 00:00:00.001  2
        1 2003-12-01 00:00:00.002  3
        2 2003-12-01 00:00:00.003  4
        Select every second tick:
        >>> otp.run(data[::2])
                             Time  A
        0 2003-12-01 00:00:00.000  1
        1 2003-12-01 00:00:00.002  3
        2 2003-12-01 00:00:00.004  5
        Select every second tick, not including first and last tick:
        >>> otp.run(data[1:-1:2])
                             Time  A
        0 2003-12-01 00:00:00.001  2
        1 2003-12-01 00:00:00.003  4
        Regular expressions can be used to select fields too:
        >>> data = otp.Tick(A=1, AA=2, AB=3, B=4, BB=5, BA=6)
        >>> otp.run(data['A.*'])
                Time  A  AA  AB  BA
        0 2003-12-01  1   2   3   6
        Note that by default the pattern is matched at any position of the string.
        Use the characters ^ and $ to specify the start and end of the string:
        >>> otp.run(data['^A'])
                Time  A  AA  AB
        0 2003-12-01  1   2   3
        Several regular expressions can be specified too:
        >>> otp.run(data[['^A+$', '^B+$']])
                Time  A  AA  B  BB
        0 2003-12-01  1   2  4   5
        See Also
        --------
        | :meth:`Source.table`: another and more generic way to select subset of specified columns
        | **PASSTHROUGH** OneTick event processor
        | **WHERE_CLAUSE** OneTick event processor
        """
        strict = True
        if isinstance(item, (_Operation, _QueryEvalWrapper)):
            if isinstance(item, _Operation):
                item = item._make_python_way_bool_expression()
            where = self.copy(ep=otq.WhereClause(where=str(item)))
            if_source = where.copy()
            if_source.node().out_pin("IF")
            else_source = where.copy()
            else_source.node().out_pin("ELSE")
            # TODO: add ability to remove then this ep, because it is required only for right output
            else_source.sink(otq.Passthrough())
            return if_source, else_source
        if isinstance(item, slice):
            result = self._get_integer_slice(item)
            if result:
                return result
            if item.step:
                raise AttributeError("Source columns slice with step set makes no sense")
            if item.start and item.stop:
                raise AttributeError("Source columns slice with both start and stop set is not available now")
            if not item.start and item.stop:
                raise AttributeError("Source columns slice with only stop set is not implemented yet")
            if item.start is None and item.stop is None:
                return self.copy()
            item = item.start
            strict = False
            if isinstance(item, tuple):
                item = dict([item])
            elif isinstance(item, list):
                if not item:
                    return self.copy()
                item_type = list(set([type(x) for x in item]))
                if len(item_type) > 1:
                    raise AttributeError(f"Different types {item_type} in slice list is not supported")
                if item_type[0] == tuple:
                    item = dict(item)
        if isinstance(item, (list, str)):
            # check if item has regex characters
            item_list = [item] if isinstance(item, str) else item
            try:
                items_to_passthrough, use_regex = self._columns_names_regex(item_list)
            except TypeError:
                use_regex = False
            if use_regex:
                src = self.copy()
                src.sink(otq.Passthrough(fields=','.join(items_to_passthrough), use_regex=True))
                return src
        if isinstance(item, list):
            # ---------
            # TABLE
            # ---------
            items = []
            for it in item:
                if isinstance(it, _Column):
                    items.append(it.name)
                elif isinstance(it, str):
                    items.append(it)
                else:
                    raise ValueError(f"It is not supported to filter '{it}' object of '{type(it)}' type")
            # validation
            for item in items:
                if item not in self.schema:
                    existing_columns = ", ".join(self.schema.keys())
                    raise AttributeError(
                        f"There is no '{item}' column. There are existing columns: {existing_columns}"
                    )
            columns = {}
            dtypes = self.columns(skip_meta_fields=True)
            for column_name in items:
                if column_name not in ['Time', 'Timestamp', 'TIMESTAMP']:
                    columns[column_name] = dtypes[column_name]
            return self.table(strict=strict, **columns)
        if isinstance(item, dict):
            return self.table(strict=strict, **item)
        # way to set type
        if isinstance(item, tuple):
            name, dtype = item
            warnings.warn('Using tuple with name and type in otp.Source.__getitem__() is not supported anymore,'
                          ' change your code to use otp.Source.schema object instead.', DeprecationWarning)
            return self._set_field_by_tuple(name, dtype)
        name = item
        if name not in self.__dict__:
            raise KeyError(f'Column name {name} is not in the schema. Please, check that this column '
                           'is in the schema or add it using the .schema property')
        if not isinstance(self.__dict__[name], _Column):
            raise AttributeError(f"There is no '{name}' column")
        return self.__dict__[name] 
    def _set_field_by_tuple(self, name, dtype):
        # TODO: move this logic to self.set_schema()
        warnings.warn('Using _set_field_by_tuple() is not recommended,'
                      ' change your code to use otp.Source.schema object.', DeprecationWarning)
        if name not in self.__dict__:
            if dtype is None:
                raise KeyError(f'Column name {name} is not in the schema. Please, check that this column '
                               'is in the schema or add it using the .schema property')
            if name == 0 or name == 1:
                raise ValueError(f"constant {name} are not supported for indexing for now, please use otp.Empty")
            if type(name) in (int, float):
                raise ValueError("integer indexes are not supported")
            self.__dict__[name] = _Column(name, dtype, self)
        else:
            if not isinstance(self.__dict__[name], _Column):
                raise AttributeError(f"There is no '{name}' column")
            if dtype:
                type1, type2 = self.__dict__[name].dtype, dtype
                b_type1, b_type2 = ott.get_base_type(type1), ott.get_base_type(type2)
                if b_type1 != b_type2:
                    if {type1, type2} == {int, float}:
                        self.__dict__[name]._dtype = float
                    else:
                        raise Warning(
                            f"Column '{name}' was declared as '{type1}', but you want to change it to '{type2}', "
                            "that is not possible without setting type directly via assigning value"
                        )
                else:
                    if issubclass(b_type1, str):
                        t1_length = ott.string.DEFAULT_LENGTH if type1 is str else type1.length
                        t2_length = ott.string.DEFAULT_LENGTH if type2 is str else type2.length
                        self.__dict__[name]._dtype = type2 if t1_length < t2_length else type1
                    if {type1, type2} == {ott.nsectime, ott.msectime}:
                        self.__dict__[name]._dtype = ott.nsectime
        return self.__dict__[name]
    def _get_integer_slice(self, item: slice) -> Optional['Source']:
        """
        Treat otp.Source object as a sequence of ticks
        and apply common python integer slicing logic to it.
        """
        start, stop, step = item.start, item.stop, item.step
        for v in (start, stop, step):
            if v is not None and not isinstance(v, int):
                return None
        # let's filter out cases that we don't want to support
        if step is not None and step <= 0:
            raise ValueError("step value can't be negative or zero")
        if stop is not None and stop == 0:
            raise ValueError("stop value can't be zero")
        if start and stop and start > 0 and stop > 0 and start >= stop:
            raise ValueError("stop value can't be less than or equal to start")
        if start and start < 0 and stop and stop > 0:
            raise ValueError("start value can't be negative when stop value is positive")
        def add_counter(src, force=False):
            if '__NUM__' not in src.schema or force:
                if '__NUM__' in src.schema:
                    src = src.drop('__NUM__')
                src = src.agg({'__NUM__': otp.agg.count()}, running=True, all_fields=True)
            return src
        result = self.copy()
        if start:
            if start > 0:
                result = add_counter(result)
                result, _ = result[result['__NUM__'] > start]
            if start < 0:
                result = result.last(-start)
        if stop:
            if stop > 0:
                result = add_counter(result)
                result, _ = result[result['__NUM__'] <= stop]
            if stop < 0:
                result = add_counter(result)
                last_ticks = result.last(-stop)
                last_ticks['__FLAG__'] = 1
                last_ticks = last_ticks[['__FLAG__', '__NUM__']]
                result = otp.join(result, last_ticks,
                                  on=result['__NUM__'] == last_ticks['__NUM__'],
                                  how='outer', rprefix='RIGHT')
                result, _ = result[result['__FLAG__'] == 0]
                result = result.drop(['__FLAG__', 'RIGHT___NUM__'])
        if step:
            if step > 0:
                # resetting counter
                result = add_counter(result, force=True)
                result, _ = result[(result['__NUM__'] - 1) % step == 0]
        if '__NUM__' in result.schema:
            result = result.drop('__NUM__')
        return result
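    # Illustrative correspondence with python list slicing (see the __getitem__
    # docstring): src[:3] keeps the first 3 ticks, src[3:] skips the first 3,
    # src[-3:] keeps the last 3, src[::2] keeps every second tick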
    def __setitem__(self, key, value):
        """
        Add a new column to the source or update an existing one.
        Parameters
        ----------
        key: str
            The name of the new or existing column.
        value: int, str, float, datetime, date, \
               :py:class:`~onetick.py.Column`, :py:class:`~onetick.py.Operation`, :py:class:`~onetick.py.string`, \
               :py:class:`otp.date <onetick.py.date>`, :py:class:`otp.datetime <onetick.py.datetime>`, \
               :py:class:`~onetick.py.nsectime`, :py:class:`~onetick.py.msectime`
            The new value of the column.
        See also
        --------
        | **ADD_FIELD** OneTick event processor
        | **UPDATE_FIELD** OneTick event processor
        Examples
        --------
        >>> data = otp.Tick(A='A')
        >>> data['D'] = otp.datetime(2022, 2, 2)
        >>> data['X'] = 1
        >>> data['Y'] = data['X']
        >>> data['X'] = 12345
        >>> data['Z'] = data['Y'].astype(str) + 'abc'
        >>> otp.run(data)
                Time  A          D      X  Y     Z
        0 2003-12-01  A 2022-02-02  12345  1  1abc
        """
        return self.__setattr__(key, value) 
    @inplace_operation
    def update(self, if_set, else_set=None, where=1, inplace=False) -> 'Source':
        """
        Update fields of the Source or state variables.
        Parameters
        ----------
        if_set: dict
            Dictionary <field name>: <expression>.
        else_set: dict, optional
            Dictionary <field name>: <expression>
        where: expression, optional
            Condition of updating.
            If ``where`` is True the fields from ``if_set`` will be updated with corresponding expression.
            If ``where`` is False, the fields from ``else_set`` will be updated with corresponding expression.
        inplace: bool
            A flag that controls whether the operation should be applied inplace.
            If ``inplace=True``, then nothing is returned. Otherwise the method
            returns a new modified object.
        Returns
        -------
        :class:`Source` or ``None``.
        See also
        --------
        **UPDATE_FIELD** and **UPDATE_FIELDS** OneTick event processors
        Examples
        --------
        Columns can be updated with this method:
        >>> # OTdirective: snippet-name: Arrange.conditional update;
        >>> t = otp.Ticks({'X': [1, 2, 3],
        ...                'Y': [4, 5, 6],
        ...                'Z': [1, 0, 1]})
        >>> t = t.update(if_set={'X': t['X'] + t['Y']},
        ...              else_set={'X': t['X'] - t['Y']},
        ...              where=t['Z'] == 1)
        >>> otp.run(t) # OTdirective: snippet-example;
                             Time  X  Y  Z
        0 2003-12-01 00:00:00.000  5  4  1
        1 2003-12-01 00:00:00.001 -3  5  0
        2 2003-12-01 00:00:00.002  9  6  1
        State variables can be updated too:
        >>> t = otp.Ticks({'X': [1, 2, 3],
        ...                'Y': [4, 5, 6],
        ...                'Z': [1, 0, 1]})
        >>> t.state_vars['X'] = 0
        >>> t = t.update(if_set={t.state_vars['X']: t['X'] + t['Y']},
        ...              else_set={t.state_vars['X']: t['X'] - t['Y']},
        ...              where=t['Z'] == 1)
        >>> t['UX'] = t.state_vars['X']
        >>> otp.run(t)
                             Time  X  Y  Z  UX
        0 2003-12-01 00:00:00.000  1  4  1   5
        1 2003-12-01 00:00:00.001  2  5  0  -3
        2 2003-12-01 00:00:00.002  3  6  1   9
        """
        if else_set is None:
            else_set = {}
        if not isinstance(if_set, dict) or len(if_set) == 0:
            raise ValueError(
                f"'if_set' parameter should be a non-empty dict, but got '{if_set}' of type '{type(if_set)}'"
            )
        def _prepare(to_prepare):
            result = {}
            for in_obj, out_obj in to_prepare.items():
                if isinstance(in_obj, _StateColumn):
                    result[in_obj] = out_obj
                elif isinstance(in_obj, _Column):
                    result[in_obj.name] = out_obj
                elif isinstance(in_obj, str):
                    result[in_obj.strip()] = out_obj
                else:
                    raise AttributeError(
                        f"It is not supported to update item '{in_obj}' of type '{type(in_obj)}'"
                    )
            return result
        def _validate(to_validate):
            result = {}
            for in_key, out_obj in to_validate.items():
                if isinstance(in_key, _StateColumn):
                    in_dtype = in_key.dtype
                else:
                    if not (in_key in self.__dict__ and isinstance(self.__dict__[in_key], _Column)):
                        raise AttributeError(f"There is no '{in_key}' column to update")
                    if in_key == "Time" or in_key == "TIMESTAMP":
                        raise ValueError("It is not allowed to modify 'Time' column using .update method")
                    in_dtype = self.schema[in_key]
                dtype = ott.get_object_type(out_obj)
                if not ott.is_type_supported(dtype):
                    raise TypeError(f"Deduced type of object {out_obj} is not supported: {dtype}")
                # will raise exception if types are incompatible
                _ = ott.get_type_by_objects([dtype, in_dtype])
                if isinstance(in_key, _StateColumn):
                    in_key = str(in_key)
                if isinstance(out_obj, bool):
                    out_obj = int(out_obj)
                result[in_key] = out_obj
            return result
        # prepare and validate
        items = _validate(_prepare(if_set))
        else_items = _validate(_prepare(else_set))
        if isinstance(where, bool):
            where = int(where)
        if not (getattr(where, "dtype", None) is bool
                or isinstance(where, int)):
            raise ValueError(f"Where has not supported type '{type(where)}'")
        # apply
        set_rules = [f"{key}=({ott.value2str(value)})" for key, value in items.items()]
        else_set_rules = [f"{key}=({ott.value2str(value)})" for key, value in else_items.items()]
        self.sink(otq.UpdateFields(set=",".join(set_rules), else_set=",".join(else_set_rules), where=str(where)))
        return self 
    @docstring(parameters=[_agg_doc, _running_doc, _all_fields_with_policy_doc, _bucket_interval_doc, _bucket_time_doc,
                           _bucket_units_doc, _bucket_end_condition_doc, _end_condition_per_group_doc,
                           _boundary_tick_bucket_doc, _group_by_doc], add_self=True)
    def agg(self, aggs, *args, **kwargs) -> 'Source':
        """
        Applies a composition of :ref:`otp.agg <aggregations_funcs>` aggregations.
        See Also
        --------
        | :ref:`Aggregations <aggregations_funcs>`
        | **COMPUTE** OneTick event processor
        Examples
        --------
        By default the whole data is aggregated:
        >>> data = otp.Ticks(X=[1, 2, 3, 4])
        >>> data = data.agg({'X_SUM': otp.agg.sum('X')})
        >>> otp.run(data)
                Time  X_SUM
        0 2003-12-04     10
        Multiple aggregations can be applied at the same time:
        >>> data = otp.Ticks(X=[1, 2, 3, 4])
        >>> data = data.agg({'X_SUM': otp.agg.sum('X'),
        ...                  'X_MEAN': otp.agg.average('X')})
        >>> otp.run(data)
                Time  X_SUM  X_MEAN
        0 2003-12-04     10     2.5
        Aggregation can be used in running mode:
        >>> data = otp.Ticks(X=[1, 2, 3, 4])
        >>> data = data.agg({'CUM_SUM': otp.agg.sum('X')}, running=True)
        >>> otp.run(data)
                             Time  CUM_SUM
        0 2003-12-01 00:00:00.000        1
        1 2003-12-01 00:00:00.001        3
        2 2003-12-01 00:00:00.002        6
        3 2003-12-01 00:00:00.003       10
        Aggregation can be split in buckets:
        >>> data = otp.Ticks(X=[1, 2, 3, 4])
        >>> data = data.agg({'X_SUM': otp.agg.sum('X')}, bucket_interval=2, bucket_units='ticks')
        >>> otp.run(data)
                             Time  X_SUM
        0 2003-12-01 00:00:00.001      3
        1 2003-12-01 00:00:00.003      7
        Running aggregation can be used with buckets too. In this case (all_fields=False and running=True) output ticks
        are created when a tick enters or leaves the sliding window (that's why for this example there are 8 output
        ticks for 4 input ticks):
        >>> data = otp.Ticks(X=[1, 2, 3, 4], offset=[0, 1000, 1500, 3600])
        >>> data = data.agg(dict(X_MEAN=otp.agg.average("X"),
        ...                      X_STD=otp.agg.stddev("X")),
        ...                 running=True, bucket_interval=2)
        >>> otp.run(data)
                             Time  X_MEAN     X_STD
        0 2003-12-01 00:00:00.000     1.0  0.000000
        1 2003-12-01 00:00:01.000     1.5  0.500000
        2 2003-12-01 00:00:01.500     2.0  0.816497
        3 2003-12-01 00:00:02.000     2.5  0.500000
        4 2003-12-01 00:00:03.000     3.0  0.000000
        5 2003-12-01 00:00:03.500     NaN       NaN
        6 2003-12-01 00:00:03.600     4.0  0.000000
        7 2003-12-01 00:00:05.600     NaN       NaN
        If all_fields=True an output tick is generated only for arrival events, but all attributes from the input tick
        causing an arrival event are copied over to the output tick and the aggregation is added as another attribute:
        >>> data = otp.Ticks(X=[1, 2, 3, 4], offset=[0, 1000, 1500, 3600])
        >>> data = data.agg(dict(X_MEAN=otp.agg.average("X"),
        ...                      X_STD=otp.agg.stddev("X")),
        ...                 all_fields=True, running=True)
        >>> otp.run(data)
                             Time  X  X_MEAN     X_STD
        0 2003-12-01 00:00:00.000  1     1.0  0.000000
        1 2003-12-01 00:00:01.000  2     1.5  0.500000
        2 2003-12-01 00:00:01.500  3     2.0  0.816497
        3 2003-12-01 00:00:03.600  4     2.5  1.118034
        The ``all_fields`` parameter can be used when there is a need to have all original fields in the output:
        >>> ticks = otp.Ticks(X=[3, 4, 1, 2])
        >>> data = ticks.agg(dict(X_MEAN=otp.agg.average("X"),
        ...                       X_STD=otp.agg.stddev("X")),
        ...                  all_fields=True)
        >>> otp.run(data)
                Time  X  X_MEAN     X_STD
        0 2003-12-04  3     2.5  1.118034
        There are different policies for the ``all_fields`` parameter:
        >>> data = ticks.agg(dict(X_MEAN=otp.agg.average("X"),
        ...                       X_STD=otp.agg.stddev("X")),
        ...                  all_fields="last")
        >>> otp.run(data)
                Time  X  X_MEAN     X_STD
        0 2003-12-04  2     2.5  1.118034
        For low/high policies, the tick whose fields are copied to the output is selected this way:
        >>> data = ticks.agg(dict(X_MEAN=otp.agg.average("X"),
        ...                       X_STD=otp.agg.stddev("X")),
        ...                  all_fields=otp.agg.low_tick(data["X"]))
        >>> otp.run(data)
                Time  X  X_MEAN     X_STD
        0 2003-12-04  1     2.5  1.118034
        """
        aggs = aggs.copy()
        result = self.copy()
        what_to_aggregate = aggregations.compute(
            *args,
            **kwargs,
        )
        for name, ag in aggs.items():
            what_to_aggregate.add(name, ag)
        result = what_to_aggregate.apply(result)
        result._add_table()
        return result 
    def sort_values(self, *args, **kwargs):
        """
        Alias of :meth:`Source.sort`.
        See Also
        --------
        :meth:`Source.sort`
        """
        return self.sort(*args, **kwargs)
    @inplace_operation
    def sort(self,
             by: Union[str, Collection[Union[str, 'onetick.py.Column']]],
             ascending=True,
             inplace=False) -> Optional['Source']:
        """ Sort ticks by columns.
        Parameters
        ----------
        by: str, Column or list of them
            Column(s) to sort by. It is possible to pass a list of columns, where
            the order is important: from left to right.
        ascending: bool or list
            Sort order. If a list of columns is specified, then a list of per-column
            ascending values is expected
            (:class:`nan` is treated as the smallest value for ``float`` type fields)
        inplace: bool
            A flag that controls whether the operation should be applied inplace.
            If ``inplace=True``, then nothing is returned. Otherwise the method
            returns a new modified object
        Returns
        -------
        :class:`Source`
        See also
        --------
        **ORDER_BY** OneTick event processor
        Examples
        --------
        Single column examples
        >>> data = otp.Ticks({'X':[     94,   5,   34],
        ...                   'Y':[otp.nan, 3.1, -0.3]})
        >>> data = data.sort(data['X'])
        >>> otp.run(data)
                             Time   X    Y
        0 2003-12-01 00:00:00.001   5  3.1
        1 2003-12-01 00:00:00.002  34 -0.3
        2 2003-12-01 00:00:00.000  94  NaN
        >>> data = otp.Ticks({'X':[     94,   5,   34],
        ...                   'Y':[otp.nan, 3.1, -0.3]})
        >>> data = data.sort(data['Y'])
        >>> otp.run(data)
                             Time   X    Y
        0 2003-12-01 00:00:00.000  94  NaN
        1 2003-12-01 00:00:00.002  34 -0.3
        2 2003-12-01 00:00:00.001   5  3.1
        Inplace
        >>> data = otp.Ticks({'X':[     94,   5,   34],
        ...                   'Y':[otp.nan, 3.1, -0.3]})
        >>> data.sort(data['Y'], inplace=True)  # OTdirective: snippet-name: Arrange.sort.inplace;
        >>> otp.run(data)
                             Time   X    Y
        0 2003-12-01 00:00:00.000  94 NaN
        1 2003-12-01 00:00:00.002  34 -0.3
        2 2003-12-01 00:00:00.001  5   3.1
        Multiple columns
        >>> data = otp.Ticks({'X':[  5,   6,   3,   6],
        ...                   'Y':[1.4, 3.1, 9.1, 5.5]})
        >>> data = data.sort([data['X'], data['Y']])
        >>> otp.run(data)
                             Time  X    Y
        0 2003-12-01 00:00:00.002  3  9.1
        1 2003-12-01 00:00:00.000  5  1.4
        2 2003-12-01 00:00:00.001  6  3.1
        3 2003-12-01 00:00:00.003  6  5.5
        Ascending/descending control
        >>> data = otp.Ticks({'X':[     94,   5,   34],
        ...                   'Y':[otp.nan, 3.1, -0.3]})
        >>> data = data.sort(data['X'], ascending=False)
        >>> otp.run(data)
                             Time   X    Y
        0 2003-12-01 00:00:00.000  94  NaN
        1 2003-12-01 00:00:00.002  34 -0.3
        2 2003-12-01 00:00:00.001   5  3.1
        >>> # OTdirective: snippet-name: Arrange.sort.sort;
        >>> data = otp.Ticks({'X':[  5,   6,   3,   6],
        ...                   'Y':[1.4, 3.1, 9.1, 5.5]})
        >>> data = data.sort([data['X'], data['Y']], ascending=[True, False])
        >>> otp.run(data)
                             Time  X    Y
        0 2003-12-01 00:00:00.002  3  9.1
        1 2003-12-01 00:00:00.000  5  1.4
        2 2003-12-01 00:00:00.003  6  5.5
        3 2003-12-01 00:00:00.001  6  3.1
        """
        columns = by
        if isinstance(columns, list):
            objs = columns
        else:
            objs = [columns]
        if isinstance(ascending, list):
            asc_objs = ascending
        else:
            asc_objs = [ascending]
        items = []
        # -------------------------------
        # Columns processing
        # -------------------------------
        # convert to strings
        # TODO: it seems as a common code, need to move to a separate function
        for obj in objs:
            if isinstance(obj, _Column):
                items.append(obj.name)
            elif isinstance(obj, str):
                items.append(obj)
            else:
                # TODO: cover with tests
                raise TypeError(f"It is not supported to order by '{obj}' of type '{type(obj)}'")
        # validate
        for item in items:
            if item in self.__dict__:
                if not isinstance(self.__dict__[item], _Column):
                    # TODO: cover with tests
                    raise AttributeError(f"There is no '{item}' column")
            else:
                # TODO: cover with tests
                raise AttributeError(f"There is no '{item}' column")
        # -------------------------------
        # Asc processing
        # -------------------------------
        asc_items = [True] * len(items)
        def asc_convert(v):
            return "ASC" if v else "DESC"
        for inx in range(len(items)):
            if inx >= len(asc_objs):
                asc_obj = asc_items[inx]
            else:
                asc_obj = asc_objs[inx]
            if isinstance(asc_obj, (bool, int)):
                asc_items[inx] = asc_convert(asc_obj)
            else:
                raise TypeError(f"asc can not be '{asc_obj}' of type '{type(asc_obj)}'")
        # ---------------
        # combine together
        order_by = [column_name + " " + asc for column_name, asc in zip(items, asc_items)]
        self.sink(otq.OrderByEp(order_by=",".join(order_by)))
        return self 
    def _hash(self):
        return self.__hash
    def _merge_tmp_otq(self, source):
        self._tmp_otq.merge(source._tmp_otq)
    def __prepare_graph(self, symbols=None, start=None, end=None, has_output=False):
        # We copy object here, because we will change it according to passed
        # symbols and date ranges. For example, we can add modify_query_times EP
        # if it is necessary
        obj = self.copy()
        if has_output:
            obj.sink(otq.Passthrough())
        start, end, symbols = obj._get_date_range(symbols, start, end)
        if start is adaptive:
            start = None
        if end is adaptive:
            end = None
        if symbols is not None and isinstance(symbols, pd.DataFrame):
            symbols = utils.get_symbol_list_from_df(symbols)
        if symbols is not None and not isinstance(symbols, list):
            symbols = [symbols]
        elif symbols is None:
            symbols = []
        _symbols = []
        for sym in symbols:
            _symbols.append(self._convert_symbol_to_string(sym, tmp_otq=obj._tmp_otq, start=start, end=end))
        return obj, start, end, _symbols
    def to_otq(self, file_name=None, file_suffix=None, query_name=None, symbols=None, start=None, end=None,
               timezone=None, raw=None, add_passthrough=True):
        """
        Save the data source to an .otq file and return the path to the saved file.
        Parameters
        ----------
        file_name: str
            Absolute or relative path to the saved file.
            If ``None``, create a temporary file with a random name.
        file_suffix: str
            Suffix to add to the saved file name (including extension).
            Can be specified if ``file_name`` is ``None``
            to distinguish between different temporary files.
            Default: ".to_otq.otq"
        query_name: str
            Name of the main query in the created file.
            If ``None``, take the name from this Source object.
            If that name is empty, set the name to "query".
        symbols: str, list, :pandas:`DataFrame <pandas.DataFrame>`, :class:`Source`
            symbols to save query with
        start: datetime
            start time to save query with
        end: datetime
            end time to save query with
        timezone: str
            timezone to save query with
        raw
            .. deprecated:: 1.4.17
        add_passthrough: bool
            whether to add a :py:class:`onetick.query.Passthrough` event processor at the end of the resulting graph
        Returns
        -------
        result: str
            Relative (if ``file_name`` is relative) or absolute path to the created query
            in the format ``file_name::query_name``
        """
        if raw is not None:
            warnings.warn('The "raw" flag is deprecated and makes no effect', DeprecationWarning)
        if timezone is None:
            timezone = configuration.config.tz
        file_path = str(file_name) if file_name is not None else None
        if file_suffix is None:
            file_suffix = self._name_suffix('to_otq.otq')
        if query_name is None:
            query_name = self.get_name(remove_invalid_symbols=True)
        if query_name is None:
            query_name = 'query'
        obj, start, end, symbols = self.__prepare_graph(symbols, start, end)
        graph = obj._to_graph(add_passthrough=add_passthrough)
        graph.set_symbols(symbols)
        return obj._tmp_otq.save_to_file(query=graph, query_name=query_name, file_path=file_path,
                                         file_suffix=file_suffix, start=start, end=end, timezone=timezone) 
    def _store_in_tmp_otq(self, tmp_otq, operation_suffix="tmp_query", symbols=None, start=None, end=None,
                          raw=None, add_passthrough=True, name=None):
        """
        Adds this source to the tmp_otq storage
        Parameters:
        tmp_otq: TmpOtq
            Storage object
        operation_suffix: str
            Suffix string to be added to the autogenerated graph name in the otq file
        name: str, optional
            If specified, this ``name`` will be used to save query
            and ``suffix`` parameter will be ignored.
        Returns:
        result: str
            String with the name of the saved graph (starting with THIS::)
        """
        # We copy object here, because we will change it according to passed
        # symbols and date ranges. For example, we can add modify_query_times EP
        # if it is necessary
        if raw is not None:
            warnings.warn('The "raw" flag is deprecated and makes no effect', DeprecationWarning)
        obj = self.copy()
        start, end, symbols = obj._get_date_range(symbols, start, end)
        tmp_otq.merge(obj._tmp_otq)
        if symbols and not isinstance(symbols, list):
            symbols = [symbols]
        elif symbols is None:
            symbols = []
        _symbols = []
        for sym in symbols:
            _symbols.append(self._convert_symbol_to_string(sym, tmp_otq))
        if isinstance(start, ott.dt):  # OT save_to_file checks for the datetime type
            start = datetime.fromtimestamp(start.timestamp())
        if isinstance(end, ott.dt):
            end = datetime.fromtimestamp(end.timestamp())
        graph = obj._to_graph(add_passthrough=add_passthrough)
        graph.set_start_time(start)
        graph.set_end_time(end)
        graph.set_symbols(_symbols)
        suffix = self._name_suffix(suffix=operation_suffix, separator='__', remove_invalid_symbols=True)
        return tmp_otq.add_query(graph, suffix=suffix, name=name)
    def _save_as_tmp_otq(self, start=None, end=None, symbols=None, suffix=""):
        tmp_otq = utils.TmpFile(f"{suffix}.otq")
        query_path = self.to_otq(tmp_otq, symbols=symbols, start=start, end=end)
        return query_path
    def plot(self, y, x='Time', kind='line', **kwargs):
        """
        Executes the query and builds a plot from the resulting dataframe.
        Uses the pandas dataframe ``plot`` method to plot the data.
        Other parameters can be passed through ``kwargs``.
        Parameters
        ----------
        x: str
            Column name for the X axis
        y: str
            Column name for the Y axis
        kind: str
            The kind of plot
        Examples
        --------
        >>> data = otp.Ticks(X=[1, 2, 3])
        >>> data.plot(y='X', kind='bar')  # doctest: +SKIP
        """
        result = self.copy()
        return result[[y, x]]().plot(x=x, y=y, kind=kind, **kwargs) 
    def count(self, **kwargs):
        """
        Returns the number of ticks in the query.
        Adds an aggregation that calculates the total tick count and executes the query.
        The result is a single value: the number of ticks. A possible application is checking
        for data presence in a Jupyter notebook, for example.
        Parameters
        ----------
        kwargs
            parameters that will be passed to :py:func:`otp.run <onetick.py.run>`
        Returns
        -------
        int
        See Also
        --------
        :py:func:`otp.run <onetick.py.run>`
        Source.head, Source.tail, :py:func:`onetick.py.agg.count`
        Examples
        --------
        >>> data = otp.Ticks(X=[1, 2, 3])
        >>> data.count()
        3
        >>> data = otp.Empty()
        >>> data.count()
        0
        """
        result = self.copy()
        result = otp.run(result.agg({'__num_rows': otp.agg.count()}), **kwargs)
        if result.empty:
            return 0
        return int(result['__num_rows'][0]) 
    def head(self, n=5, **kwargs) -> 'Union[pd.DataFrame, Source]':
        """
        Executes the query and returns first ``n`` ticks as a pandas dataframe.
        It is useful in Jupyter when you want to observe the first ``n`` values.
        Parameters
        ----------
        n: int, default=5
            number of ticks to return
        kwargs:
            parameters that will be passed to :py:func:`otp.run <onetick.py.run>`
        Returns
        -------
        :pandas:`DataFrame <pandas.DataFrame>`
        See Also
        --------
        :py:func:`otp.run <onetick.py.run>`
        Source.tail, Source.count
        Examples
        --------
        >>> data = otp.Ticks(X=list('abcdefgik'))
        >>> data.head()[['X']]
          X
        0 a
        1 b
        2 c
        3 d
        4 e
        """
        result = self.copy()
        result = result.first(n=n)  # pylint: disable=E1123
        return otp.run(result, **kwargs) 
    def tail(self, n=5, **kwargs) -> Union['pd.DataFrame', 'Source']:
        """
        Executes the query and returns last ``n`` ticks as a pandas dataframe.
        It is useful in Jupyter when you want to observe the last ``n`` values.
        Parameters
        ----------
        n: int
            number of ticks to return
        kwargs:
            parameters that will be passed to :py:func:`otp.run <onetick.py.run>`
        Returns
        -------
        :pandas:`DataFrame <pandas.DataFrame>`
        See Also
        --------
        :py:func:`otp.run <onetick.py.run>`
        :meth:`Source.head`, :meth:`Source.count`
        Examples
        --------
        >>> data = otp.Ticks(X=list('abcdefgik'))
        >>> data.tail()[['X']]
          X
        0 e
        1 f
        2 g
        3 i
        4 k
        """
        result = self.copy()
        result = result.last(n=n)  # pylint: disable=E1123
        return otp.run(result, **kwargs) 
    def _rename_impl_(self, columns=None, use_regex=False, fields_to_skip=None):
        # prepare
        items = {}
        out_names = set()
        fields_to_skip = fields_to_skip or []
        for in_obj, out_obj in columns.items():
            if isinstance(in_obj, _Column):
                items[in_obj.name.strip()] = out_obj.strip()
            elif isinstance(in_obj, str):
                items[in_obj.strip()] = out_obj.strip()
            else:
                raise Exception(f"It is not supported to rename item '{in_obj}' of type {type(in_obj)}'")
            if out_obj in out_names:
                raise AttributeError(
                    f"You want to rename '{in_obj}' into '{out_obj}', "
                    f"but also want to rename another column into '{out_obj}'"
                )
            out_names.add(out_obj)
        schema_update_dict = items
        if use_regex:
            schema_update_dict = self._get_columns_names_renaming(items, fields_to_skip)
        # validate
        for in_key, out_key in schema_update_dict.items():
            if " " in out_key:
                raise AttributeError(f"There is a space in the '{out_key}' column name")
            if in_key not in self.__dict__ or not isinstance(self.__dict__[in_key], _Column):
                raise AttributeError(f"There is no '{in_key}' column to rename")
            if out_key in self.__dict__ and isinstance(self.__dict__[out_key], _Column):
                raise AttributeError(f"Column '{out_key}' already exists")
        # apply
        for in_key, out_key in schema_update_dict.items():
            self.__dict__[in_key].rename(out_key, update_parent_object=False)
            self.__dict__[out_key] = self.__dict__[in_key]
            del self.__dict__[in_key]
        rename_rules = [key + "=" + value for key, value in items.items()]
        kwargs = dict(rename_fields=",".join(rename_rules))
        if use_regex:
            kwargs['use_regex'] = True
        if fields_to_skip:
            kwargs['fields_to_skip'] = ",".join(fields_to_skip)
        self.sink(otq.RenameFieldsEp(**kwargs))
    def _get_columns_names_renaming(self, rename: Dict[str, str], not_to_rename: List[str]) -> Dict[str, str]:
        """
        We can't be sure python Source has consistent columns cache, because sinking complex event processors
        can change columns unpredictable, so if user will specify regex as a param, we will pass regex
        as an onetick's param, but rename all matched columns from python Source cache.
        Parameters
        ----------
        rename:
            Dict old_name -> new_name. Some of the dictionary's items use regex.
        Returns
        -------
        output_dict:
            Dict old_name -> new_name used for renaming columns in Source. None of this dictionary's items use regex.
        """
        output_dict = {}
        for old_name, new_name in rename.items():
            matching_columns = [col for col in self.schema
                                if re.match(old_name, col) and not self._is_excluded(col, not_to_rename)]
            for col in matching_columns:
                output_dict[col] = re.sub(old_name, new_name, col)
        return output_dict
    @staticmethod
    def _is_excluded(s: str, not_to_rename: List[str]) -> bool:
        for excluded in not_to_rename:
            if re.match(excluded, s):
                return True
        return False
    @inplace_operation
    def rename(self, columns=None, use_regex=False, fields_to_skip=None, inplace=False) -> Optional['Source']:
        r"""
        Rename columns
        Parameters
        ----------
        columns : dict
            Rules how to rename in the following format: {<column> : <new-column-name>},
            where <column> is either existing column name of str type or reference to a column,
            and <new-column-name> a new column name of str type.
        use_regex: bool
            If true, then old-name=new-name pairs in the `columns` parameter are treated as regular expressions.
            This allows bulk renaming of field names. Notice that regular expressions for old names are treated as
            if both their prefix and their suffix are .*, i.e. the prefix and suffix match any substring.
            As a result, old-name *XX* will match all of *aXX*, *aXXb*, and *XXb* when ``use_regex=True``.
            You can have the old name begin with ^ to indicate that the .* prefix does not apply,
            and end with $ to indicate that the .* suffix does not apply.
            Default: false
        fields_to_skip: list of str
            A list of regular expressions for specifying fields that should be skipped
            (i.e., not be renamed). If a field is matched by one of the specified regular expressions,
            it won't be considered for renaming.
            Default: None
        inplace : bool
            The flag controls whether operation should be applied inplace or not.
            If ``inplace=True``, then it returns nothing. Otherwise, method returns a new modified
            object.
        Returns
        -------
        :class:`Source` or ``None``
        See also
        --------
        **RENAME** OneTick event processor
        Examples
        --------
        >>> data = otp.Ticks(X=[1], Y=[2])
        >>> data = data.rename({'X': 'XX',
        ...                     data['Y']: 'YY'})
        >>> otp.run(data)
                Time  XX  YY
        0 2003-12-01   1   2
        >>> data = otp.Tick(**{'X.X': 1, 'X.Y': 2})
        >>> data = data.rename({r'X\.(.*)': r'\1'}, use_regex=True)
        >>> otp.run(data)
                Time   X   Y
        0 2003-12-01   1   2
        >>> data = otp.Tick(**{'X.X': 1, 'X.Y': 2})
        >>> data = data.rename({r'X\.(.*)': r'\1'}, use_regex=True, fields_to_skip=['X.Y'])
        >>> otp.run(data)
                Time   X X.Y
        0 2003-12-01   1   2
        """
        if columns is None:
            columns = {}
        self._rename_impl_(columns, use_regex, fields_to_skip)
        return self 
    @inplace_operation
    def execute(self, *operations, inplace=False) -> Optional['Source']:
        """
        Execute operations without returning their values.
        Some operations in onetick.py can be used to modify the state of some object
        (mostly tick sequences), and in that case the user may not want to save the result
        of the operation to a column.
        Parameters
        ----------
        operations : list of :py:class:`~onetick.py.Operation`
            operations to execute.
        inplace : bool
            The flag controls whether operation should be applied inplace or not.
            If ``inplace=True``, then it returns nothing. Otherwise method returns a new modified
            object.
        Returns
        -------
        :class:`Source` or ``None``
        See Also
        --------
        **EXECUTE_EXPRESSIONS** OneTick event processor
        Examples
        --------
        >>> data = otp.Tick(A=1)
        >>> data.state_vars['SET'] = otp.state.tick_set('oldest', 'A')
        >>> data = data.execute(data.state_vars['SET'].erase(A=1))
        """
        if not operations:
            raise ValueError('At least one operation must be specified in execute() method')
        op_str = ';'.join(map(str, operations))
        self.sink(otq.ExecuteExpressions(op_str))
        return self 
    def __refresh_hash(self):
        """
        This internal function refreshes the hash on every graph modification.
        It is used only in _ProxyNode, because it tracks node changes.
        """
        self.__hash = uuid.uuid3(uuid.NAMESPACE_DNS, str(self.__hash))
    def _prepare_for_execution(self, symbols=None, start=None, end=None, start_time_expression=None,
                               end_time_expression=None, timezone=None, has_output=None,
                               running_query_flag=None, require_dict=False, node_name=None):
        if has_output is None:
            has_output = self.__source_has_output
        if timezone is None:
            timezone = configuration.config.tz
        obj, start, end, symbols = self.__prepare_graph(symbols, start, end, has_output)
        require_dict = require_dict or _is_dict_required(symbols)
        if node_name is None:
            node_name = 'SOURCE_CALL_MAIN_OUT_NODE'
            obj.node().node_name(node_name)
        graph = obj._to_graph(add_passthrough=False)
        graph.set_symbols(symbols)
        tmp_file = utils.TmpFile(suffix=self._name_suffix('run.otq'))
        query_to_run = obj._tmp_otq.save_to_file(query=graph,
                                                 query_name=self.get_name(remove_invalid_symbols=True)
                                                 if self.get_name(remove_invalid_symbols=True) else "main_query",
                                                 file_path=tmp_file.path,
                                                 start=start, end=end,
                                                 running_query_flag=running_query_flag,
                                                 start_time_expression=start_time_expression,
                                                 end_time_expression=end_time_expression,
                                                 timezone=timezone)
        # symbols and start/end times should be already stored in the query and should not be passed again
        return dict(
            query=query_to_run,
            symbols=None,
            start=None,
            end=None,
            start_time_expression=None,
            end_time_expression=None,
            require_dict=require_dict,
            node_name=node_name,
            time_as_nsec=True,
        )
    def __call__(self, *args, **kwargs):
        """
        .. deprecated:: 1.48.3
           Use :py:func:`otp.run <onetick.py.run>` instead.
        """
        warnings.warn('__call__() method is deprecated, use otp.run() instead', DeprecationWarning, stacklevel=2)
        return otp.run(self, *args, **kwargs) 
    def to_df(self, symbols=None, **kwargs):
        """
        .. deprecated:: 1.48.3
           Use :py:func:`otp.run <onetick.py.run>` instead.
        """
        warnings.warn('to_df() method is deprecated, use otp.run() instead', DeprecationWarning, stacklevel=2)
        # For backward compatibility: otp.run() does not accept "symbols" as a non-keyword argument
        if symbols is not None:
            kwargs['symbols'] = symbols
        return otp.run(self, **kwargs) 
    to_dataframe = to_df
    def print_api_graph(self):
        self.node().copy_graph(print_out=True)
    def _add_table(self, strict=False):
        table = otq.Table(
            fields=",".join(
                ott.type2str(dtype) + " " + name for name, dtype in self.columns(skip_meta_fields=True).items()
            ),
            keep_input_fields=not strict,
        )
        self.sink(table)
    def _is_unbound_required(self):
        """ Check whether a graph needs unbound symbol or not """
        for symbol in self.__sources_symbols.values():
            if symbol is adaptive or symbol is adaptive_to_default:
                return True
        return False
    def _get_widest_time_range(self):
        """
        Get minimum start time and maximum end time.
        If time is not found, None is returned.
        """
        start_times = []
        end_times = []
        for start, end in self.__sources_keys_dates.values():
            if start is not adaptive:
                start_times.append(start)
            if end is not adaptive:
                end_times.append(end)
        start = min(start_times) if start_times else None
        end = max(end_times) if end_times else None
        return start, end
    def _get_date_range(self, symbols=None, start=None, end=None, default_start=None, default_end=None):   #noqa
        if default_start is None:
            default_start = configuration.config.get('default_start_time', adaptive)
        if default_end is None:
            default_end = configuration.config.get('default_end_time', adaptive)
        # ------------ #
        # Find symbols
        need_to_bind_symbol = False
        common_symbol = None
        if symbols is None:
            # let's try to understand whether we can use a common symbol for all sources
            # or we need to bind symbols instead
            first_symbol = None
            for symbol in self.__sources_symbols.values():
                if first_symbol is None:
                    first_symbol = symbol
                    if isinstance(first_symbol, _ManuallyBoundValue):
                        # Mark that we need to bind, but keep common_symbol equal to None.
                        # It is applicable for the bound symbols inside a merge with bound
                        # symbols, for example.
                        need_to_bind_symbol = True
                    else:
                        common_symbol = symbol
                    continue
                if symbol and symbol != first_symbol:
                    need_to_bind_symbol = True
                    common_symbol = None
                    break
            # symbol is specified nowhere - just set unbound to the default one
            if (first_symbol is adaptive or first_symbol is adaptive_to_default) and (
                common_symbol is adaptive or common_symbol is adaptive_to_default
            ):
                common_symbol = configuration.config.default_symbol
        else:
            # when unbound symbols passed
            common_symbol = symbols
            need_to_bind_symbol = True  # used to check all sources for whether some have bound symbols
        # Find max and min for _source data ranges
        sources_start, sources_end = self._get_widest_time_range()
        sources_start = sources_start or default_start
        sources_end = sources_end or default_end
        common_format = "%Y-%m-%d %H:%M:%S"
        for key, date_range in self.__sources_keys_dates.items():
            # find a function that builds _source
            func = self.__sources_base_ep_func[key]
            if func:
                src = func()
                # --------------------------
                # determine whether we have to add modify_query_times to a src
                start_date, end_date = date_range
                if not self.__sources_modify_query_times[key]:
                    if start_date is not adaptive or end_date is not adaptive:
                        # if one of the ends is specified, then we need
                        # to check whether it is worth wrapping into
                        # modify_query_times
                        if start_date is adaptive:
                            if start is None:
                                start_date = sources_start
                            else:
                                start_date = start
                        if end_date is adaptive:
                            if end is None:
                                end_date = sources_end
                            else:
                                end_date = end
                        if start_date is not adaptive and end_date is not adaptive:
                            # it might happen when either sources_start/end are adaptive
                            # or start/end are adaptive
                            if (
                                (start is None and sources_start is not adaptive and start_date != sources_start)
                                or (start is not None and start_date != start)
                                or (end is None and sources_end is not adaptive and end_date != sources_end)
                                or (end is not None and end_date != end)
                            ):
                                self.__sources_modify_query_times[key] = True
                                mqt = otq.ModifyQueryTimes(
                                    start_time='parse_time(("'
                                               + common_format
                                               + '.%q"),"'
                                               + start_date.strftime(common_format + ".%f")
                                               + '", _TIMEZONE)',
                                    end_time='parse_time(("'
                                             + common_format
                                             + '.%q"),"'
                                             + end_date.strftime(common_format + ".%f")
                                             + '", _TIMEZONE)',
                                    output_timestamp="TIMESTAMP",
                                )
                                src.sink(mqt)
                if need_to_bind_symbol:
                    bound = None
                    if key in self.__sources_symbols:  # TODO: this is wrong, we need to know about symbols
                        #       it happens when we do not copy symbols when apply
                        #       merge with bound symbols.
                        #       Wrong, in that case merge with bound symbol is
                        #       non distinguishable from the manually passed None
                        #       for external queries
                        bound = self.__sources_symbols[key]
                        if isinstance(bound, _ManuallyBoundValue):
                            bound = bound.value
                    if bound and bound is not adaptive and bound is not adaptive_to_default:
                        src.__node.symbol(bound)
                    else:
                        # if key is not in __sources_symbols, then
                        # it means that symbol was not specified, and
                        # therefore use the unbound symbol
                        if common_symbol is None:
                            if bound is adaptive_to_default:
                                src.__node.symbol(configuration.config.default_symbol)
                            else:
                                pass
                                # # TODO: write test validated this
                                #
                                # #  raise Exception("One of the branch does not have symbol specified")
                # --------------------------
                # glue _source with the main graph
                self.node().add_rules(src.node().copy_rules())
                self.source_by_key(src.node().copy_graph(), key)
                self._merge_tmp_otq(src)
            else:
                pass
        if start is None:
            start = sources_start
        if end is None:
            end = sources_end
        return start, end, common_symbol
    def _to_graph(self, add_passthrough=True):
        """
        Construct the graph. Only for internal usage.
        It is private, because it constructs the raw graph assuming that a graph
        is already defined, and might confuse an end user, because by default Source
        is not fully defined; it becomes fully defined only when symbols, start and
        end datetime are specified.
        """
        constructed_obj = self.copy()
        # we add it for the case when the last EP has an output pin
        if add_passthrough:
            constructed_obj.sink(otq.Passthrough())
        return otq.GraphQuery(constructed_obj.node().get())
    def to_graph(self, raw=None, symbols=None, start=None, end=None, *, add_passthrough=True):
        """
        Construct an :py:class:`onetick.query.GraphQuery` object.
        Parameters
        ----------
        raw:
            .. deprecated:: 1.4.17 has no effect
        symbols:
            symbols or symbol query to add to the otq.GraphQuery
        start: datetime
            start time of a query
        end: datetime
            end time of a query
        add_passthrough: bool
            add an additional :py:class:`onetick.query.Passthrough` event processor to the end of the resulting graph
        Returns
        -------
        otq.GraphQuery
        See Also
        --------
        :meth:`render`
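        Examples
        --------
        A minimal sketch:
        >>> data = otp.Ticks(X=[1, 2, 3])
        >>> graph = data.to_graph()  # doctest: +SKIP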
        """
        if raw is not None:
            warnings.warn('The "raw" flag is deprecated and makes not effect', DeprecationWarning)
        _obj, _start, _end, _symbols = self.__prepare_graph(symbols, start, end)
        if _obj._tmp_otq.queries:
            warnings.warn('Using .to_graph() for a Source object that uses sub-queries! '
                          'This operation is deprecated and is not guaranteed to work as expected. '
                          'Such a Source should be executed using otp.run() or saved to disk using to_otq()',
                          DeprecationWarning)
            _obj.sink(otq.Passthrough().output_pin_name('OUT_FOR_TO_GRAPH'))
            _graph = _obj._to_graph(add_passthrough=False)
            _graph.set_start_time(_start)
            _graph.set_end_time(_end)
            _graph.set_symbols(_symbols)
            query = _obj._tmp_otq.save_to_file(
                query=_graph, file_suffix='_to_graph.otq')
            query_path, query_name = query.split('::')
            query_params = get_query_parameter_list(query_path, query_name)
            source_with_nested_query = otp.Query(otp.query(query,
                                                           **{param: f'${param}' for param in query_params}),
                                                 out_pin='OUT_FOR_TO_GRAPH')
            return source_with_nested_query.to_graph(
                symbols=_symbols, start=_start, end=_end,
                add_passthrough=add_passthrough)
        else:
            return _obj._to_graph(add_passthrough=add_passthrough) 
    def render(self, **kwargs):
        """
        Renders the calculation graph using the ``graphviz`` library.
        Every node is a OneTick query language event processor.
        Nodes in nested queries, first stage queries and eval queries are not shown.
        Can be useful for debugging and in Jupyter to examine the underlying graph.
        Note that it's required to have the :graphviz:`graphviz <>` package installed.
        Examples
        --------
        >>> data = otp.Ticks(X=[1, 2, 3])
        >>> data1, data2 = data[(data['X'] > 2)]
        >>> data = otp.merge([data1, data2])
        >>> data.render()  # doctest: +SKIP
        .. graphviz:: ../../static/render_example.dot
        """
        kwargs.setdefault('verbose', True)
        self._to_graph().render(**kwargs) 
    @inplace_operation
    def write(self,
              db: Union[str, 'otp.DB'],
              symbol: Union[str, 'otp.Column', None] = None,
              tick_type: Union[str, 'otp.Column', None] = None,
              date: Union[date, Type[adaptive], None] = adaptive,
              start: Optional[date] = None,
              end: Optional[date] = None,
              append: bool = True,
              keep_symbol_and_tick_type: Union[bool, Type[adaptive]] = adaptive,
              propagate: bool = True,
              out_of_range_tick_action: Literal['exception', 'ignore', 'load'] = 'exception',
              timestamp: Optional['otp.Column'] = None,
              keep_timestamp: bool = True,
              correction_type: Optional['otp.Column'] = None,
              replace_existing_time_series: bool = False,
              allow_concurrent_write: bool = False,
              context: Union[str, Type[adaptive]] = adaptive,
              use_context_of_query: bool = False,
              inplace: bool = False,
              **kwargs) -> Optional['Source']:
        """
        Saves data result to OneTick database.
        Note
        ----
        This method itself does not save anything; it adds an instruction to the query to save data.
        Data will be saved when the query is executed.
        Using the ``start``+``end`` parameters instead of a single ``date`` has some limitations:
            * ``inplace`` is not supported
            * if ``DAY_BOUNDARY_TZ`` and ``DAY_BOUNDARY_OFFSET`` are specified for
              individual locations of the database, then the day boundary may be calculated incorrectly.
            * ``out_of_range_tick_action`` can only be ``exception`` or ``ignore``
        Parameters
        ----------
        db: str or :py:class:`otp.DB <onetick.py.DB>`
            database name or object.
        symbol: str or Column
            resulting symbol name string or column to get symbol name from.
            If this parameter is not set, then ticks _SYMBOL_NAME pseudo-field is used.
            If it is empty, an attempt is made to retrieve
            the symbol name from the field named SYMBOL_NAME.
        tick_type: str or Column
            resulting tick type string or column to get tick type from.
            If this parameter is not set, the _TICK_TYPE pseudo-field is used.
            If it is empty, an attempt is made to retrieve
            the tick type from the field named TICK_TYPE.
        date: datetime or None
            date for which to save data.
            Should be set to `None` if writing to an accelerator or memory database.
            By default, it is set to `otp.config.default_date`.
        start: datetime or None
            Start date for data to save. It is inclusive.
            Cannot be used with the ``date`` parameter.
            Also cannot be used with ``inplace`` set to ``True``.
            Should be set to `None` if writing to an accelerator or memory database.
            By default, None.
        end: datetime or None
            End date for data to save. It is exclusive, so be sure to set
            it to the next day after the last day in the data.
            Cannot be used with the ``date`` parameter.
            Also cannot be used with ``inplace`` set to ``True``.
            Should be set to `None` if writing to an accelerator or memory database.
            By default, None.
        append: bool
            If ``False``, data will be rewritten for this ``date``
            or range of dates (from ``start`` to ``end``),
            otherwise data will be appended: new symbols are added,
            existing symbols can be modified (append new ticks, modify existing ticks).
            This option is not valid for accelerator databases.
        keep_symbol_and_tick_type: bool
            keep fields containing symbol name and tick type when writing ticks
            to the database or propagating them.
            By default, this parameter is adaptive.
            If ``symbol`` or ``tick_type`` are column objects, then it's set to True.
            Otherwise, it's set to False.
        propagate: bool
            Propagate ticks after that event processor or not.
        out_of_range_tick_action: str
            Action to be executed if a tick's timestamp date is not ``date`` or not between ``start`` and ``end``:
                * `exception`: runtime exception will be raised
                * `ignore`: tick will not be written to the database
                * `load`: writes tick to the database anyway.
                  Can be used only with ``date``, not with ``start``+``end``.
            Default: `exception`
        timestamp: Column
            Field that contains the timestamp with which the ticks will be written to the database.
            By default, the TIMESTAMP pseudo-column is used.
        keep_timestamp: bool
            If the ``timestamp`` parameter is set and this parameter is set to False,
            then the timestamp column is removed.
        correction_type: Column
            The name of the column that contains the correction type.
            This column will be removed.
            If this parameter is not set, no corrections will be submitted.
        replace_existing_time_series: bool
            If ``append`` is set to True, setting this option to True instructs the loader
            to replace existing time series, instead of appending to them.
            Other time series will remain unchanged.
        allow_concurrent_write: bool
            Allows different queries running on the same server to load concurrently into the same database.
        context: str
            The server context used to look up the database.
            By default, `otp.config.context` is used if ``use_context_of_query`` is not set.
        use_context_of_query: bool
            If this parameter is set to True and the ``context`` parameter is not set,
            the context of the query is used instead of the default value of the ``context`` parameter.
        inplace: bool
            A flag that controls whether the operation should be applied inplace.
            If ``inplace=True``, then the method returns nothing.
            Otherwise, it returns a new modified object.
            Cannot be ``True`` if ``start`` and ``end`` are set.
        kwargs:
            .. deprecated:: use named parameters instead.
        Returns
        -------
        :class:`Source` or None
        See also
        --------
        **WRITE_TO_ONETICK_DB** OneTick event processor
        Examples
        --------
        >>> data = otp.Ticks(X=[1, 2, 3])
        >>> data = data.write('SOME_DB', symbol='S_WRITE', tick_type='T_WRITE')
        >>> otp.run(data)
                             Time  X
        0 2003-12-01 00:00:00.000  1
        1 2003-12-01 00:00:00.001  2
        2 2003-12-01 00:00:00.002  3
        >>> data = otp.DataSource('SOME_DB', symbol='S_WRITE', tick_type='T_WRITE')
        >>> otp.run(data)
                             Time  X
        0 2003-12-01 00:00:00.000  1
        1 2003-12-01 00:00:00.001  2
        2 2003-12-01 00:00:00.002  3
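        A sketch of writing a range of dates with ``start`` and ``end`` (assuming multi-day data):
        >>> data = data.write('SOME_DB', symbol='S_WRITE', tick_type='T_WRITE',
        ...                   start=otp.date(2003, 12, 1), end=otp.date(2003, 12, 3))  # doctest: +SKIP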
        """
        if 'append_mode' in kwargs:
            warnings.warn("Parameter 'append_mode' is deprecated, use 'append'", DeprecationWarning)
            append = kwargs.pop('append_mode')
        if 'timestamp_field' in kwargs:
            warnings.warn("Parameter 'timestamp_field' is deprecated, use 'timestamp'", DeprecationWarning)
            timestamp = kwargs.pop('timestamp_field')
        if 'keep_timestamp_field' in kwargs:
            warnings.warn("Parameter 'keep_timestamp_field' is deprecated, use 'keep_timestamp'", DeprecationWarning)
            keep_timestamp = kwargs.pop('keep_timestamp_field')
        if kwargs:
            raise TypeError(f'write() got unexpected arguments: {list(kwargs)}')
        kwargs = {}
        if date is not adaptive and (start or end):
            raise ValueError('date cannot be used with start+end')
        if date is adaptive and (start and end) and inplace:
            # join_with_query and merge are used for multiple dates, so inplace is not supported
            raise ValueError('cannot run on multiple dates if inplace is True,'
                             ' use one value for date instead of start+end')
        if (start and not end) or (not start and end):
            raise ValueError('start and end should be both specified or both None')
        if date is adaptive:
            date = configuration.config.default_date
        if symbol is not None:
            if isinstance(symbol, _Column):
                kwargs['symbol_name_field'] = str(symbol)
                if keep_symbol_and_tick_type is adaptive:
                    keep_symbol_and_tick_type = True
            else:
                kwargs.setdefault('symbol_name_field', '_SYMBOL_NAME_FIELD_')
                self[kwargs['symbol_name_field']] = symbol
        if tick_type is not None:
            if isinstance(tick_type, _Column):
                kwargs['tick_type_field'] = str(tick_type)
                if keep_symbol_and_tick_type is adaptive:
                    keep_symbol_and_tick_type = True
            else:
                kwargs.setdefault('tick_type_field', '_TICK_TYPE_FIELD_')
                self[kwargs['tick_type_field']] = tick_type
        if keep_symbol_and_tick_type is adaptive:
            keep_symbol_and_tick_type = False
        if timestamp is not None:
            kwargs['timestamp_field'] = str(timestamp)
        if correction_type is not None:
            kwargs['correction_type_field'] = str(correction_type)
        if context is not adaptive:
            kwargs['context'] = context
        elif not use_context_of_query:
            kwargs['context'] = otp.config.context
        if out_of_range_tick_action.upper() == 'IGNORE':
            pass
        elif out_of_range_tick_action.upper() == 'LOAD':
            if start and end:
                raise ValueError('LOAD out_of_range_tick_action cannot be used with start+end, use date instead')
        elif out_of_range_tick_action.upper() == 'EXCEPTION':
            if start and end:
                # WRITE_TO_ONETICK_DB uses DAY_BOUNDARY_TZ and DAY_BOUNDARY_OFFSET
                # to check whether a tick timestamp is out of range or not,
                # so we mimic it here with the THROW event processor
                src = otp.Source(otq.DbShowConfig(str(db), 'DB_TIME_INTERVALS'))
                src.table(inplace=True,
                          DAY_BOUNDARY_TZ=str,
                          DAY_BOUNDARY_OFFSET=int)
                # DAY_BOUNDARY_OFFSET values are in seconds, so convert to milliseconds
                src['DAY_BOUNDARY_OFFSET'] = src['DAY_BOUNDARY_OFFSET'] * 1000
                src.rename({
                    'DAY_BOUNDARY_TZ': '__DAY_BOUNDARY_TZ',
                    'DAY_BOUNDARY_OFFSET': '__DAY_BOUNDARY_OFFSET'
                }, inplace=True)
                self = self.join_with_query(src,
                                            symbol=f"{str(db)}::DUMMY",
                                            caching='per_symbol')
                start_formatted = start.strftime('%Y-%m-%d')
                end_formatted = end.strftime('%Y-%m-%d')
                convert_timestamp = self['TIMESTAMP'].dt.strftime('%Y%m%d%H%M%S.%J', timezone=self['__DAY_BOUNDARY_TZ'])
                start_op = (
                    otp.dt(start).to_operation(timezone=self['__DAY_BOUNDARY_TZ']) + self['__DAY_BOUNDARY_OFFSET']
                )
                self.throw(
                    where=(self['TIMESTAMP'] < start_op),
                    message=('Timestamp ' + convert_timestamp + ' of a tick, visible or hidden, ' +
                             f'earlier than {start_formatted} in timezone ' + self['__DAY_BOUNDARY_TZ']),
                    inplace=True,
                )
                end_op = otp.dt(end).to_operation(timezone=self['__DAY_BOUNDARY_TZ']) + self['__DAY_BOUNDARY_OFFSET']
                self.throw(
                    where=(self['TIMESTAMP'] >= end_op),
                    message=('Timestamp ' + convert_timestamp + ' of a tick, visible or hidden, ' +
                             f'later than {end_formatted} in timezone ' + self['__DAY_BOUNDARY_TZ']),
                    inplace=True,
                )
        else:
            raise ValueError(f'Unknown out_of_range_tick_action: {out_of_range_tick_action}.'
                             ' Possible values are: "ignore", "exception", "load"')
        branches = []
        if propagate:
            branches = [self]
        kwargs = dict(
            **kwargs,
            database=str(db),
            append_mode=append,
            keep_symbol_name_and_tick_type=keep_symbol_and_tick_type,
            keep_timestamp_field=keep_timestamp,
            replace_existing_time_series=replace_existing_time_series,
            allow_concurrent_write=allow_concurrent_write,
            use_context_of_query=use_context_of_query,
        )
        if start and end:
            days = (end - start).days + 1
            for i in range(days):
                branch = self.copy()
                branch.sink(
                    otq.WriteToOnetickDb(
                        date=(start + otp.Day(i)).strftime('%Y%m%d'),
                        propagate_ticks=False,
                        out_of_range_tick_action='IGNORE',
                        **kwargs
                    )
                )
                branches.append(branch)
            self = otp.merge(branches)
        else:
            self.sink(
                otq.WriteToOnetickDb(
                    date=date.strftime('%Y%m%d') if date else '',  # type: ignore[union-attr]
                    propagate_ticks=propagate,
                    out_of_range_tick_action=out_of_range_tick_action.upper(),
                    **kwargs
                )
            )
        for col in ('_SYMBOL_NAME_FIELD_', '_TICK_TYPE_FIELD_'):
            if col in self.schema:
                self.drop(col, inplace=True)
        to_drop: Set[str] = set()
        if not keep_symbol_and_tick_type:
            to_drop.update((kwargs['symbol_name_field'], kwargs['tick_type_field']))
        if not keep_timestamp and timestamp is not None and str(timestamp) not in {'Time', 'TIMESTAMP'}:
            to_drop.add(str(timestamp))
        if correction_type is not None:
            to_drop.add(str(correction_type))
        self.schema.set(**{
            k: v for k, v in self.schema.items()
            if k not in to_drop
        })
        return self 
    def copy(self, ep=None, columns=None, deep=False) -> 'Source':
        """
        Build an object with copied calculation graph.
        Every node of the resulting graph has the same id as in the original. It means that
        if the original and copied graphs are merged or joined together later, then all common
        nodes (all that were created before the .copy() call) will be glued.
        For example, let's imagine that you have the following calculation graph ``G``
        .. graphviz::
           digraph {
             rankdir="LR";
             A -> B;
           }
        where ``A`` is a source and ``B`` is some operation on it.
        Then we copy it to the ``G'`` and assign a new operation there
        .. graphviz::
           digraph {
             rankdir="LR";
             A -> B -> C;
           }
        After that we decided to merge ``G`` and ``G'``. The resulting calculation graph will be:
        .. graphviz::
           digraph {
             rankdir="LR";
             A -> B -> C -> MERGE;
             B -> MERGE;
           }
        Please use the :meth:`Source.deepcopy` if you want to get the following calculation graph after merges and joins
        .. graphviz::
           digraph {
             rankdir="LR";
             A -> B -> C -> MERGE;
             "A'" -> "B'" -> "C'" -> MERGE;
           }
        Returns
        -------
        Source
        See Also
        --------
        Source.deepcopy
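        Examples
        --------
        A minimal sketch: modifying the copy does not change the original object.
        >>> data = otp.Tick(A=1)
        >>> copied = data.copy()
        >>> copied['B'] = 2
        >>> otp.run(data)  # doctest: +SKIP
                Time  A
        0 2003-12-01  1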
        """
        if columns is None:
            columns = self.columns()
        if ep:
            result = self.__class__(node=ep, **columns)
            result.source(self.node().copy_graph())
            # we need to clean it, because ep is not a _source
            result._clean_sources_dates()
        else:
            result = self.__class__(node=self.node(), **columns)
        result.node().add_rules(self.node().copy_rules(deep=deep))
        result._set_sources_dates(self)
        if deep:
            # generating all new uuids for node history and for sources
            # after they were initialized
            keys = defaultdict(uuid.uuid4)  # type: ignore
            result.node().rebuild_graph(keys)
            result._change_sources_keys(keys)
        # add state
        result._copy_state_vars_from(self)
        result._tmp_otq = self._tmp_otq.copy()
        result.__name = self.__name     #noqa
        result._copy_properties_from(self)
        return result 
    def deepcopy(self, ep=None, columns=None) -> 'onetick.py.Source':
        """
        Copy the whole graph and change ids for every node.
        More details can be found in :meth:`Source.copy`.
        See Also
        --------
        Source.copy
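        Examples
        --------
        A minimal sketch: unlike :meth:`Source.copy`, merging a deep copy with the
        original does not glue common nodes.
        >>> data = otp.Tick(A=1)
        >>> merged = otp.merge([data, data.deepcopy()])  # doctest: +SKIP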
        """
        return self.copy(ep, columns, deep=True) 
    def _copy_properties_from(self, obj):
        # needed if we are doing copy of a child with custom properties
        for attr in set(self.__class__._PROPERTIES) - set(Source._PROPERTIES):
            setattr(self, attr, getattr(obj, attr))
    def _copy_state_vars_from(self, objs):
        self.__dict__["_state_vars"] = StateVars(self, objs)
    def split(self, expr, cases, default=False) -> Tuple['Source', ...]:
        """
        The method splits data using the passed expression ``expr`` into several
        outputs by the passed ``cases``. The method is an alias for :meth:`Source.switch`.
        Parameters
        ----------
        expr : Operation
            column or column based expression
        cases : list
            list of values or :py:class:`onetick.py.range` objects to split by
        default : bool
            ``True`` adds the default output
        Returns
        -------
        Outputs according to the passed cases; the number of outputs is the number of cases, plus one if ``default=True``
        See also
        --------
        | :meth:`Source.switch`
        | :py:class:`onetick.py.range`
        | **SWITCH** OneTick event processor
        Examples
        --------
        >>> # OTdirective: snippet-name: Source operations.split;
        >>> data = otp.Ticks(X=[0.33, -5.1, otp.nan, 9.4])
        >>> r1, r2, r3 = data.split(data['X'], [otp.nan, otp.range(0, 100)], default=True)
        >>> otp.run(r1)
                             Time   X
        0 2003-12-01 00:00:00.002 NaN
        >>> otp.run(r2)
                             Time     X
        0 2003-12-01 00:00:00.000  0.33
        1 2003-12-01 00:00:00.003  9.40
        >>> otp.run(r3)
                             Time    X
        0 2003-12-01 00:00:00.001 -5.1
        """
        output_num = len(cases)
        # format cases
        def to_str(v):
            if isinstance(v, onetick.py.utils.range):
                return "[" + str(v.start) + "," + str(v.stop) + "]"
            elif isinstance(v, str):
                return '"' + v + '"'
            elif isinstance(v, tuple):
                return ",".join(map(to_str, list(v)))
            return str(v)
        cases = [f"{to_str(cases[inx])}:OUT{inx}" for inx in range(output_num)]
        # create ep
        params = dict(switch=str(expr), cases=";".join(cases))
        if default:
            params["default_output"] = "DEF_OUT"
        switch = self.copy(ep=otq.SwitchEp(**params))
        # construct results
        result = []
        for inx in range(output_num):
            res = switch.copy()
            res.node().out_pin(f"OUT{inx}")
            res.sink(otq.Passthrough())
            result.append(res)
        if default:
            res = switch.copy()
            res.node().out_pin("DEF_OUT")
            res.sink(otq.Passthrough())
            result.append(res)
        return tuple(result) 
    def switch(self, expr, cases, default=False) -> Tuple['Source', ...]:
        """
        The method splits data using the passed expression into several
        outputs by the passed cases. This method is an alias for the
        :meth:`Source.split` method.
        Parameters
        ----------
        expr : Operation
            column or column based expression
        cases : list
            list of values or :py:class:`onetick.py.range` objects to split by
        default : bool
            ``True`` adds the default output
        Returns
        -------
        Outputs according to the passed cases; the number of outputs is the number of cases, plus one if ``default=True``
        See also
        --------
        | :meth:`Source.split`
        | :py:class:`onetick.py.range`
        | **SWITCH** OneTick event processor
        Examples
        --------
        >>> data = otp.Ticks(X=[0.33, -5.1, otp.nan, 9.4])
        >>> r1, r2, r3 = data.switch(data['X'], [otp.nan, otp.range(0, 100)], default=True)
        >>> otp.run(r1)
                             Time   X
        0 2003-12-01 00:00:00.002 NaN
        >>> otp.run(r2)
                             Time     X
        0 2003-12-01 00:00:00.000  0.33
        1 2003-12-01 00:00:00.003  9.40
        >>> otp.run(r3)
                             Time    X
        0 2003-12-01 00:00:00.001 -5.1
        """
        return self.split(expr, cases, default) 
    def columns(self, skip_meta_fields=False):
        """
        Return columns in the data source.
        Parameters
        ----------
        skip_meta_fields: bool, default=False
            do not add meta fields
        Returns
        -------
        dict
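        Examples
        --------
        A minimal sketch:
        >>> data = otp.Ticks(X=[1, 2, 3])
        >>> data.columns(skip_meta_fields=True)  # doctest: +SKIP
        {'X': <class 'int'>}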
        """
        result = {}
        for key, value in self.__dict__.items():
            if skip_meta_fields and key in self.__class__.meta_fields:
                continue
            if key in self.__class__._PROPERTIES:
                continue
            if isinstance(value, _Column):
                result[value.name] = value.dtype
        return result
    def drop_columns(self):
        """
        Method removes all columns in the python representation, but doesn't
        drop columns in the data.
        It is used when an external query is applied, because we don't know how
        the data schema has changed.
        """
        items = []
        for key, value in self.__dict__.items():
            if key in self.__class__.meta_fields or key in self.__class__._PROPERTIES:
                continue
            if isinstance(value, _Column):
                items.append(key)
        for item in items:
            del self.__dict__[item]
    def node(self):
        return self.__node
    def tick_type(self, tt):
        self.__node.tick_type(tt)
        return self
    def symbol(self, symbol):
        """
        Apply symbol to graph
        .. deprecated:: 1.3.31
        """
        warnings.warn("symbol method is deprecated, please specify symbol during creation", DeprecationWarning)
        self.__node.symbol(symbol)
        return self
    def node_name(self, name=None, key=None):
        return self.__node.node_name(name, key)
    def __add__(self, other) -> 'Source':
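        """
        Shorthand for merging two sources, a sketch (see ``otp.merge``):
        >>> data = otp.Ticks(X=[1]) + otp.Ticks(X=[2])  # doctest: +SKIP
        """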
        return onetick.py.functions.merge([self, other])
    @inplace_operation
    def table(self, inplace=False, strict: bool = True, **schema) -> Optional['Source']:
        """
        Set the OneTick and python schema levels according to the ``schema``
        parameter. The ``schema`` should contain either (field_name -> type) pairs
        or (field_name -> default value) pairs; ``None`` means no specified type, and
        OneTick treats it as a double type.
        Resulting ticks have the same field order as in the ``schema``. If only some fields
        are specified (i.e. when ``strict=False``), then fields from the ``schema`` take
        the leftmost positions.
        Parameters
        ----------
        inplace: bool
            The flag controls whether operations should be applied inplace
        strict: bool
            If set to ``False``, all fields present in an input tick will be present in the output tick.
            If ``True``, then only the fields specified in the ``schema`` will be present.
        schema:
            field_name -> type or field_name -> default value pairs that should be applied on the source.
        Returns
        -------
        :class:`Source` or ``None``
        See Also
        --------
        | :attr:`Source.schema`
        | :meth:`__getitem__`: the table shortcut
        | **TABLE** OneTick event processor
        Examples
        --------
        Selection case
        >>> data = otp.Ticks(X1=[1, 2, 3],
        ...                  X2=[3, 2, 1],
        ...                  A1=["A", "A", "A"])
        >>> data = data.table(X2=int, A1=str)   # OTdirective: snippet-name: Arrange.set schema;
        >>> otp.run(data)
                             Time  X2 A1
        0 2003-12-01 00:00:00.000   3  A
        1 2003-12-01 00:00:00.001   2  A
        2 2003-12-01 00:00:00.002   1  A
        Defining default values case (note the order)
        >>> data = otp.Ticks(X=[1, 2, 3])
        >>> data = data.table(Y=0.5, strict=False)
        >>> otp.run(data)
                             Time   Y   X
        0 2003-12-01 00:00:00.000  0.5  1
        1 2003-12-01 00:00:00.001  0.5  2
        2 2003-12-01 00:00:00.002  0.5  3
        """
        def is_time_type_or_nsectime(obj):
            return ott.is_time_type(obj) or isinstance(obj, ott.nsectime)
        def transformer(name, obj):
            if obj is None:
                return name
            res = f'{ott.type2str(ott.get_object_type(obj))} {name}'
            if not isinstance(obj, Type) and not is_time_type_or_nsectime(obj):
                res += f' ({ott.value2str(obj)})'
            return res
        def get_type(value):
            if value is None:
                return float
            return ott.get_object_type(value)
        for c_name in list(schema.keys()):
            if c_name in self.__class__.meta_fields:
                # meta fields should not be propagated to Table ep
                # otherwise new user-defined field with the same name will appear in schema
                # and using this field will raise an "ambiguous use" error in OneTick
                warnings.warn(f"Meta field '{c_name}' should not be used in .table() method.")
                schema.pop(c_name)
        if not schema:
            return self
        schema_to_set = {c_name: get_type(c_value)
                         for c_name, c_value in schema.items()}
        if strict:
            self.schema.set(**schema_to_set)
        else:
            self.schema.update(**schema_to_set)
        fields = ','.join([transformer(c_name, c_value)
                           for c_name, c_value in schema.items()])
        self.sink(otq.Table(fields=fields, keep_input_fields=not strict))
        for c_name, c_value in schema.items():
            # datetime and nsectime values require onetick built-in functions to be initialized,
            # but built-in functions can't be used in the Table ep, so we update columns after the table
            if is_time_type_or_nsectime(c_value):
                self.update({c_name: c_value}, where=self[c_name] == 0, inplace=True)
        self._fix_varstrings()
        return self 
    def _fix_varstrings(self):
        """
        PY-556: converting to varstring results in string with null-characters
        """
        varstring_columns = {
            name: self[name]
            for name, dtype in self.schema.items()
            if dtype is ott.varstring
        }
        # just updating the column removes null-characters
        if varstring_columns:
            self.update(varstring_columns, inplace=True)
    @inplace_operation
    def drop(self, columns: List[Any], inplace=False) -> Optional['Source']:
        r"""
        Remove a list of columns from the Source.
        If a column with the specified name is not found, an error is raised; if a regex
        is specified and no columns match, nothing happens.
        A string is treated as a regex if it contains any of the characters ***+?\:[]{}()**.
        A dot is a valid character in a OneTick identifier,
        so **a.b** will be passed to OneTick as an identifier;
        if you want to specify such a regex, use parentheses - **(a.b)**
        Parameters
        ----------
        columns : str, Column or list of them
            Column(s) to remove. You can specify a regex or a collection of regexes; in that case
            all columns with matching names will be deleted.
        inplace: bool
            A flag that controls whether the operation should be applied inplace.
            If inplace=True, then it returns nothing. Otherwise the method
            returns a new modified object.
        Returns
        -------
        :class:`Source` or ``None``
        See Also
        --------
        **PASSTHROUGH** OneTick event processor
        Examples
        --------
        >>> data = otp.Ticks(X1=[1, 2, 3],
        ...                  X2=[3, 2, 1],
        ...                  A1=["A", "A", "A"])
        >>> data = data.drop("X1")  # OTdirective: snippet-name: Arrange.drop.one field;
        >>> otp.run(data)
                             Time  X2 A1
        0 2003-12-01 00:00:00.000   3  A
        1 2003-12-01 00:00:00.001   2  A
        2 2003-12-01 00:00:00.002   1  A
        Regexes can also be specified; in that case all matched columns will be deleted
        >>> data = otp.Ticks(X1=[1, 2, 3],
        ...                  X2=[3, 2, 1],
        ...                  A1=["A", "A", "A"])
        >>> data = data.drop(r"X\d+")   # OTdirective: snippet-name: Arrange.drop.regex;
        >>> otp.run(data)
                             Time A1
        0 2003-12-01 00:00:00.000  A
        1 2003-12-01 00:00:00.001  A
        2 2003-12-01 00:00:00.002  A
        Several columns and regexes can be specified at once
        >>> # OTdirective: snippet-name: Arrange.drop.multiple;
        >>> data = otp.Ticks(X1=[1, 2, 3],
        ...                  X2=[3, 2, 1],
        ...                  Y1=[1, 2, 3],
        ...                  Y2=[3, 2, 1],
        ...                  YA=["a", "b", "c"],
        ...                  A1=["A", "A", "A"])
        >>> data = data.drop([r"X\d+", "Y1", data["Y2"]])
        >>> otp.run(data)
                             Time YA A1
        0 2003-12-01 00:00:00.000  a  A
        1 2003-12-01 00:00:00.001  b  A
        2 2003-12-01 00:00:00.002  c  A
        **a.b** will be passed to OneTick as an identifier; if you want to specify such a regex, use parentheses - **(a.b)**
        >>> data = otp.Ticks({"COLUMN.A": [1, 2, 3], "COLUMN1A": [3, 2, 1],
        ...                   "COLUMN1B": ["a", "b", "c"], "COLUMN2A": ["c", "b", "a"]})
        >>> data = data.drop("COLUMN.A")    # OTdirective: skip-snippet:;
        >>> otp.run(data)
                             Time  COLUMN1A COLUMN1B COLUMN2A
        0 2003-12-01 00:00:00.000         3        a        c
        1 2003-12-01 00:00:00.001         2        b        b
        2 2003-12-01 00:00:00.002         1        c        a
        >>> data = otp.Ticks({"COLUMN.A": [1, 2, 3], "COLUMN1A": [3, 2, 1],
        ...                   "COLUMN1B": ["a", "b", "c"], "COLUMN2A": ["c", "b", "a"]})
        >>> data = data.drop("(COLUMN.A)")  # OTdirective: skip-snippet:;
        >>> otp.run(data)
                             Time COLUMN1B
        0 2003-12-01 00:00:00.000        a
        1 2003-12-01 00:00:00.001        b
        2 2003-12-01 00:00:00.002        c
        """
        self.__delitem__(columns)
        return self 
    @inplace_operation
    def dropna(self,
               how: Literal["any", "all"] = "any",
               subset: Optional[List[Any]] = None,
               inplace=False) -> Optional['Source']:
        """
        Drops ticks that contain NaN values, according to the policy set by the ``how`` parameter.
        Parameters
        ----------
        how: "any" or "all"
            ``any`` - filters out ticks if at least one field has a NaN value.
            ``all`` - filters out ticks if all fields have NaN values.
        subset: list of str
            List of columns to check for NaN values. If ``None``, then all columns are checked.
        inplace: bool
            The flag controls whether the operation should be applied inplace.
        Returns
        -------
        :class:`Source` or ``None``
        Examples
        --------
        Drop ticks where **at least one** field has a ``nan`` value.
        >>> data = otp.Ticks([[     'X',     'Y'],
        ...                   [     0.0,     1.0],
        ...                   [ otp.nan,     2.0],
        ...                   [     4.0, otp.nan],
        ...                   [ otp.nan, otp.nan],
        ...                   [     6.0,    7.0]])
        >>> data = data.dropna()
        >>> otp.run(data)[['X', 'Y']]
            X   Y
        0 0.0 1.0
        1 6.0 7.0
        Drop ticks where **all** fields have ``nan`` values.
        >>> data = otp.Ticks([[     'X',     'Y'],
        ...                   [     0.0,     1.0],
        ...                   [ otp.nan,     2.0],
        ...                   [     4.0, otp.nan],
        ...                   [ otp.nan, otp.nan],
        ...                   [     6.0,    7.0]])
        >>> data = data.dropna(how='all')
        >>> otp.run(data)[['X', 'Y']]
            X   Y
        0 0.0 1.0
        1 NaN 2.0
        2 4.0 NaN
        3 6.0 7.0
        Drop ticks where **all** fields in **subset** of columns have ``nan`` values.
        >>> data = otp.Ticks([[     'X',     'Y',    'Z'],
        ...                   [     0.0,     1.0, otp.nan],
        ...                   [ otp.nan,     2.0, otp.nan],
        ...                   [     4.0, otp.nan, otp.nan],
        ...                   [ otp.nan, otp.nan, otp.nan],
        ...                   [     6.0,     7.0, otp.nan]])
        >>> data = data.dropna(how='all', subset=['X', 'Y'])
        >>> otp.run(data)[['X', 'Y', 'Z']]
            X   Y   Z
        0 0.0 1.0 NaN
        1 NaN 2.0 NaN
        2 4.0 NaN NaN
        3 6.0 7.0 NaN
        """
        if how not in ["any", "all"]:
            raise ValueError(f"It is expected to see 'any' or 'all' values for 'how' parameter, but got '{how}'")
        condition = None
        columns = self.columns(skip_meta_fields=True)
        if subset is not None:
            for column_name in subset:
                if column_name not in columns:
                    raise ValueError(f"There is no '{column_name}' column in the source")
                if columns[column_name] is not float:
                    raise ValueError(f"Column '{column_name}' is not float type")
        for column_name, dtype in columns.items():
            if subset is not None and column_name not in subset:
                continue
            if dtype is float:
                if condition is None:
                    condition = self[column_name] != ott.nan
                else:
                    if how == "any":
                        condition &= self[column_name] != ott.nan
                    elif how == "all":
                        condition |= self[column_name] != ott.nan
        self.sink(otq.WhereClause(where=str(condition)))
        return self 
    def __delitem__(self, obj):
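        """
        Implements ``del source[columns]``; used internally by :meth:`drop`.
        A minimal usage sketch (equivalent to calling ``drop`` with inplace=True):
        >>> data = otp.Ticks(X1=[1, 2], X2=[3, 4])     # doctest: +SKIP
        >>> del data['X1']                             # doctest: +SKIP
        """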
        if isinstance(obj, list) or isinstance(obj, tuple):
            objs = obj
        else:
            objs = (obj,)
        items_to_passthrough, regex = self._columns_names_regex(objs, drop=True)
        self.sink(otq.Passthrough(drop_fields=True, fields=",".join(items_to_passthrough), use_regex=regex))
    def _validate_columns_names(self, names_of_columns):
        # TODO: is it used anywhere else?
        #  we definitely have the same logic of checking columns somewhere else too, need to refactor
        for item in names_of_columns:
            if item not in self.__dict__ or not isinstance(self.__dict__[item], _Column):
                raise AttributeError(f"There is no '{item}' column")
    def _columns_names_regex(self, objs: Tuple[Union[_Column, str], ...], drop: bool = False) -> Tuple[List[str], bool]:
        """
        We can't be sure that the python Source has a consistent columns cache, because sinking complex
        event processors can change columns unpredictably. So if the user specifies a regex as a parameter,
        we pass the regex through to OneTick, but also pass or delete all matched columns from the python
        Source cache.
        Parameters
        ----------
        objs:
            Tuple of _Column objects or strings to pass or drop. A string can be a regex.
        drop: bool
            To drop columns from python schema or not.
        Returns
        -------
        items_to_passthrough:
            Items to pass to Passthrough as the `fields` parameter.
        regex:
            Value to pass to Passthrough as `use_regex` parameter.
        """
        items_to_passthrough = []
        names_of_columns = []
        regex = False
        for obj in objs:
            if isinstance(obj, _Column):
                name: str = obj.name
                items_to_passthrough.append(name)
                names_of_columns.append(name)
            elif isinstance(obj, str):
                items_to_passthrough.append(obj)
                if any(c in r"*+?\:[]{}()^$" for c in obj):  # it is a possible regex
                    regex = True
                    names_of_columns.extend(col for col in self.columns() if re.match(obj, col))
                else:
                    names_of_columns.append(obj)
            else:
                raise TypeError(f"It is not supported to select or delete item '{obj}' of type '{type(obj)}'")
        # remove duplications and meta_fields
        names_of_columns: set[str] = set(names_of_columns) - set(self.__class__.meta_fields)  # type: ignore[no-redef]
        self._validate_columns_names(names_of_columns)
        if drop:
            for item in names_of_columns:
                del self.__dict__[item]
        return items_to_passthrough, regex
    def __from_ep_to_proxy(self, ep):
        in_pin, out_pin = None, None
        if isinstance(ep, otq.graph_components.EpBase.PinnedEp):
            if hasattr(ep, "_output_name"):
                out_pin = getattr(ep, "_output_name")
            else:
                in_pin = getattr(ep, "_input_name")
            ep = getattr(ep, "_ep")
        return ep, uuid.uuid4(), in_pin, out_pin
    def sink(self, ep, out_pin=None, move_node=True):
        """
        Appends a node inplace to the source.
        Connects the `out_pin` of this source to `ep`.
        Can be used to connect onetick.query objects to an onetick.py source.
        Data schema changes (added or deleted columns) will not be detected automatically
        after applying the `sink` function, so the user must update the schema manually
        via the `schema` property.
        Parameters
        ----------
        ep: otq.graph_components.EpBase,\
            otq.graph_components.EpBase.PinnedEp,\
            Tuple[otq.graph_components.EpBase, uuid.uuid4, Optional[str], Optional[str]]
            onetick.query EP object to append to source.
        out_pin: Optional[str], default=None
            name of the out pin to connect to `ep`
        move_node: bool, default=True
        Returns
        -------
        result: otq.graph_components.EpBase, otq.graph_components.EpBase.PinnedEp
            Last node of the source
        See Also
        --------
        onetick.py.Source.schema
        onetick.py.core._source.schema.Schema
        Examples
        --------
        Adding column 'B' directly with onetick.query EP.
        >>> data = otp.Tick(A=1)
        >>> data.sink(otq.AddField(field='B', value=2)) # OTdirective: skip-snippet:;
        AddField(field='B',value=2)
        >>> otp.run(data) # OTdirective: skip-snippet:;
                Time  A  B
        0 2003-12-01  1  2
        But we can't use this column with onetick.py methods yet.
        >>> data['C'] = data['B'] # OTdirective: skip-snippet:; # doctest: +ELLIPSIS
        Traceback (most recent call last):
         ...
        AttributeError: There is no 'B' column
        We should manually update the source's schema
        >>> data.schema.update(B=int) # OTdirective: skip-snippet:;
        >>> data['C'] = data['B']
        >>> otp.run(data)
                Time  A  B  C
        0 2003-12-01  1  2  2
        """
        if not (
            issubclass(type(ep), otq.graph_components.EpBase)
            or issubclass(type(ep), otq.graph_components.EpBase.PinnedEp)
            or type(ep) is tuple
        ):
            raise Exception("sinking is allowed only for EpBase instances")
        if type(ep) is tuple:
            # for already existed EP fetched from _ProxyNode
            return self.__node.sink(out_pin, *ep, move_node)
        else:
            return self.__node.sink(out_pin, *self.__from_ep_to_proxy(ep), move_node) 
    def __rshift__(self, ep):
        """ duplicates sink() method, but returns new object """
        new_source = self.copy()
        new_source.sink(ep)
        return new_source
    def __irshift__(self, ep):
        """ duplicates sink() method, but assigns source new object """
        new_source = self.copy()
        new_source.sink(ep)
        return new_source
    def source(self, ep, in_pin=None):
        """ Add node as source to root node """
        if not (
            issubclass(type(ep), otq.graph_components.EpBase)
            or issubclass(type(ep), otq.graph_components.EpBase.PinnedEp)
            or type(ep) is tuple
        ):
            raise Exception("sourcing is allowed only for EpBase instances")
        if type(ep) is tuple:
            # for already existed EP fetched from _ProxyNode
            return self.__node.source(in_pin, *ep)
        else:
            return self.__node.source(in_pin, *self.__from_ep_to_proxy(ep))
    def source_by_key(self, ep, to_key):
        """ Add node as source to graph node by key"""
        if not (
            issubclass(type(ep), otq.graph_components.EpBase)
            or issubclass(type(ep), otq.graph_components.EpBase.PinnedEp)
            or type(ep) is tuple
        ):
            raise Exception("sourcing is allowed only for EpBase instances")
        if type(ep) is tuple:
            # for already existed EP fetched from _ProxyNode
            return self.__node.source_by_key(to_key, *ep)
        else:
            return self.__node.source_by_key(to_key, *self.__from_ep_to_proxy(ep))
    def apply_query(self, query, in_pin="IN", out_pin="OUT", **params):
        """
        Apply data source to query
        .. deprecated:: 1.3.77
        See Also
        --------
        Source.apply
        """
        warnings.warn(
            'The "apply_query" method is deprecated. Please, use the .apply method instead or '
            "call a reference queries directly.",
            DeprecationWarning
        )
        res = onetick.py.functions.apply_query(query, {in_pin: self}, [out_pin], **params)
        res.node().out_pin(out_pin)
        return res
    def apply(self, obj) -> Union['otp.Column', 'Source']:
        """
        Apply object to data source.
        Parameters
        ----------
        obj: onetick.py.query, Callable, type, onetick.query.GraphQuery
            - `onetick.py.query` allows to apply an external nested query
            - a python `Callable` allows to translate python code to a similar OneTick CASE expression.
              There are some limitations on which python operators can be used in this callable.
              See the :ref:`Python callables parsing guide <python callable parser>` article for details.
              In :ref:`Remote OTP with Ray<ray-remote>` any `Callable` must be decorated with the `@otp.remote` decorator,
              see :ref:`Ray usage examples<apply-remote-context>` for details.
            - `type` allows to apply the default type conversion
            - `onetick.query.GraphQuery` allows to apply a built onetick.query.Graph
        Returns
        -------
        Column, Source
        Examples
        --------
        Apply an external query to a tick flow. In this case it is assumed that the query has
        only one input and one output. Check the :class:`query` examples if you
        want to use a query with multiple inputs or outputs.
        >>> data = otp.Ticks(X=[1, 2, 3])
        >>> external_query = otp.query('update.otq')
        >>> data = data.apply(external_query)
        >>> otp.run(data)
                             Time  X
        0 2003-12-01 00:00:00.000  2
        1 2003-12-01 00:00:00.001  4
        2 2003-12-01 00:00:00.002  6
        Apply a predicate to a column / operation.
        In this case the values passed to the predicate are the column values.
        The result is a column.
        >>> data = otp.Ticks(X=[1, 2, 3])
        >>> data['Y'] = data['X'].apply(lambda x: x * 2)
        >>> otp.run(data)
                             Time  X  Y
        0 2003-12-01 00:00:00.000  1  2
        1 2003-12-01 00:00:00.001  2  4
        2 2003-12-01 00:00:00.002  3  6
        Another example of applying a more sophisticated operation
        >>> data = otp.Ticks(X=[1, 2, 3])
        >>> data['Y'] = data['X'].apply(lambda x: 1 if x > 2 else 0)
        >>> otp.run(data)
                             Time  X  Y
        0 2003-12-01 00:00:00.000  1  0
        1 2003-12-01 00:00:00.001  2  0
        2 2003-12-01 00:00:00.002  3  1
        Example of applying a predicate to a Source. In this case the value passed
        to the predicate is a whole tick. The result is a column.
        >>> data = otp.Ticks(X=[1, 2, 3], Y=[.5, -0.4, .2])
        >>> data['Z'] = data.apply(lambda tick: 1 if abs(tick['X'] * tick['Y']) > 0.5 else 0)
        >>> otp.run(data)
                             Time  X     Y  Z
        0 2003-12-01 00:00:00.000  1   0.5  0
        1 2003-12-01 00:00:00.001  2  -0.4  1
        2 2003-12-01 00:00:00.002  3   0.2  1
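        A type can also be applied to a column to perform the default type conversion
        (a minimal sketch):
        >>> data = otp.Ticks(X=[1, 2, 3])
        >>> data['S'] = data['X'].apply(str)    # doctest: +SKIP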
        See Also
        --------
        :py:class:`onetick.py.query`
        :py:meth:`onetick.py.Source.script`
        :ref:`Python callables parsing guide <python callable parser>`
        """
        if isinstance(obj, onetick.py.sources.query):
            graph = obj.graph_info
            if len(graph.nested_inputs) != 1:
                raise Exception(
                    f'The query "{obj.query_name}" is expected to have one input, but it '
                    f"has {len(graph.nested_inputs)}"
                )
            if len(graph.nested_outputs) > 1:
                raise Exception(
                    f'The query "{obj.query_name}" is expected to have one or no output, but it '
                    f"has {len(graph.nested_outputs)}"
                )
            in_pin = graph.nested_inputs[0].NESTED_INPUT
            if len(graph.nested_outputs) == 0:
                out_pin = None  # means no output
            else:
                out_pin = graph.nested_outputs[0].NESTED_OUTPUT
            return obj(**{in_pin: self})[out_pin]
        elif isinstance(obj, otq.GraphQuery):
            tmp_file = utils.TmpFile(suffix=".otq")
            obj.save_to_file(
                tmp_file.path,
                "graph_query_to_apply",
                # start and end times don't matter for this query, use some constants
                start=db_constants.DEFAULT_START_DATE,
                end=db_constants.DEFAULT_END_DATE,
            )
            query_info = get_query_info(tmp_file.path)
            if len(query_info.sources) != 1:
                raise Exception(
                    f"The query is expected to have one input, but it has {len(query_info.sources)}"
                )
            if len(query_info.sinks) != 1:
                raise Exception(f"The query is expected to have one output, but it has {len(query_info.sinks)}")
            add_pins(
                tmp_file.path,
                "graph_query_to_apply",
                [(query_info.sources[0], 1, "IN"), (query_info.sinks[0], 0, "OUT")],
            )
            return onetick.py.sources.query(tmp_file.path + "::graph_query_to_apply")(IN=self)["OUT"]
        return apply_lambda(obj, _EmulateObject(self)) 
    def dump(self,
             label: Optional[str] = None,
             where: Optional['otp.Operation'] = None,
             columns: Union[str, Tuple[str], List[str], None] = None,
             callback: Optional[Callable] = None):
        """
        Dumps the ``columns`` from ticks to std::out at runtime if they fit the ``where`` condition.
        Every dump has a corresponding header that always includes the TIMESTAMP field. Other fields
        can be configured using the ``columns`` parameter. A header can be augmented with the ``label`` parameter;
        this label is an additional column that helps to distinguish ticks
        from multiple dumps with the same schema, because ticks from different dumps could be mixed.
        This can happen because of OneTick multithreading, and because there is an operating system
        buffer between OneTick and the actual output.
        This method is helpful for debugging.
        Parameters
        ----------
        label: str
            A label for a dump. It adds a special column **_OUT_LABEL_** to all ticks, set to the
            specified value. It helps to distinguish ticks from multiple dumps, because the actual
            output could contain mixed ticks due to concurrency. ``None`` means no label.
        where: Operation
            A condition that allows to filter ticks to dump. ``None`` means no filtering.
        columns: str, tuple or list
            List of columns that should be in the output. ``None`` means dump all columns.
        callback : callable
            A callable that preprocesses the source before printing.
        See also
        --------
        **WRITE_TEXT** OneTick event processor
        Examples
        --------
        >>> # OTdirective: skip-snippet:;
        >>> data.dump(label='Debug point', where=data['PRICE'] > 99.3, columns=['PRICE', 'QTY'])    # doctest: +SKIP
        >>> data.dump(columns="X", callback=lambda x: x.first(), label="first")     # doctest: +SKIP
        """
        self_c = self.copy()
        if callback:
            self_c = callback(self_c)
        if where is not None:  # can't be simplified because the _Operation overrides __bool__
            self_c, _ = self_c[(where)]
        if columns:
            self_c = self_c[columns if isinstance(columns, (list, tuple)) else [columns]]
        if label:
            self_c['_OUT_LABEL_'] = label
        self_c.sink(otq.WriteText(formats_of_fields='TIMESTAMP=%|' + configuration.config.tz + '|%d-%m-%Y %H:%M:%S.%J',
                                  prepend_timestamp=False,
                                  prepend_symbol_name=False,
                                  propagate_ticks=True))
        # print <no data> in case there are 0 ticks
        self_c = self_c.agg({'COUNT': otp.agg.count()})
        self_c, _ = self_c[self_c['COUNT'] == 0]
        self_c['NO_DATA'] = '<no data>'
        self_c = self_c[['NO_DATA']]
        self_c.sink(otq.WriteText(output_headers=False,
                                  prepend_timestamp=False,
                                  prepend_symbol_name=False,
                                  propagate_ticks=True))
        # Do not propagate ticks further, because we just want to write them to
        # std::out. We have to do that, because otherwise these ticks would
        # go to the query output and mess up the real output.
        self_c, _ = self_c[self_c['Time'] != self_c['Time']]
        # We have to merge the branch back to the main branch, even though this
        # branch does not generate ticks, so that we do not introduce one more
        # output point; otherwise OneTick would add it to the final output
        # data structure.
        self.sink(otq.Merge(identify_input_ts=False))
        self.source(self_c.node().copy_graph())
        self.node().add_rules(self_c.node().copy_rules())
        self._merge_tmp_otq(self_c) 
    def script(self, func: Union[Callable[['Source'], Optional[bool]], str], inplace=False) -> 'Source':
        # TODO: need to narrow Source object to get rid of undesired methods like aggregations
        """
        Implements a script that is executed for every tick.
        Allows to pass a ``func`` that will be applied to every tick.
        A ``func`` can be a python callable; in this case it will be translated to a per-tick script.
        In order to use it in Remote OTP with Ray, the function should be decorated with ``@otp.remote``,
        see :ref:`Ray usage examples<apply-remote-context>` for details.
        See :ref:`Per Tick Script Guide <python callable parser>` for a more detailed description
        of python-to-OneTick code translation and per-tick script features.
        A script written in the per-tick script language can also be passed directly, as a string
        or as a path to a file with the code.
        onetick-py doesn't validate such a script, but configures the schema accordingly.
        Parameters
        ----------
        func: callable, str or path
            - a callable that takes only one parameter - the actual tick, which behaves like a `Source` instance
            - or a script in the per-tick script language
            - or a path to a file with a OneTick script
        Returns
        -------
        :class:`Source`
        See also
        --------
        **PER_TICK_SCRIPT** OneTick event processor
        Examples
        --------
        >>> t = otp.Ticks({'X': [1, 2, 3], 'Y': [4, 5, 6]})
        >>> def fun(tick):
        ...     tick['Z'] = 0
        ...     if tick['X'] + tick['Y'] == 5:
        ...         tick['Z'] = 1
        ...     elif tick['X'] + tick['Y'] * 2 == 15:
        ...         tick['Z'] = 2
        >>> t = t.script(fun)
        >>> otp.run(t)
                             Time  X  Y  Z
        0 2003-12-01 00:00:00.000  1  4  1
        1 2003-12-01 00:00:00.001  2  5  0
        2 2003-12-01 00:00:00.002  3  6  2
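        A script can also be passed as a string in the per-tick script language
        (a minimal sketch; onetick-py does not validate the script text):
        >>> t = otp.Tick(X=1)
        >>> t = t.script('long Y = X + 1;')     # doctest: +SKIP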
        See also
        --------
        :py:meth:`onetick.py.Source.apply`
        :ref:`Per-Tick Script Guide <python callable parser>`
        """
        res = self if inplace else self.copy()
        changed_tick_lists = {}
        if callable(func):
            _new_columns, _script = apply_script(func, _EmulateObject(self))
            changed_tick_lists = _EmulateObject.get_changed_tick_lists()
        elif isinstance(func, str):
            if os.path.isfile(func):
                # path to the file with script
                with open(func) as file:
                    _script = file.read()
            else:
                _script = func
            _new_columns = self._extract_columns_from_script(_script)
        else:
            raise ValueError("Wrong argument was specify, please use callable or string with either script on per tick "
                             "language or path to it")
        res.sink(otq.PerTickScript(script=_script))
        res.schema.update(**_new_columns)
        for tick_list_name, tick_list_schema in changed_tick_lists.items():
            res.state_vars[tick_list_name]._schema = tick_list_schema
        return res 
    def _extract_columns_from_script(self, script):
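        """
        Extract declared field names and types from a per-tick script text.
        A sketch of the expected result (assuming 'long' maps to python int):
        >>> src._extract_columns_from_script('long Y = X + 1;')  # doctest: +SKIP
        {'Y': <class 'int'>}
        """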
        result = {}
        supported_types = {'byte', 'short', 'uint', 'long', 'ulong', 'int', 'float', 'double',
                           'string', 'time32', 'nsectime', 'msectime', 'varstring', 'decimal'}
        types = r"(?P<type>varstring|byte|short|uint|int|ulong|long|" \
                
r"float|double|decimal|string|time32|msectime|nsectime|matrix)"
        length = r"(\[(?P<length>\d+)\])?"
        name = r"\s+(?P<name>\w+)\s*=\s*"
        pattern = re.compile(types + length + name)
        for p in re.finditer(pattern, script):
            groupdict = p.groupdict()
            type = ott.str2type(groupdict["type"]) if groupdict["type"] in supported_types else None
            if type:
                length = groupdict["length"]
                if length:
                    length = int(length)
                    if type is str and length != ott.string.DEFAULT_LENGTH:
                        type = ott.string[length]
                result[groupdict["name"]] = type
            else:
                warnings.warn(f"{groupdict['type']} isn't supported for now, so field {groupdict['name']} won't "
                              f"be added to schema.")
        return result
    def to_symbol_param(self):
        """
        Creates a read-only instance with the same columns, except for Time.
        It is used as the result of a first-stage query with symbol params.
        See also
        --------
        :ref:`Symbol Parameters Objects`
        :ref:`Symbol parameters`
        Examples
        --------
        >>> symbols = otp.Ticks({'SYMBOL_NAME': ['S1', 'S2'], 'PARAM': ['A', 'B']})
        >>> symbol_params = symbols.to_symbol_param()
        >>> t = otp.DataSource('SOME_DB', tick_type='TT')
        >>> t['S_PARAM'] = symbol_params['PARAM']
        >>> result = otp.run(t, symbols=symbols)
        >>> result['S1']
                             Time  X S_PARAM
        0 2003-12-01 00:00:00.000  1       A
        1 2003-12-01 00:00:00.001  2       A
        2 2003-12-01 00:00:00.002  3       A
        """
        return _SymbolParamSource(**self.columns()) 
    @staticmethod
    def _convert_symbol_to_string(symbol, tmp_otq=None, start=None, end=None, timezone=None):
        if start is adaptive:
            start = None
        if end is adaptive:
            end = None
        if isinstance(symbol, Source):
            symbol = otp.eval(symbol).to_eval_string(tmp_otq=tmp_otq,
                                                     start=start, end=end, timezone=timezone,
                                                     operation_suffix='symbol',
                                                     query_name=None,
                                                     file_suffix=symbol._name_suffix('symbol.otq'))
        if isinstance(symbol, onetick.py.sources.query):
            return symbol.to_eval_string()
        else:
            return symbol
    @staticmethod
    def _construct_multi_branch_graph(branches):
        # TODO: add various checks, e.g. that branches have common parts
        main = branches[0].copy()
        for branch in branches[1:]:
            main.node().add_rules(branch.node().copy_rules())
            main._merge_tmp_otq(branch)
        return main
    def _apply_side_branches(self, side_branches):
        for side_branch in side_branches:
            self.node().add_rules(side_branch.node().copy_rules())
            self._merge_tmp_otq(side_branch)
            self.__sources_keys_dates.update(side_branch.__sources_keys_dates)
            self.__sources_modify_query_times.update(side_branch.__sources_modify_query_times)
            self.__sources_base_ep_func.update(side_branch.__sources_base_ep_func)
            self.__sources_symbols.update(side_branch.__sources_symbols)
    def symbols_for(self, func, *, symbol="SYMBOL_NAME", start="_PARAM_START_TIME_NANOS", end="_PARAM_END_TIME_NANOS"):
        """
        Apply ticks from the current source as symbols for
        calculations in ``func``.
        Parameters
        ----------
        func: callable
            A callable object that takes only one parameter, which
            references the current symbol.
        symbol: str
            The column name that contains symbols. Default is 'SYMBOL_NAME'.
            This column is mandatory.
        start: str
            The column name that contains the start time for a symbol. Default
            is '_PARAM_START_TIME_NANOS'. This column is optional.
        end: str
            The column name that contains the end time for a symbol. Default
            is '_PARAM_END_TIME_NANOS'. This column is optional.
        Returns
        -------
            A multi-symbol ticks source.
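        Examples
        --------
        A minimal sketch (assumes a database ``SOME_DB`` with tick type ``TT``):
        >>> symbols = otp.Ticks(SYMBOL_NAME=['S1', 'S2'])       # doctest: +SKIP
        >>> def logic(symbol):
        ...     data = otp.DataSource('SOME_DB', tick_type='TT')
        ...     data['S_NAME'] = symbol['SYMBOL_NAME']
        ...     return data
        >>> res = symbols.symbols_for(logic)                    # doctest: +SKIP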
        """
        symbols = self.copy()
        if str(symbol) != "SYMBOL_NAME":
            symbols["SYMBOL_NAME"] = symbols[str(symbol)]
        if str(start) != "_PARAM_START_TIME_NANOS":
            symbols["_PARAM_START_TIME_NANOS"] = symbols[str(start)]
        if str(end) != "_PARAM_END_TIME_NANOS":
            symbols["_PARAM_END_TIME_NANOS"] = symbols[str(end)]
        if "SYMBOL_NAME" not in symbols.columns():
            raise Exception("Ticks do not have the SYMBOL_NAME column, but this is mandatory column.")
        # -------------- #
        num_params = len(inspect.signature(func).parameters)
        if num_params == 0:
            logic = func()
        elif num_params == 1:
            logic = func(symbols.to_symbol_param())
        else:
            raise ValueError(
                f"It is expected only one parameter from the callback, but {num_params} passed"
            )  # TODO: test this case
        # -------------- #
        # Find the date range for the logic query.
        # If the date range is not specified for at least one source, then
        # try to deduce the date range - set to adaptive
        # TODO: we don't need to get common symbol from _get_date_range,
        #  refactor this function, so we don't get an error if default symbol is not set
        logic_start, logic_end, _ = logic.copy()._get_date_range(default_start=adaptive, default_end=adaptive)
        if logic_start is adaptive:
            logic_start = onetick.py.utils.INF_TIME
        if logic_end is adaptive:
            logic_end = onetick.py.ZERO_TIME
        # The same as previous, but for the symbols
        symbols_start, symbols_end, _ = symbols.copy()._get_date_range(default_start=adaptive, default_end=adaptive)
        if symbols_start is adaptive:
            symbols_start = onetick.py.utils.INF_TIME
        if symbols_end is adaptive:
            symbols_end = onetick.py.ZERO_TIME
        # Query interval should be as wide as possible
        start = min(logic_start, symbols_start)
        end = max(logic_end, symbols_end)
        # If nothing is specified, then just use default
        if start == onetick.py.utils.INF_TIME:
            start = None
        if end == onetick.py.ZERO_TIME:
            end = None
        return _MultiSymbolsSource(symbols.copy(), logic, start=start, end=end)
    @staticmethod
    def _columns_to_params_for_joins(columns, query_params=False):
        """
        Converts a dictionary of columns into a parameters string.
        This is mainly used for join_with_query and join_with_collection.
        query_params controls whether the resulting string should be considered query params or symbol params
        (as this impacts some of the conversion rules).
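        A sketch of the produced expression for a constant integer parameter:
        >>> Source._columns_to_params_for_joins({'A': 1})  # doctest: +SKIP
        "'A=' + tostring(1)"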
        """
        params_list = []
        for key, value in columns.items():
            dtype = ott.get_object_type(value)
            convert_rule = "'" + key + "=' + "
            if key == '_PARAM_SYMBOL_TIME' and not query_params:
                # this symbol parameter has to be formatted differently because OneTick treats it
                # in a special way
                if dtype is otp.nsectime:
                    convert_rule += f'NSECTIME_FORMAT("%Y%m%d%H%M%S.%J",{ott.value2str(value)},_TIMEZONE)'
                elif dtype is str:
                    if are_strings(getattr(value, "dtype", None)):
                        convert_rule += str(value)
                    else:
                        convert_rule += '"' + value + '"'
                else:
                    raise ValueError('Parameter symbol_time has to be a datetime value!')
            elif dtype is str:
                if are_strings(getattr(value, "dtype", None)):
                    convert_rule += str(value)
                else:
                    convert_rule += '"' + value + '"'
            elif dtype is otp.msectime:
                convert_rule += "tostring(GET_MSECS(" + str(value) + "))"
            elif dtype is otp.nsectime:
                if key == '_SYMBOL_TIME' and query_params:
                    # hack to support passing _SYMBOL_TIME to called query as a parameter
                    warnings.warn('Query parameter _SYMBOL_TIME passed to join_with_query! '
                                  'This is deprecated. Please use a dedicated `symbol_time` parameter of the '
                                  'join_with_query function')
                    convert_rule += "tostring(GET_MSECS(" + str(value) + "))"
                elif query_params:
                    # this can be used for query params but cannot be used for symbol params
                    # overall it's better
                    convert_rule += "'NSECTIME('+tostring(NSECTIME_TO_LONG(" + str(value) + "))+')'"
                else:
                    # this matches the common way onetick converts nanoseconds to symbol parameters
                    convert_rule += "tostring(GET_MSECS(" + str(value) + \
                                    
"))+'.'+SUBSTR(NSECTIME_FORMAT('%J'," + str(value) + ",_TIMEZONE),3,6)"
            else:
                convert_rule += "tostring(" + str(value) + ")"
            params_list.append(convert_rule)
        return "+','+".join(params_list)
    @staticmethod
    def _get_default_fields_for_outer_join_str(default_fields_for_outer_join, how, sub_source_schema):
        """
        Default fields for outer join definition.
        Used by join_with_query() and join_with_collection()
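        A sketch of the produced string for a hypothetical field with a constant default:
        >>> Source._get_default_fields_for_outer_join_str({'X': 1}, 'outer', {'X': int})  # doctest: +SKIP
        'X=1'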
        """
        default_fields_for_outer_join_str = ''
        if default_fields_for_outer_join:
            if how != 'outer':
                raise ValueError('The `default_fields_for_outer_join` parameter can be used only for outer join')
            for field, expr in default_fields_for_outer_join.items():
                if field not in sub_source_schema.keys():
                    raise KeyError(f'Field {field} is specified in `default_fields_for_outer_join` parameter, '
                                   'but is not present in the joined source schema!')
                if default_fields_for_outer_join_str != '':
                    default_fields_for_outer_join_str += ','
                default_fields_for_outer_join_str += f'{field}={ott.value2str(expr)}'
        return default_fields_for_outer_join_str
    def join_with_collection(
        self,
        collection_name,
        query_func=None,
        how="outer",
        params=None,
        start=None,
        end=None,
        prefix=None,
        caching=None,
        keep_time=None,
        default_fields_for_outer_join=None,
    ) -> 'Source':
        """
        For each tick uses ``query_func`` to join ticks from ``collection_name`` tick collection
        (tick set, unordered tick set, tick list, or tick deque).
        Parameters
        ----------
        collection_name: str
            Name of the collection state variable from which to join ticks. Collections are the following types:
            :py:class:`TickSet <onetick.py.core._internal._state_objects.TickSet>`,
            :py:class:`TickSetUnordered <onetick.py.core._internal._state_objects.TickSetUnordered>`,
            :py:class:`TickList <onetick.py.core._internal._state_objects.TickList>` and
            :py:class:`TickDeque <onetick.py.core._internal._state_objects.TickDeque>`.
        query_func: callable
            Callable ``query_func`` should return a :class:`Source`. If passed, this query will be used on ticks
            from the collection before joining them.
            In this case, ``query_func`` object will be evaluated by OneTick (not python)
            for every input tick. Note that python code will be executed only once,
            so all python conditional expressions will be evaluated only once too.
            Callable should have ``source`` parameter. When callable is called, this parameter
            will have value of a :class:`Source` object representing ticks loaded directly from the collection.
            Any operation applied to this source will be applied to ticks from the collection
            before joining them.
            Also, callable should have the parameters with names
            from ``params`` if they are specified in this method.
            If ``query_func`` is not passed, then all ticks from the collection will be joined.
        how: 'inner', 'outer'
            Type of join. If **inner**, then an output tick is propagated
            only if some ticks from the collection were joined to the input tick.
        params: dict
            Mapping of the parameters' names and their values for the ``query_func``.
            :py:class:`Columns <onetick.py.Column>` can be used as a value.
        start: datetime, Operation
            Start time to select ticks from collection.
            If specified, only ticks in collection that have higher or equal timestamp will be processed.
            If not passed, then there will be no lower time bound for the collection ticks.
            This means that even ticks with TIMESTAMP lower than _START_TIME of the main query will be joined.
        end: datetime, Operation
            End time to select ticks from collection.
            If specified, only ticks in collection that have lower timestamp will be processed.
            If not passed, then there will be no upper time bound for the collection ticks.
            This means that even ticks with TIMESTAMP higher than _END_TIME of the main query will be joined.
        prefix : str
            Prefix for the names of joined tick fields.
        caching : str
            If `None` caching is disabled. You can specify caching by using values:
                * 'per_symbol': cache is different for each symbol.
        keep_time : str
            Name for the joined timestamp column. `None` means no timestamp column will be joined.
        default_fields_for_outer_join : dict
            When you use outer join, all output ticks will have fields from the schema of the joined source.
            If nothing was joined to a particular output tick, these fields will have default values for their type.
            This parameter allows to override the values that would be added to ticks for which nothing was joined.
            Dictionary keys should be field names, and dictionary values should be constants
            or :class:`Operation` expressions.
        Returns
        -------
        :class:`Source`
                Source with joined ticks from ``collection_name``
        See also
        --------
        **JOIN_WITH_COLLECTION_SUMMARY** OneTick event processor
        Examples
        --------
        >>> # OTdirective: snippet-name: Special functions.join with collection.without query;
        >>> src = otp.Tick(A=1)
        >>> src.state_vars['TICK_SET'] = otp.state.tick_set('LATEST_TICK', 'B', otp.eval(otp.Tick(B=1, C='STR')))
        >>> src = src.join_with_collection('TICK_SET')
        >>> otp.run(src)[["A", "B", "C"]]
           A  B    C
        0  1  1  STR
        >>> # OTdirective: snippet-name: Special functions.join with collection.with query and params;
        >>> src = otp.Ticks(A=[1, 2, 3, 4, 5],
        ...                 B=[2, 2, 3, 3, 3])
        >>> src.state_vars['TICK_LIST'] = otp.state.tick_list()
        >>> def fun(tick): tick.state_vars['TICK_LIST'].push_back(tick)
        >>> src = src.script(fun)
        >>>
        >>> def join_fun(source, param_b):
        ...     source = source.agg(dict(VALUE=otp.agg.sum(source['A'])))
        ...     source['VALUE'] = source['VALUE'] + param_b
        ...     return source
        >>>
        >>> src = src.join_with_collection('TICK_LIST', join_fun, params=dict(param_b=src['B']))
        >>> otp.run(src)[["A", "B", "VALUE"]]
           A  B  VALUE
        0  1  2      3
        1  2  2      5
        2  3  3      9
        3  4  3     13
        4  5  3     18
        Join last standing quote from each exchange to trades:
        >>> # OTdirective: snippet-name: Special functions.join with collection.standing quotes per exchange;
        >>> trd = otp.Ticks(offset=[1000, 2000, 3000, 4000, 5000],
        ...                 PRICE=[10.1, 10.2, 10.15, 10.23, 10.4],
        ...                 SIZE=[100, 50, 100, 60, 200])
        >>>
        >>> qte = otp.Ticks(offset=[500, 600, 1200, 2500, 3500, 3600, 4800],
        ...                 EXCHANGE=['N', 'C', 'Q', 'Q', 'C', 'N', 'C'],
        ...                 ASK_PRICE=[10.2, 10.18, 10.18, 10.15, 10.31, 10.32, 10.44],
        ...                 BID_PRICE=[10.1, 10.17, 10.17, 10.1, 10.23, 10.31, 10.4])
        >>>
        >>> trd['TICK_TYPE'] = 'TRD'
        >>> qte['TICK_TYPE'] = 'QTE'
        >>>
        >>> trd_qte = trd + qte
        >>> trd_qte.state_vars['LAST_QUOTE_PER_EXCHANGE'] = otp.state.tick_set(
        ...     'LATEST', 'EXCHANGE',
        ...     schema=['EXCHANGE', 'ASK_PRICE', 'BID_PRICE'])
        >>>
        >>> trd_qte = trd_qte.state_vars['LAST_QUOTE_PER_EXCHANGE'].update(where=trd_qte['TICK_TYPE'] == 'QTE',
        ...                                                                value_fields=['ASK_PRICE', 'BID_PRICE'])
        >>> trd, _ = trd_qte[trd_qte['TICK_TYPE'] == 'TRD']
        >>> trd.drop(['ASK_PRICE', 'BID_PRICE', 'EXCHANGE'], inplace=True)
        >>> trd = trd.join_with_collection('LAST_QUOTE_PER_EXCHANGE')
        >>> otp.run(trd)[['PRICE', 'SIZE', 'EXCHANGE', 'ASK_PRICE', 'BID_PRICE']]
            PRICE  SIZE EXCHANGE  ASK_PRICE  BID_PRICE
        0   10.10   100        N      10.20      10.10
        1   10.10   100        C      10.18      10.17
        2   10.20    50        N      10.20      10.10
        3   10.20    50        C      10.18      10.17
        4   10.20    50        Q      10.18      10.17
        5   10.15   100        N      10.20      10.10
        6   10.15   100        C      10.18      10.17
        7   10.15   100        Q      10.15      10.10
        8   10.23    60        N      10.32      10.31
        9   10.23    60        C      10.31      10.23
        10  10.23    60        Q      10.15      10.10
        11  10.40   200        N      10.32      10.31
        12  10.40   200        C      10.44      10.40
        13  10.40   200        Q      10.15      10.10
        """
        # check that passed collection is good
        if collection_name not in self.state_vars.names:
            raise KeyError(f'Collection with name {collection_name} is not in the list of available state variables')
        if not isinstance(self.state_vars[collection_name], _TickSequence):
            raise ValueError(f'State variable {collection_name} is not a tick collection! '
                             'Only TickSet, TickSetUnordered, TickList and TickDeque objects are supported '
                             'as data sources for join_with_collection')
        if params is None:
            params = {}
        special_params = ('source', '__fixed_start_time', '__fixed_end_time')
        for sp_param in special_params:
            if sp_param in params.keys():
                raise ValueError(f'Parameter name "{sp_param}" is special and cannot be used for params '
                                 'of join_with_collection function. Please, select a different name.')
        # JOIN_WITH_COLLECTION_SUMMARY has START_TIME and END_TIME parameters with millisecond precision.
        # So here we add a workaround on the onetick.py side to support nsectime precision.
        # "start" and "end" parameters of the EP are kept as they may be necessary
        # for performance reasons
        if start is not None:
            params['__fixed_start_time'] = start
            start = start - otp.Milli(1)
        if end is not None:
            params['__fixed_end_time'] = end
            end = end + otp.Milli(1)
        # prepare temporary file
        # ------------------------------------ #
        # TODO: this should be a common code somewhere
        collection_schema = {
            key: value for key, value in self.state_vars[collection_name].schema.items()
            if key not in self.__class__.meta_fields and key not in self.__class__._PROPERTIES
        }
        join_source_root = otp.DataSource(db=otp.config.default_db,
                                          tick_type="ANY",
                                          schema_policy="manual",
                                          **collection_schema)
        if query_func is None:
            query_func = lambda source: source  # noqa
        converted_params = prepare_params(**params)
        fixed_start_time = None
        fixed_end_time = None
        if '__fixed_start_time' in converted_params.keys():
            fixed_start_time = converted_params['__fixed_start_time']
            del converted_params['__fixed_start_time']
        if '__fixed_end_time' in converted_params.keys():
            fixed_end_time = converted_params['__fixed_end_time']
            del converted_params['__fixed_end_time']
        sub_source = query_func(source=join_source_root, **converted_params)
        if fixed_start_time is not None:
            sub_source = sub_source[sub_source['TIMESTAMP'] >= fixed_start_time][0]
        if fixed_end_time is not None:
            sub_source = sub_source[sub_source['TIMESTAMP'] < fixed_end_time][0]
        sub_source = self._process_keep_time_param(keep_time, sub_source)
        params_str = self._columns_to_params_for_joins(params, query_params=True)
        sub_source_schema = sub_source.schema.copy()
        columns = {}
        columns.update(self._get_columns_with_prefix(sub_source, prefix))
        columns.update(self.columns(skip_meta_fields=True))
        res = self.copy(columns=columns)
        res._merge_tmp_otq(sub_source)
        query_name = sub_source._store_in_tmp_otq(
            res._tmp_otq,
            symbols='_NON_EXISTING_SYMBOL_',
            operation_suffix="join_with_collection"
        )
        # ------------------------------------ #
        default_fields_for_outer_join_str = self._get_default_fields_for_outer_join_str(
            default_fields_for_outer_join, how, sub_source_schema
        )
        join_params = dict(
            collection_name=str(self.state_vars[collection_name]),
            otq_query=f'"THIS::{query_name}"',
            join_type=how.upper(),
            otq_query_params=params_str,
            default_fields_for_outer_join=default_fields_for_outer_join_str,
        )
        self._fill_aux_params_for_joins(
            join_params, caching, end, prefix, start,
            symbol_name=None, timezone=None, join_with_collection=True
        )
        res.sink(otq.JoinWithCollectionSummary(**join_params))
        res._add_table()
        res.sink(otq.Passthrough(fields="TIMESTAMP", drop_fields=True))
        return res 
    def join_with_query(
        self,
        query,
        how="outer",
        symbol=None,
        params=None,
        start=None,
        end=None,
        timezone=None,
        prefix=None,
        caching=None,
        keep_time=None,
        where=None,
        default_fields_for_outer_join=None,
        symbol_time=None,
        concurrency=None,
        **kwargs,
    ) -> 'Source':
        """
        For each tick executes ``query``.
        Parameters
        ----------
        query: callable, Source
            Callable ``query`` should return a :class:`Source`. This object will be evaluated by OneTick (not python)
            for every tick. Note that python code will be executed only once, so all python conditional expressions
            will be evaluated only once too.
            Callable should have ``symbol`` parameter and the parameters with names
            from ``params`` if they are specified in this method.
            If ``query`` is a :class:`Source` object then it will be propagated as a query to OneTick.
        how: 'inner', 'outer'
            Type of join. If **inner**, then each tick is propagated
            only if its ``query`` execution has a non-empty result.
        params: dict
            Mapping of the parameters' names and their values for the ``query``.
            :py:class:`Columns <onetick.py.Column>` can be used as a value.
        symbol: str, Operation, dict, Source, or Tuple[Union[str, Operation], Union[dict, Source]]
            Symbol name to use in ``query``. In addition, symbol params can be passed along with symbol name.
            Symbol name can be passed as a string or as an :class:`Operation`.
            Symbol parameters can be passed as a dictionary. Also, the main :class:`Source` object,
            or an object containing a symbol parameter list, can be used as a list of symbol parameters.
            Special symbol parameters (`_PARAM_START_TIME_NANOS` and `_PARAM_END_TIME_NANOS`)
            will be ignored and will not be propagated to ``query``.
            ``symbol`` will be interpreted as a symbol name or as symbol parameters, depending on its type.
            You can pass both as a tuple.
            If symbol name is not passed, then symbol name from the main source is used.
        start: datetime, Operation
            Start time of ``query``.
            By default, start time of the main source is used.
        end: datetime, Operation
            End time of ``query``.
            By default, end time of the main source is used.
        start_time:
            .. deprecated:: 1.48.4
                The same as ``start``.
        end_time:
            .. deprecated:: 1.48.4
                The same as ``end``.
        timezone : Optional, str
            Timezone of ``query``.
            By default, timezone of the main source is used.
        prefix : str
            Prefix for the names of joined tick fields.
        caching : str
            If `None` caching is disabled. You can specify caching by using values:
                * 'cross_symbol': cache is the same for all symbols
                * 'per_symbol': cache is different for each symbol.
        keep_time : str
            Name for the joined timestamp column. `None` means no timestamp column will be joined.
        where : Operation
            Condition to filter ticks for which the result of the ``query`` will be joined.
        default_fields_for_outer_join : dict
            When you use outer join, all output ticks will have fields from the schema of the joined source.
            If nothing was joined to a particular output tick, these fields will have default values for their type.
            This parameter allows to override the values that would be added to ticks for which nothing was joined.
            Dictionary keys should be field names, and dictionary values should be constants
            or :class:`Operation` expressions.
        symbol_time : datetime, Operation
            Time that will be used by OneTick to map the symbol with which ``query`` is executed to the reference data.
            This parameter is only necessary if the query is expected to perform symbology conversions.
        concurrency : int
            Specifies number of threads for asynchronous processing of ``query`` per unbound symbol list.
            By default, the number of threads is 1.
        Returns
        -------
        :class:`Source`
                Source with joined ticks from ``query``
        See also
        --------
        **JOIN_WITH_QUERY** OneTick event processor
        Examples
        --------
        >>> # OTdirective: snippet-name: Special functions.join with query.with an otp data source;
        >>> d = otp.Ticks(Y=[-1])
        >>> d = d.update(dict(Y=1), where=(d.Symbol.name == "a"))
        >>> data = otp.Ticks(X=[1, 2],
        ...                  S=["a", "b"])
        >>> res = data.join_with_query(d, how='inner', symbol=data['S'])
        >>> otp.run(res)[["X", "Y", "S"]]
           X  Y  S
        0  1  1  a
        1  2 -1  b
        >>> d = otp.Ticks(ADDED=[-1])
        >>> d = d.update(dict(ADDED=1), where=(d.Symbol.name == "3"))  # symbol name is always string
        >>> data = otp.Ticks(A=[1, 2], B=[2, 4])
        >>> res = data.join_with_query(d, how='inner', symbol=(data['A'] + data['B']))  # OTdirective: skip-snippet:;
        >>> df = otp.run(res)
        >>> df[["A", "B", "ADDED"]]
           A  B  ADDED
        0  1  2      1
        1  2  4     -1
        Constants as symbols are also supported
        >>> d = otp.Ticks(ADDED=[d.Symbol.name])
        >>> data = otp.Ticks(A=[1, 2], B=[2, 4])
        >>> res = data.join_with_query(d, how='inner', symbol=1)    # OTdirective: skip-snippet:;
        >>> df = otp.run(res)
        >>> df[["A", "B", "ADDED"]]
           A  B ADDED
        0  1  2     1
        1  2  4     1
        A function object as ``query`` is also supported (note that it will be executed only once, in Python code)
        >>> def func(symbol):
        ...    d = otp.Ticks(TYPE=["six"])
        ...    d = d.update(dict(TYPE="three"), where=(symbol.name == "3"))  # symbol is always converted to string
        ...    d["TYPE"] = symbol['PREF'] + d["TYPE"] + symbol['POST']
        ...    return d
        >>> # OTdirective: snippet-name: Special functions.join with query.with a function
        >>> data = otp.Ticks(A=[1, 2], B=[2, 4])
        >>> res = data.join_with_query(func, how='inner', symbol=(data['A'] + data['B'], dict(PREF="_", POST="$")))
        >>> df = otp.run(res)
        >>> df[["A", "B", "TYPE"]]
           A  B     TYPE
        0  1  2  _three$
        1  2  4    _six$
        It's possible to pass the source itself as a list of symbol parameters, which will make all of its fields
        accessible through the "symbol" object
        >>> def func(symbol):
        ...    d = otp.Ticks(TYPE=["six"])
        ...    d["TYPE"] = symbol['PREF'] + d["TYPE"] + symbol['POST']
        ...    return d
        >>> # OTdirective: snippet-name: Source operations.join with query.source as symbol;
        >>> data = otp.Ticks(A=[1, 2], B=[2, 4], PREF=["_", "$"], POST=["$", "_"])
        >>> res = data.join_with_query(func, how='inner', symbol=data)
        >>> df = otp.run(res)
        >>> df[["A", "B", "TYPE"]]
           A  B   TYPE
        0  1  2  _six$
        1  2  4  $six_
        The examples above can be rewritten by using OneTick query parameters instead of symbol parameters.
        Query parameters are global for the query, while symbol parameters can be redefined by bound symbols.
        >>> def func(symbol, pref, post):
        ...     d = otp.Ticks(TYPE=["six"])
        ...     d = d.update(dict(TYPE="three"), where=(symbol.name == "3"))  # symbol is always converted to string
        ...     d["TYPE"] = pref + d["TYPE"] + post
        ...     return d
        >>> # OTdirective: snippet-name: Special functions.join with query.with a function that takes params;
        >>> data = otp.Ticks(A=[1, 2], B=[2, 4])
        >>> res = data.join_with_query(func, how='inner', symbol=(data['A'] + data['B']),
        ...                            params=dict(pref="_", post="$"))
        >>> df = otp.run(res)
        >>> df[["A", "B", "TYPE"]]
           A  B     TYPE
        0  1  2  _three$
        1  2  4    _six$
        Some or all OneTick query parameters can also be columns or expressions
        >>> def func(symbol, pref, post):
        ...     d = otp.Ticks(TYPE=["six"])
        ...     d = d.update(dict(TYPE="three"), where=(symbol.name == "3"))  # symbol is always converted to string
        ...     d["TYPE"] = pref + d["TYPE"] + post
        ...     return d
        >>> # OTdirective: snippet-name: Special functions.join with query.with a function that takes params from fields;   # noqa
        >>> data = otp.Ticks(A=[1, 2], B=[2, 4], PREF=["^", "_"], POST=["!", "$"])
        >>> res = data.join_with_query(func, how='inner', symbol=(data['A'] + data['B']),
        ...                            params=dict(pref=data["PREF"] + ".", post=data["POST"]))
        >>> df = otp.run(res)
        >>> df[["A", "B", "TYPE"]]
           A  B      TYPE
        0  1  2  ^.three!
        1  2  4    _.six$
        You can specify the start and end time of the query to select specific ticks from the database
        >>> # OTdirective: snippet-name: Special functions.join with query.passing start/end times;
        >>> d = otp.Ticks(Y=[1, 2])
        >>> data = otp.Ticks(X=[1, 2])
        >>> start = datetime(2003, 12, 1, 0, 0, 0, 1000, tzinfo=pytz.timezone("EST5EDT"))
        >>> end = datetime(2003, 12, 1, 0, 0, 0, 3000, tzinfo=pytz.timezone("EST5EDT"))
        >>> res = data.join_with_query(d, how='inner', start=start, end=end)
        >>> otp.run(res)
                             Time  Y  X
        0 2003-12-01 00:00:00.000  1  1
        1 2003-12-01 00:00:00.000  2  1
        2 2003-12-01 00:00:00.001  1  2
        3 2003-12-01 00:00:00.001  2  2
        Use the ``keep_time`` parameter to keep or rename the original timestamp column
        >>> # OTdirective: snippet-name: Special functions.join with query.keep the timestamps of the joined ticks;
        >>> d = otp.Ticks(Y=[1, 2])
        >>> data = otp.Ticks(X=[1, 2])
        >>> res = data.join_with_query(d, how='inner', keep_time="ORIG_TIME")
        >>> otp.run(res)
                             Time  Y               ORIG_TIME  X
        0 2003-12-01 00:00:00.000  1 2003-12-01 00:00:00.000  1
        1 2003-12-01 00:00:00.000  2 2003-12-01 00:00:00.001  1
        2 2003-12-01 00:00:00.001  1 2003-12-01 00:00:00.000  2
        3 2003-12-01 00:00:00.001  2 2003-12-01 00:00:00.001  2
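        The ``where`` and ``default_fields_for_outer_join`` parameters only apply to outer joins.
        A minimal sketch (values are illustrative, output not shown):
        >>> d = otp.Ticks(Y=[1, 2])
        >>> data = otp.Ticks(X=[1, 2])
        >>> res = data.join_with_query(d, how='outer', where=data['X'] > 1,
        ...                            default_fields_for_outer_join={'Y': -1})  # doctest: +SKIP
        The ``symbol_time`` parameter may be used when the joined query performs symbology conversion.
        A hypothetical sketch (the database and symbology setup are assumed):
        >>> res = data.join_with_query(d, symbol=data['X'], symbol_time=otp.dt(2022, 3, 1))  # doctest: +SKIP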
        """
        # TODO: check if join_with_query checks schema of joined source against primary source,
        # by itself or with process_by_group
        if params is None:
            params = {}
        # "symbol" parameter can contain a symbol name (string, field, operation etc),
        # a symbol parameter list (dict, Source, _SymbolParamSource),
        # or both together as a tuple
        def _check_and_convert_symbol(symbol):
            if isinstance(symbol, _Operation):  # TODO: PY-35
                return True, f"tostring({symbol})"
            elif isinstance(symbol, str):
                return True, f"'{symbol}'"
            elif type(symbol) in {int, float}:  # constant
                return True, f"tostring({symbol})"
            elif symbol is None:
                # this is necessary to distinguish None (which is valid value for symbol) from invalid values
                return True, None
            else:
                return False, None
        def _convert_symbol_param_and_columns(symbol_param):
            """
            We need to create two objects from a symbol param (a dict, a Source or a _SymbolParamSource):
            1. Dictionary of columns to generate list of symbol parameters for the JOIN_WITH_QUERY EP
            2. _SymbolParamSource object to pass to the source function if necessary
            """
            if isinstance(symbol_param, dict):
                converted_symbol_param_columns = symbol_param
                converted_symbol_param = _SymbolParamSource(**{key: ott.get_object_type(column)
                                                               for key, column in symbol_param.items()})
            elif isinstance(symbol_param, _Source):
                converted_symbol_param_columns = {field_name: symbol_param[field_name]
                                                  for field_name in symbol_param.columns(skip_meta_fields=True).keys()}
                converted_symbol_param = symbol_param.to_symbol_param()
            elif isinstance(symbol_param, _SymbolParamSource):
                converted_symbol_param_columns = {field_name: symbol_param[field_name]
                                                  for field_name in symbol_param.schema.keys()}
                converted_symbol_param = symbol_param
            else:
                return None, None
            # we want to pass all the fields to the joined query as symbol parameters,
            # except for some special fields that would override explicitly set parameters
            ignore_symbol_fields = [
                '_PARAM_START_TIME_NANOS',
                '_PARAM_END_TIME_NANOS',
            ]
            filtered_converted_symbol_param_columns = {}
            for field_name, field_value in converted_symbol_param_columns.items():
                if field_name in ignore_symbol_fields:
                    warnings.warn(f'Special symbol parameter "{field_name}" was passed to the joined query! '
                                  'This parameter will be ignored. Please, use parameters of the `join_with_query` '
                                  'function itself to set it.')
                else:
                    filtered_converted_symbol_param_columns[field_name] = field_value
            filtered_converted_symbol_param = _SymbolParamSource(
                **{field_name: field_value for field_name, field_value in converted_symbol_param.schema.items()
                    if field_name not in ignore_symbol_fields}
            )
            return filtered_converted_symbol_param_columns, filtered_converted_symbol_param
        # if "symbol" is tuple, we unpack it
        if isinstance(symbol, tuple) and len(symbol) == 2:
            symbol_name, symbol_param = symbol
        else:
            # see if "symbol" contains symbol name or symbol params
            is_symbol, _ = _check_and_convert_symbol(symbol)
            if is_symbol:
                symbol_name = symbol
                symbol_param = {}
            else:
                symbol_name = None
                symbol_param = symbol
        _, converted_symbol_name = _check_and_convert_symbol(symbol_name)
        # default symbol name should be this: _SYMBOL_NAME if it is not empty else _NON_EXISTING_SYMBOL_
        # this way we will force JWQ to substitute symbol with any symbol parameters we may have passed
        # otherwise (if an empty symbol name is passed to JWQ), it will not substitute either symbol name
        # or symbol parameters, and so symbol parameters may get lost
        # see BDS-263
        if converted_symbol_name is None:
            converted_symbol_name = "CASE(_SYMBOL_NAME,'','_NON_EXISTING_SYMBOL',_SYMBOL_NAME)"
        converted_symbol_param_columns, converted_symbol_param = _convert_symbol_param_and_columns(symbol_param)
        if converted_symbol_param is None:
            # we couldn't interpret "symbols" as either symbol name or symbol parameters
            raise ValueError('"symbol" parameter has a wrong format! It should be a symbol name, a symbol parameter '
                             'object (dict or Source), or a tuple containing both')
        # adding symbol time
        if '_PARAM_SYMBOL_TIME' in converted_symbol_param_columns.keys():
            warnings.warn('"_PARAM_SYMBOL_TIME" explicitly passed among join_with_query symbol parameters! '
                          'This is deprecated - please use symbol_time parameter instead. '
                          'If you specify symbol_time parameter, it will override the explicitly passed value')
        if symbol_time is not None:
            if ott.get_object_type(symbol_time) is not otp.nsectime and ott.get_object_type(symbol_time) is not str:
                raise ValueError(f'Parameter of type {ott.get_object_type(symbol_time)} passed as symbol_time! '
                                 'This parameter only supports datetime values or strings')
            converted_symbol_param_columns['_PARAM_SYMBOL_TIME'] = symbol_time
        # prepare temporary file
        # ------------------------------------ #
        converted_params = prepare_params(**params)
        if isinstance(query, Source):
            sub_source = query
        else:
            # inspect function
            # -------
            sig = inspect.signature(query)
            if "symbol" in sig.parameters:
                if "symbol" in converted_params.keys():
                    raise AttributeError('"params" contains key "symbol", which is reserved for symbol parameters. '
                                         'Please, rename this parameter to another name')
                converted_params["symbol"] = converted_symbol_param  # type: ignore
            sub_source = query(**converted_params)
        sub_source = self._process_keep_time_param(keep_time, sub_source)
        if not sub_source._is_unbound_required():
            sub_source += onetick.py.sources.Empty()
        params_str = self._columns_to_params_for_joins(params, query_params=True)
        symbol_params_str = self._columns_to_params_for_joins(converted_symbol_param_columns)
        sub_source_schema = sub_source.schema.copy()
        columns = {}
        columns.update(self._get_columns_with_prefix(sub_source, prefix))
        columns.update(self.columns(skip_meta_fields=True))
        res = self.copy(columns=columns)
        res._merge_tmp_otq(sub_source)
        query_name = sub_source._store_in_tmp_otq(
            res._tmp_otq,
            symbols='_NON_EXISTING_SYMBOL_',
            operation_suffix="join_with_query"
        )  # TODO: combine with _convert_symbol_to_string
        # ------------------------------------ #
        if where is not None and how != 'outer':
            raise ValueError('The `where` parameter can be used only for outer join')
        default_fields_for_outer_join_str = self._get_default_fields_for_outer_join_str(
            default_fields_for_outer_join, how, sub_source_schema
        )
        join_params = dict(
            otq_query=f'"THIS::{query_name}"',
            join_type=how.upper(),
            otq_query_params=params_str,
            symbol_params=symbol_params_str,
            where=str(where._make_python_way_bool_expression()) if where is not None else '',
            default_fields_for_outer_join=default_fields_for_outer_join_str,
        )
        if concurrency is not None:
            if type(concurrency) is not int or concurrency <= 0:
                raise ValueError('Wrong value of concurrency parameter passed! '
                                 'concurrency should be a positive integer')
            join_params['shared_thread_count'] = concurrency
        start_time = kwargs.get('start_time', start)
        end_time = kwargs.get('end_time', end)
        self._fill_aux_params_for_joins(
            join_params, caching, end_time, prefix, start_time, converted_symbol_name, timezone
        )
        res.sink(otq.JoinWithQuery(**join_params))
        res._add_table()
        res.sink(otq.Passthrough(fields="TIMESTAMP", drop_fields=True))
        return res 
    @property
    def state_vars(self) -> StateVars:
        """
        Provides access to state variables
        Returns
        -------
        State Variables: Dict[str, state variable]
            State variables; you can access one by its name.
        See Also
        --------
        | `State Variables \
         <../../static/getting_started/variables_and_data_structures.html#variables-and-data-structures>`_
        | **DECLARE_STATE_VARIABLES** OneTick event processor
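        Examples
        --------
        A minimal sketch (the variable name and values are illustrative):
        >>> data = otp.Tick(A=1)
        >>> data.state_vars['SUM'] = 0  # doctest: +SKIP
        >>> data['B'] = data.state_vars['SUM']  # doctest: +SKIP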
        """
        return self.__dict__['_state_vars']
    __invalid_query_name_symbols_regex = re.compile('[^a-zA-Z0-9_]')
    def __remove_invalid_symbols(self, s):
        """
        Replaces symbols that cannot be put in query names with '_'
        """
        return self.__invalid_query_name_symbols_regex.sub('_', s)
    def get_name(self, remove_invalid_symbols=False):
        """
        Returns source name.
        If remove_invalid_symbols == True, returned name only contains symbols that can be put in query names.
        """
        if remove_invalid_symbols and self.__name:
            return self.__remove_invalid_symbols(self.__name)
        else:
            return self.__name
    def set_name(self, new_name):
        """
        Sets source name.
        Source name cannot be an empty string but can be None.
        """
        assert isinstance(new_name, str) or new_name is None, "Source name must be a string or None."
        if new_name is not None:
            assert new_name != '', "Source name must be a non-empty string."
        self.__name = new_name
    def _name_suffix(self, suffix, separator='.', remove_invalid_symbols=False):
        if remove_invalid_symbols:
            suffix = self.__remove_invalid_symbols(suffix)
            separator = self.__remove_invalid_symbols(separator)
            name = self.get_name(remove_invalid_symbols=True)
        else:
            name = self.__name
        return f'{separator}{name}{separator}{suffix}' if name else f'{separator}{suffix}'
    @property
    def schema(self) -> Schema:
        """
        Represents the actual Python data schema in the column-name -> type format.
        For example, it could be used after :meth:`Source.sink` to adjust
        the schema.
        Returns
        -------
        Schema
        See Also
        --------
        Source.sink
        Examples
        --------
        >>> data = otp.Ticks([['X', 'Y',   'Z'],
        ...                   [  1, 0.5, 'abc']])
        >>> data['T'] = data['Time']
        >>> data.schema
        {'X': <class 'int'>, 'Y': <class 'float'>, 'Z': <class 'str'>, 'T': <class 'onetick.py.types.nsectime'>}
        >>> data.schema['X']
        <class 'int'>
        >>> data.schema['X'] = float
        >>> data.schema['X']
        <class 'float'>
        >>> 'W' in data.schema
        False
        >>> data.schema['W'] = otp.nsectime
        >>> 'W' in data.schema
        True
        >>> data.schema['W']
        <class 'onetick.py.types.nsectime'>
        """
        schema = self.columns(skip_meta_fields=True)
        hidden_columns = {'Time': ott.nsectime, 'TIMESTAMP': ott.nsectime}
        return Schema(_base_source=self, _hidden_columns=hidden_columns, **schema)
    def set_schema(self, **kwargs):
        """
        Set schema of the source.
        Note: this method affects only the Python part and won't make any database queries. It is used to set
        the schema after reading from a database or after a complex query.
        .. deprecated:: please use :attr:`Source.schema` to access and adjust the schema.
        Parameters
        ----------
        kwargs
            schema in the column_name=type format
        Examples
        --------
        Python can't follow low-level column changes, e.g. when a complex query or a per-tick script is sunk.
        >>> data = otp.Ticks(dict(A=[1, 2], B=["a", "b"]))
        >>> data.sink(otq.AddField(field='Z', value='5'))   # doctest: +SKIP
        >>> data.columns(skip_meta_fields=True)
        {'A': <class 'int'>, 'B': <class 'str'>}
        >>> # OTdirective: snippet-name: Arrange.schema.set;
        >>> data.set_schema(A=int, B=str, Z=int)
        >>> data.columns(skip_meta_fields=True)
        {'A': <class 'int'>, 'B': <class 'str'>, 'Z': <class 'int'>}
        """
        self.drop_columns()
        for name, dtype in kwargs.items():
            dtype = ott.get_source_base_type(dtype)
            self._set_field_by_tuple(name, dtype)
    def _process_keep_time_param(self, keep_time, sub_source):
        if keep_time == "TIMESTAMP":
            raise ValueError("TIMESTAMP is reserved OneTick name, please, specify another one.")
        if keep_time in self.columns():
            raise ValueError(f"{keep_time} column is already presented.")
        sub_source = sub_source.copy()
        if keep_time:
            sub_source[keep_time] = sub_source["Time"]
        return sub_source
    def _get_columns_with_prefix(self, sub_source, prefix) -> dict:
        sub_source_columns = sub_source.schema
        if prefix is None:
            prefix = ""
        if not isinstance(prefix, str):
            raise ValueError("Only string constants are supported for now.")
        new_columns = {prefix + name: dtype for name, dtype in sub_source_columns.items()}
        same_names = set(new_columns) & set(self.schema)
        if same_names:
            raise ValueError(f"After applying prefix some columns aren't unique: {', '.join(same_names)}.")
        return new_columns
    def _fill_aux_params_for_joins(
        self, join_params, caching, end_time, prefix, start_time, symbol_name, timezone, join_with_collection=False
    ):
        if symbol_name and not join_with_collection:
            join_params["symbol_name"] = symbol_name
        if prefix is not None:
            join_params["prefix_for_output_ticks"] = str(prefix)
        if caching:
            if join_with_collection:
                supported = ("per_symbol",)
            else:
                supported = ("cross_symbol", "per_symbol")
            if caching in supported:
                join_params["caching_scope"] = caching
            else:
                raise ValueError(f"Unknown value for caching param, please use None or any of {supported}.")
        self._fill_time_param_for_jwq(join_params, start_time, end_time, timezone)
        if join_with_collection:
            del join_params['timezone']
    def _fill_time_param_for_jwq(self, join_params, start_time, end_time, timezone):
        self._process_start_or_end_of_jwq(join_params, start_time, "start_timestamp")
        self._process_start_or_end_of_jwq(join_params, end_time, "end_timestamp")
        if timezone:
            join_params["timezone"] = f"'{timezone}'"
        else:
            join_params["timezone"] = "_TIMEZONE"  # this may break something, need to test
    def _process_start_or_end_of_jwq(self, join_params, time, param_name):
        if time is not None:
            if isinstance(time, (datetime, otp.dt)):
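                # datetime values are converted to milliseconds since the epoch for the underlying EP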
                join_params[f"{param_name}"] = time.timestamp() * 1000
            elif isinstance(time, _Operation):
                join_params[f"{param_name}"] = str(time)
            else:
                raise ValueError(f"{param_name} should be datetime.datetime instance or OneTick expression")
    @inplace_operation
    def transpose(
        self,
        inplace: bool = False,
        direction: Literal['rows', 'columns'] = 'rows',
        n: Optional[int] = None,
    ) -> Optional['Source']:
        """
        Data transposing.
        The main idea is joining many ticks into one or splitting one tick into many.
        Parameters
        ----------
        inplace: bool, default=False
            if `True` method will modify current object,
            otherwise it will return modified copy of the object.
        direction: 'rows', 'columns', default='rows'
            - `rows`: join certain input ticks (depending on other parameters) with preceding ones.
              Fields of each tick will be added to the output tick and their names will be suffixed
              with **_K** where **K** is the positional number of tick (starting from 1) in reverse order.
              So fields of current tick will be suffixed with **_1**, fields of previous tick will be
              suffixed with **_2** and so on.
            - `columns`: the operation is opposite to `rows`. It splits each input tick to several
              output ticks. Each input tick must have fields with names suffixed with **_K**
              where **K** is the positional number of tick (starting from 1) in reverse order.
        n: Optional[int], default=None
            Must be specified only if ``direction`` is 'rows'.
            Every **n**-th tick is joined with its **n-1** preceding ticks.
        Returns
        -------
        If ``inplace`` parameter is `True` method will return `None`,
        otherwise it will return modified copy of the object.
        See also
        --------
        **TRANSPOSE** OneTick event processor
        Examples
        --------
        Merging two ticks into one.
        >>> data = otp.Ticks(dict(A=[1, 2],
        ...                       B=[3, 4]))
        >>> data = data.transpose(direction='rows', n=2) # OTdirective: skip-snippet:;
        >>> otp.run(data)
                             Time             TIMESTAMP_1  A_1  B_1 TIMESTAMP_2  A_2  B_2
        0 2003-12-01 00:00:00.001 2003-12-01 00:00:00.001    2    4  2003-12-01    1    3
        And splitting them back into two.
        >>> data = data.transpose(direction='columns') # OTdirective: skip-snippet:;
        >>> otp.run(data)
                             Time  A  B
        0 2003-12-01 00:00:00.000  1  3
        1 2003-12-01 00:00:00.001  2  4
        """
        direction_map = {'rows': 'ROWS_TO_COLUMNS', 'columns': 'COLUMNS_TO_ROWS'}
        n: Union[str, int] = '' if n is None else n  # type: ignore[no-redef]
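        # n is passed via KEY_CONSTRAINT_VALUES; an empty string means no tick-count
        # constraint is set (the 'columns' direction does not need one)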
        self.sink(otq.Transpose(direction=direction_map[direction],
                                key_constraint_values=n))
        # TODO: we should change source's schema after transposing
        return self 
    @inplace_operation
    def process_by_group(self, process_source_func, group_by=None, source_name=None,
                         num_threads=None, inplace=False) -> Union['Source', Tuple['Source', ...], None]:
        """
        Groups data by ``group_by``, runs ``process_source_func`` for each group and merges the outputs of all groups.
        Note that ``process_source_func`` will be converted to a OneTick object and passed to the query,
        which means that the Python callable will be called only once.
        Parameters
        ----------
        process_source_func: callable
            ``process_source_func`` should take a :class:`Source`, apply the necessary logic and return it,
            or return a tuple of :class:`Source` objects; in this case, all of them should have a common root,
            which is the input :class:`Source`.
            The number of sources returned by this method is the same as the number of sources
            returned by ``process_source_func``.
        group_by: list
            A list of field names to group input ticks by.
            If ``group_by`` is None, then no grouping fields are defined
            and the logic of ``process_source_func`` is applied to all input ticks
            at once.
        source_name: str
            A name for the source that represents all of the grouped sources. Can be passed here or as the name
            of the inner sources; if passed both ways, the names must be consistent.
        num_threads: int
            If specified and not zero, turns on asynchronous processing mode
            and specifies number of threads to be used for processing input ticks.
            If this parameter is not specified or zero, then input ticks are processed synchronously.
        inplace: bool
            If ``True``, nothing will be returned and changes will be applied to the current query;
            otherwise, a modified query will be returned.
            An error is raised if ``inplace`` is set to ``True``
            and multiple sources are returned by ``process_source_func``.
        Returns
        -------
        :class:`Source`, Tuple[:class:`Source`] or None:
        See also
        --------
        **GROUP_BY** OneTick event processor
        Examples
        --------
        >>> # OTdirective: snippet-name: Arrange.group.single output;
        >>> d = otp.Ticks(X=[1, 1, 2, 2],
        ...               Y=[1, 2, 3, 4])
        >>>
        >>> def func(source):
        ...     return source.first()
        >>>
        >>> res = d.process_by_group(func, group_by=['X'])
        >>> otp.run(res)[["X", "Y"]]
           X  Y
        0  1  1
        1  2  3
        Set asynchronous processing:
        >>> res = d.process_by_group(func, group_by=['X'], num_threads=2)
        >>> otp.run(res)[['X', 'Y']]
           X  Y
        0  1  1
        1  2  3
        Return multiple outputs, each with unique grouping logic:
        >>> d = otp.Ticks(X=[1, 1, 2, 2],
        ...               Y=[1, 2, 1, 3])
        >>>
        >>> def func(source):
        ...     source['Z'] = source['X']
        ...     source2 = source.copy()
        ...     source = source.first()
        ...     source2 = source2.last()
        ...     return source, source2
        >>> # OTdirective: snippet-name: Arrange.group.multiple output;
        >>> res1, res2 = d.process_by_group(func, group_by=['Y'])
        >>> df1 = otp.run(res1)
        >>> df2 = otp.run(res2)
        >>> df1[['X', 'Y', 'Z']]
           X  Y  Z
        0  1  1  1
        1  1  2  1
        2  2  3  2
        >>> df2[['X', 'Y', 'Z']]    # OTdirective: skip-snippet:;
           X  Y  Z
        0  1  2  1
        1  2  1  2
        2  2  3  2
        """
        if group_by is None:
            group_by = []
        if inplace:
            main_source = self
        else:
            main_source = self.copy()
        input_schema = main_source.columns(skip_meta_fields=True)
        for field in group_by:
            if field not in input_schema:
                raise ValueError(f"Group by field name {field} not present in input source schema")
        process_source_root = onetick.py.sources.Custom(tick_type="ANY", schema_policy="manual", **input_schema)
        if source_name:
            process_source_root.set_name(source_name)
        process_sources = process_source_func(process_source_root)
        if isinstance(process_sources, Source):
            # returned one source
            process_sources = [process_sources]
        elif len(process_sources) == 1:
            # returned one source as an iterable
            pass
        else:
            # returned multiple sources
            if inplace:
                raise ValueError("Cannot use inplace=True with multi-source processing function!")
        num_source = 0
        for process_source in process_sources:
            output_schema = process_source.columns(skip_meta_fields=True)
            if process_source.get_name():
                if not process_source_root.get_name():
                    process_source_root.set_name(process_source.get_name())
                if process_source_root.get_name() != process_source.get_name():
                    warnings.warn("Different strings passed as names for the root source used in "
                                  f"process_by_group: '{process_source.get_name()}' "
                                  f"and '{process_source_root.get_name()}'")
            # removing key fields from output schema since they will be
            # added by the GROUP_BY EP
            process_source.drop([field for field in group_by if field in output_schema], inplace=True)
            process_source.sink(otq.Passthrough().node_name(f"OUT_{num_source}"))
            process_source_root.node().add_rules(process_source.node().copy_rules())
            main_source._merge_tmp_otq(process_source)
            num_source += 1
        query_name = process_source_root._store_in_tmp_otq(
            main_source._tmp_otq, operation_suffix="group_by", add_passthrough=False
        )
        process_path = f'THIS::{query_name}'
        num_outputs = len(process_sources)
        # we shouldn't set named outputs if GROUP_BY EP has only one output due to onetick behaviour
        if num_outputs == 1:
            outputs = ""
        else:
            outputs = ",".join([f"OUT_{i}" for i in range(0, num_outputs)])
        kwargs = {}
        if num_threads is not None:
            if num_threads < 0:
                raise ValueError("Parameter 'num_threads' can't be negative.")
            kwargs['num_threads'] = num_threads
        main_source.sink(
            otq.GroupBy(key_fields=",".join(group_by), query_name=process_path, outputs=outputs, **kwargs)
        )
        output_sources = []
        for num_output in range(0, num_outputs):
            if num_outputs == 1 and inplace:
                output_source = main_source
            else:
                output_source = main_source.copy()
            if num_outputs > 1:
                output_source.node().out_pin(f"OUT_{num_output}")
            # setting schema after processing
            output_schema = process_sources[num_output].columns(skip_meta_fields=True)
            for field in group_by:
                output_schema[field] = input_schema[field]
            for field, field_type in output_schema.items():
                output_source.schema[field] = field_type
            output_source = output_source[[field for field in output_schema]]
            output_source._merge_tmp_otq(main_source)
            output_sources.append(output_source)
        if num_outputs == 1:
            return output_sources[0]
        else:
            return tuple(output_sources) 
    def unite_columns(self, sep="", *, apply_str=False):
        """
        Join values of all columns into one string.
        The method unites all fields into one string, just like the Python ``join`` method. All fields should be
        strings, otherwise an error will be raised. To change this behavior, specify the ``apply_str=True`` argument;
        in this case, all fields will be converted to the string type before joining.
        Parameters
        ----------
        sep: str
            Separator between values, empty string by default.
        apply_str: bool
            If set, every column will be converted to string during the operation. ``False`` by default.
        Returns
        -------
        result: column
            Column with str type
        Examples
        --------
        >>> # OTdirective: snippet-name: Arrange.join columns as strings;
        >>> data = otp.Ticks(X=[1, 2, 3], A=["A", "A", "A"], B=["A", "B", "C"])
        >>> data["S_ALL"] = data.unite_columns(sep=",", apply_str=True)
        >>> data["S"] = data[["A", "B"]].unite_columns()
        >>> otp.run(data)[["S", "S_ALL"]]
            S  S_ALL
        0  AA  1,A,A
        1  AB  2,A,B
        2  AC  3,A,C
        """
        if apply_str:
            cols = (self[col].apply(str) for col in self.schema)
        else:
            not_str = [name for name, t in self.schema.items() if not are_strings(t)]
            if not_str:
                raise ValueError(f"All joining columns should be strings, while {', '.join(not_str)} "
                                 f"are not. Specify `apply_str=True` for automatic type conversion")
            else:
                cols = (self[col] for col in self.schema)
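        # fold the columns left to right, inserting the separator between each pair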
        return functools.reduce(lambda x, y: x + sep + y, cols)
    # Aggregations copy
    # we need these functions to store and collect documentation
    # the copy_method decorator will
    #       set the docstring (it will compare the docstring of the donor function and the method docstring)
    #       apply the same signature from the donor function + self
    #       for mimic=True it will apply the agg function as is
    @copy_method(high_tick)
    def high(self):
        """
        Examples
        --------
        >>> data = otp.Ticks(X=[1, 2, 3, 4], offset=[0, 1000, 1500, 3000])
        >>> data = data.high(['X'], 2)     # OTdirective: snippet-name: Aggregations.high tick;
        >>> otp.run(data)
                             Time  X
        0 2003-12-01 00:00:01.500  3
        1 2003-12-01 00:00:03.000  4
        """
        pass 
    @copy_method(low_tick)
    def low(self):
        """
        Examples
        --------
        >>> data = otp.Ticks(X=[1, 2, 3, 4], offset=[0, 1000, 1500, 3000])
        >>> data = data.low(['X'],2)      # OTdirective: snippet-name: Aggregations.low tick;
        >>> otp.run(data)
                         Time  X
        0 2003-12-01 00:00:00  1
        1 2003-12-01 00:00:01  2
        """
        pass 
    @copy_method(first_tick)
    def first(self):
        """
        Examples
        --------
        >>> data = otp.Ticks(X=[1, 2, 3, 4])
        >>> data = data.first()    # OTdirective: snippet-name: Aggregations.first;
        >>> otp.run(data)
                Time  X
        0 2003-12-01  1
        """
        pass 
    @copy_method(last_tick)
    def last(self):
        """
        Examples
        --------
        >>> data = otp.Ticks(X=[1, 2, 3, 4], offset=[0, 1000, 1500, 3000])
        >>> data = data.last()     # OTdirective: snippet-name: Aggregations.last;
        >>> otp.run(data)
                         Time  X
        0 2003-12-01 00:00:03  4
        """
        pass 
    @copy_method(distinct, mimic=False)
    def distinct(self, *args, **kwargs):
        """
        Examples
        --------
        >>> data = otp.Ticks(dict(x=[1, 3, 1, 5, 3]))
        >>> data = data.distinct('x')   # OTdirective: snippet-name: Aggregations.distinct;
        >>> otp.run(data)
                Time  x
        0 2003-12-04  1
        1 2003-12-04  3
        2 2003-12-04  5
        """
        if 'bucket_interval_units' in kwargs:
            kwargs['bucket_units'] = kwargs.pop('bucket_interval_units')
        agg = distinct(*args, **kwargs)
        return agg.apply(self) 
    @copy_method(high_time, mimic=False)  # mimic=False for backward compatibility
    def high_time(self, *args, **kwargs):
        """
        Returns the timestamp of the tick with the highest value of the input field.
        .. deprecated:: 1.14.5
            Use :py:func:`.high_time` instead
        See Also
        --------
        :py:func:`.high_time`
        """
        warnings.warn(f"{self.__class__.__name__}.{inspect.currentframe().f_code.co_name} deprecated. "
                      f"Use otp.agg.{inspect.currentframe().f_code.co_name} instead",
                      DeprecationWarning, stacklevel=2)
        agg = high_time(*args, **kwargs)
        return agg.apply(self, 'VALUE') 
    @copy_method(low_time, mimic=False)  # mimic=False for backward compatibility
    def low_time(self, *args, **kwargs):
        """
        Returns the timestamp of the tick with the lowest value of the input field.
        .. deprecated:: 1.14.5
            Use :py:func:`.low_time` instead
        See Also
        --------
        :py:func:`.low_time`
        """
        warnings.warn(f"{self.__class__.__name__}.{inspect.currentframe().f_code.co_name} deprecated. "
                      f"Use otp.agg.{inspect.currentframe().f_code.co_name} instead",
                      DeprecationWarning, stacklevel=2)
        agg = low_time(*args, **kwargs)
        return agg.apply(self, 'VALUE') 
    @copy_method(ob_snapshot)
    def ob_snapshot(self):
        """
        Examples
        --------
        >>> data = otp.DataSource(db='SOME_DB', tick_type='PRL', symbols='AA')  # doctest: +SKIP
        >>> data = data.ob_snapshot(max_levels=1)  # doctest: +SKIP
        >>> otp.run(data) # doctest: +SKIP
                Time  PRICE             UPDATE_TIME  SIZE  LEVEL  BUY_SELL_FLAG
        0 2003-12-04    2.0 2003-12-01 00:00:00.003     6      1              1
        1 2003-12-04    5.0 2003-12-01 00:00:00.004     7      1              0
        """
        pass 
    @copy_method(ob_snapshot_wide)
    def ob_snapshot_wide(self):
        """
        Examples
        --------
        >>> data = otp.DataSource(db='SOME_DB', tick_type='PRL', symbols='AA')  # doctest: +SKIP
        >>> data = data.ob_snapshot_wide(max_levels=1)  # doctest: +SKIP
        >>> otp.run(data) # doctest: +SKIP
                Time  BID_PRICE         BID_UPDATE_TIME  BID_SIZE  ASK_PRICE         ASK_UPDATE_TIME  ASK_SIZE  LEVEL
        0 2003-12-03        5.0 2003-12-01 00:00:00.004         7        2.0 2003-12-01 00:00:00.003         6      1
        """
        pass 
    @copy_method(ob_snapshot_flat)
    def ob_snapshot_flat(self):
        """
        Examples
        --------
        >>> data = otp.DataSource(db='SOME_DB', tick_type='PRL', symbols='AA')  # doctest: +SKIP
        >>> data = data.ob_snapshot_flat(max_levels=1)  # doctest: +SKIP
        >>> otp.run(data) # doctest: +SKIP
                Time  BID_PRICE1        BID_UPDATE_TIME1  BID_SIZE1  ASK_PRICE1        ASK_UPDATE_TIME1  ASK_SIZE1
        0 2003-12-03         5.0 2003-12-01 00:00:00.004          7         2.0 2003-12-01 00:00:00.003          6
        """
        pass 
    @inplace_operation
    def add_prefix(self, prefix, inplace=False, columns=None, ignore_columns=None) -> Optional['Source']:
        """
        Adds prefix to all column names.
        Parameters
        ----------
        prefix : str
            String prefix to add to all columns.
        inplace : bool
            The flag controls whether operation should be applied inplace or not.
            If ``inplace=True``, then it returns nothing. Otherwise method returns a new modified
            object.
        columns: List[str], optional
            If set, only selected columns will be updated with prefix. Can't be used with ``ignore_columns`` parameter.
        ignore_columns: List[str], optional
            If set, selected columns won't be updated with prefix. Can't be used with ``columns`` parameter.
        Returns
        -------
        :class:`Source` or ``None``
        Examples
        --------
        >>> data = otp.DataSource(db='SOME_DB', tick_type='TT', symbols='S1')
        >>> data = data.add_prefix('test_')
        >>> otp.run(data)
                             Time  test_X
        0 2003-12-01 00:00:00.000       1
        1 2003-12-01 00:00:00.001       2
        2 2003-12-01 00:00:00.002       3
        >>> data.schema
        {'test_X': <class 'int'>}
        >>> data = otp.DataSource(db='NYSE_TAQ', tick_type='TRD', symbols='AAPL')
        >>> data = data.add_prefix('test_')
        >>> otp.run(data, start=otp.dt(2022, 3, 1), end=otp.dt(2022, 3, 2))
                             Time  test_PRICE  test_SIZE
        0 2022-03-01 00:00:00.000         1.3        100
        1 2022-03-01 00:00:00.001         1.4         10
        2 2022-03-01 00:00:00.002         1.4         50
        >>> data.schema
        {'test_PRICE': <class 'float'>}
        >>> data = otp.Tick(X=1, XX=2)
        >>> data.add_prefix('X')
        Traceback (most recent call last):
          ...
        AttributeError: Column XX already exists, please, use another prefix
        Parameter ``columns`` specifies columns to be updated with prefix:
        >>> data = otp.Tick(A=1, B=2, C=3, D=4, E=5)
        >>> data = data.add_prefix('test_', columns=['A', 'B', 'C'])
        >>> otp.run(data)
                Time  test_A  test_B  test_C  D  E
        0 2003-12-01       1       2       3  4  5
        Parameter ``ignore_columns`` specifies columns to ignore:
        >>> data = otp.Tick(A=1, B=2, C=3, D=4, E=5)
        >>> data = data.add_prefix('test_', ignore_columns=['A', 'B', 'C'])
        >>> otp.run(data)
                Time  A  B  C  test_D  test_E
        0 2003-12-01  1  2  3       4       5
        Parameters `columns` and `ignore_columns` can't be used at the same time:
        >>> data = otp.Tick(A=1, B=2, C=3, D=4, E=5)
        >>> data.add_prefix('test_', columns=['B', 'C'], ignore_columns=['A'])
        Traceback (most recent call last):
          ...
        ValueError: It is allowed to use only one of `columns` or `ignore_columns` parameters at a time
        """
        return self._add_prefix_and_suffix(
            prefix=prefix,
            columns=columns,
            ignore_columns=ignore_columns,
        ) 
    @inplace_operation
    def add_suffix(self, suffix, inplace=False, columns=None, ignore_columns=None) -> Optional['Source']:
        """
        Adds suffix to all column names.
        Parameters
        ----------
        suffix : str
            String suffix to add to all columns.
        inplace : bool
            The flag controls whether operation should be applied inplace or not.
            If ``inplace=True``, then it returns nothing. Otherwise method returns a new modified
            object.
        columns: List[str], optional
            If set, only selected columns will be updated with suffix. Can't be used with `ignore_columns` parameter.
        ignore_columns: List[str], optional
            If set, selected columns won't be updated with suffix. Can't be used with `columns` parameter.
        Returns
        -------
        :class:`Source` or ``None``
        Examples
        --------
        >>> data = otp.DataSource(db='SOME_DB', tick_type='TT', symbols='S1')
        >>> data = data.add_suffix('_test')
        >>> otp.run(data)
                             Time  X_test
        0 2003-12-01 00:00:00.000       1
        1 2003-12-01 00:00:00.001       2
        2 2003-12-01 00:00:00.002       3
        >>> data.schema
        {'X_test': <class 'int'>}
        >>> data = otp.DataSource(db='NYSE_TAQ', tick_type='TRD', symbols='AAPL')
        >>> data = data.add_suffix('_test')
        >>> otp.run(data, start=otp.dt(2022, 3, 1), end=otp.dt(2022, 3, 2))
                             Time  PRICE_test  SIZE_test
        0 2022-03-01 00:00:00.000         1.3        100
        1 2022-03-01 00:00:00.001         1.4         10
        2 2022-03-01 00:00:00.002         1.4         50
        >>> data.schema
        {'PRICE_test': <class 'float'>}
        >>> data = otp.Tick(X=1, XX=2)
        >>> data.add_suffix('X')
        Traceback (most recent call last):
          ...
        AttributeError: Column XX already exists, please, use another suffix
        Parameter ``columns`` specifies columns to be updated with suffix:
        >>> data = otp.Tick(A=1, B=2, C=3, D=4, E=5)
        >>> data = data.add_suffix('_test', columns=['A', 'B', 'C'])
        >>> otp.run(data)
                Time  A_test  B_test  C_test  D  E
        0 2003-12-01       1       2       3  4  5
        Parameter ``ignore_columns`` specifies columns to ignore:
        >>> data = otp.Tick(A=1, B=2, C=3, D=4, E=5)
        >>> data = data.add_suffix('_test', ignore_columns=['A', 'B', 'C'])
        >>> otp.run(data)
                Time  A  B  C  D_test  E_test
        0 2003-12-01  1  2  3       4       5
        Parameters `columns` and `ignore_columns` can't be used at the same time:
        >>> data = otp.Tick(A=1, B=2, C=3, D=4, E=5)
        >>> data.add_suffix('_test', columns=['B', 'C'], ignore_columns=['A'])
        Traceback (most recent call last):
          ...
        ValueError: It is allowed to use only one of `columns` or `ignore_columns` parameters at a time
        """
        return self._add_prefix_and_suffix(
            suffix=suffix,
            columns=columns,
            ignore_columns=ignore_columns,
        ) 
    def _add_prefix_and_suffix(
        self,
        prefix='',
        suffix='',
        columns=None,
        ignore_columns=None,
    ) -> Optional['Source']:
        if not prefix and not suffix:
            raise ValueError('Both suffix and prefix are empty')
        if ' ' in prefix:
            raise ValueError(f'There is space in prefix: {prefix}')
        if ' ' in suffix:
            raise ValueError(f'There is space in suffix: {suffix}')
        columns = columns or []
        ignore_columns = ignore_columns or []
        if columns and ignore_columns:
            raise ValueError('It is allowed to use only one of `columns` or `ignore_columns` parameters at a time')
        schema = self.schema
        for column in columns:
            if column not in schema:
                raise AttributeError(f'Column `{column}` does not exist in the schema')
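        # first pass: verify that none of the new names collide with existing columns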
        for column_name in (columns or schema):
            if column_name in ignore_columns:
                continue
            new_column_name = f'{prefix}{column_name}{suffix}'
            if new_column_name in self.__dict__:
                if prefix:
                    raise AttributeError(f'Column {new_column_name} already exists, please, use another prefix')
                else:
                    raise AttributeError(f'Column {new_column_name} already exists, please, use another suffix')
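        # second pass: rename the columns on the Python side of the schema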
        for column_name in (columns or schema):
            if column_name in ignore_columns:
                continue
            new_column_name = f'{prefix}{column_name}{suffix}'
            self.__dict__[column_name].rename(new_column_name, update_parent_object=False)
            self.__dict__[new_column_name] = self.__dict__[column_name]
            del self.__dict__[column_name]
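        # finally, sink a RENAME_FIELDS EP so the renaming is applied on the OneTick side as well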
        if not columns and not ignore_columns:
            self.sink(otq.RenameFieldsEp(rename_fields=f'(.*)={prefix}\\1{suffix}', use_regex=True))
        elif columns:
            renames = [f'{column}={prefix}{column}{suffix}' for column in columns]
            self.sink(otq.RenameFieldsEp(rename_fields=','.join(renames)))
        else:
            renames = [f'{column}={prefix}{column}{suffix}' for column in schema if column not in ignore_columns]
            self.sink(otq.RenameFieldsEp(rename_fields=','.join(renames)))
        return self
    @inplace_operation
    def time_filter(self,
                    discard_on_match: bool = False,
                    start_time: Union[str, int, time] = 0,
                    end_time: Union[str, int, time] = 0,
                    day_patterns: Union[str, List[str]] = "",
                    timezone=utils.default,  # type: ignore
                    end_time_tick_matches: bool = False,
                    inplace=False) -> Optional['Source']:
        """
        Filters ticks by time.
        Parameters
        ----------
        discard_on_match : bool, optional
            If ``True``, then ticks that match the filter will be discarded.
            Otherwise, only ticks that match the filter will be passed.
        start_time : str or int or datetime.time, optional
            Start time of the filter, string must be in the format ``HHMMSSmmm``.
            Default value is 0.
        end_time : str or int or datetime.time, optional
            End time of the filter, string must be in the format ``HHMMSSmmm``.
            To filter ticks for an entire day, this parameter should be set to 240000000.
            Default value is 0.
        day_patterns : list or str
            Pattern or list of patterns that determines days for which the ticks can be propagated.
            A tick can be propagated if its date matches one or more of the patterns.
            Three supported pattern formats are:
            1. ``month.week.weekdays``, 0 month means any month, 0 week means any week,
               6 week means the last week of the month for a given weekday(s),
               weekdays are digits for each day, 0 being Sunday.
            2. ``month/day``, 0 month means any month.
            3. ``year/month/day``, 0 year means any year, 0 month means any month.
        timezone : str, optional
            Timezone of the filter.
            Default value is ``configuration.config.tz``
            or timezone set in the parameter of :py:func:`onetick.py.run`.
        end_time_tick_matches : bool, optional
            If ``True``, then the end time is inclusive.
            Otherwise, the end time is exclusive.
        inplace : bool, optional
            The flag controls whether operation should be applied inplace or not.
            If ``inplace=True``, then it returns nothing. Otherwise method returns a new modified
            object. Default value is ``False``.
        Returns
        -------
        :class:`Source` or ``None``
            Returns ``None`` if ``inplace=True``.
        See also
        --------
        **TIME_FILTER** OneTick event processor
        Examples
        --------
        >>> data = otp.DataSource(db='NYSE_TAQ', tick_type='TRD', symbols='AAPL')
        >>> data = data.time_filter(start_time='000000001', end_time='000000003')
        >>> otp.run(data, start=otp.dt(2022, 3, 1), end=otp.dt(2022, 3, 2))
                             Time  PRICE  SIZE
        0 2022-03-01 00:00:00.001    1.4    10
        1 2022-03-01 00:00:00.002    1.4    50
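        A sketch of ``day_patterns`` usage (output depends on the underlying data):
        >>> # propagate ticks only on the first Monday of any month
        >>> data = data.time_filter(day_patterns='0.1.1')  # doctest: +SKIP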
        """
        if timezone is utils.default:
            # doesn't work without expr for some reason
            timezone = 'expr(_TIMEZONE)'
        if day_patterns:
            if isinstance(day_patterns, str):
                day_patterns = [day_patterns]
            for day_pattern in day_patterns:
                if not re.match(r"(^\d\d?\.[0-6]\.\d\d?$)|(^\d\d?\/\d\d?$)|(^\d{1,4}\/\d\d?\/\d\d?$)", day_pattern):
                    raise ValueError(f"Invalid day pattern: {day_pattern}")
        if isinstance(start_time, time):
            start_time = start_time.strftime('%H%M%S%f')[:-3]
        if isinstance(end_time, time):
            end_time = end_time.strftime('%H%M%S%f')[:-3]
        day_patterns = ",".join(day_patterns)
        self.sink(otq.TimeFilter(discard_on_match=discard_on_match,
                                 start_time=start_time,
                                 end_time=end_time,
                                 timezone=timezone,
                                 day_patterns=day_patterns,
                                 end_time_tick_matches=end_time_tick_matches,
                                 ))
        return self 
    @copy_method(ranking)
    def ranking(self, *args, **kwargs):
        pass 
    @inplace_operation
    def insert_tick(self,
                    fields=None,
                    where=None,
                    preserve_input_ticks=True,
                    num_ticks_to_insert=1,
                    insert_before=True,
                    inplace=False) -> Optional['Source']:
        """
        Insert tick.
        Parameters
        ----------
        fields: dict of str to :py:class:`onetick.py.Operation`
            Mapping of field names to some expressions or values.
            These fields in inserted ticks will be set to the corresponding values or results of expressions.
            If a field is present in the input tick but not set in the ``fields`` dict,
            then the value of the field will be copied from the input tick to the inserted tick.
            If the ``fields`` parameter is not set at all, then the fields of inserted ticks
            will be set to the default values for their types (0 for integers etc.).
        where: :py:class:`onetick.py.Operation`
            Expression to select ticks near which the new ticks will be inserted.
            By default, all ticks are selected.
        preserve_input_ticks: bool
            A switch controlling whether input ticks have to be preserved in the output time series or not.
            In the former case, fields of input ticks are present in the output time series
            together with those defined by the ``fields`` parameter;
            in the latter case, only the defined fields are present.
            If a field of the input time series is defined in the ``fields`` parameter,
            the defined value takes precedence.
        num_ticks_to_insert: int
            Number of ticks to insert.
        insert_before: bool
            Insert tick before each input tick or after.
        inplace: bool
            The flag controls whether operation should be applied inplace or not.
            If ``inplace=True``, then it returns nothing.
            Otherwise method returns a new modified object.
        See also
        --------
        **INSERT_TICK** OneTick event processor
        Returns
        -------
        :class:`Source` or ``None``
        Examples
        --------
        Insert tick before each tick with default type values.
        >>> data = otp.Tick(A=1)
        >>> data = data.insert_tick()
        >>> otp.run(data)
                Time  A
        0 2003-12-01  0
        1 2003-12-01  1
        Insert tick before each tick with field `A` copied from input tick
        and field `B` set to specified value.
        >>> data = otp.Tick(A=1)
        >>> data = data.insert_tick(fields={'B': 'b'})
        >>> otp.run(data)
                Time  B  A
        0 2003-12-01  b  1
        1 2003-12-01     1
        Insert two ticks only after first tick.
        >>> data = otp.Ticks(A=[1, 2, 3])
        >>> data = data.insert_tick(where=data['A'] == 1,
        ...                         insert_before=False,
        ...                         num_ticks_to_insert=2)
        >>> otp.run(data)
                             Time  A
        0 2003-12-01 00:00:00.000  1
        1 2003-12-01 00:00:00.000  0
        2 2003-12-01 00:00:00.000  0
        3 2003-12-01 00:00:00.001  2
        4 2003-12-01 00:00:00.002  3
        """
        if not isinstance(num_ticks_to_insert, int) or num_ticks_to_insert <= 0:
            raise ValueError("Parameter 'num_ticks_to_insert' must be a positive integer")
        if not preserve_input_ticks and not fields:
            raise ValueError("Parameter 'fields' must be set if 'preserve_input_ticks' is False")
        where = '' if where is None else str(where)
        fields = fields or {}
        update_schema = {}
        for field, value in fields.items():
            dtype = ott.get_object_type(value)
            if field not in self.schema:
                update_schema[field] = dtype
            elif dtype is not self.schema[field]:
                raise ValueError(f"Incompatible types for field '{field}': {self.schema[field]} --> {dtype}")
            dtype = ott.type2str(dtype)
            if isinstance(value, type):
                value = ott.default_by_type(value)
            value = ott.value2str(value)
            fields[field] = (dtype, value)
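        # render fields as the EP's comma-separated 'NAME TYPE[=VALUE]' string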
        fields = ','.join(
            f'{field} {dtype}={value}' if value else f'{field} {dtype}'
            for field, (dtype, value) in fields.items()
        )
        self.sink(
            otq.InsertTick(
                fields=fields,
                where=where,
                preserve_input_ticks=preserve_input_ticks,
                num_ticks_to_insert=num_ticks_to_insert,
                insert_before=insert_before,
            )
        )
        # when input ticks are preserved, keep their fields in the schema (strict=False)
        self.table(inplace=True, strict=not preserve_input_ticks, **update_schema)
        return self

    @inplace_operation
    def modify_query_times(self,
                           start=None,
                           end=None,
                           output_timestamp=None,
                           propagate_heartbeats=True,
                           inplace=False):
        """
        Modify the ``start`` and ``end`` times of the query.

        * Query times are changed for all operations
          only **before** this method, up to the source of the graph.
        * All ticks' timestamps produced by this method
          **must** fall into the original time range of the query.

        It is possible to change ticks' timestamps with the ``output_timestamp`` parameter,
        so they will stay inside the original time range.

        Note
        ----
        Due to how OneTick works internally, tick generators
        :py:class:`otp.Tick <onetick.py.Tick>` and :py:func:`otp.Ticks <onetick.py.Ticks>`
        are not affected by this method.

        Parameters
        ----------
        start: :py:class:`onetick.py.datetime` or \
               :py:class:`~onetick.py.core.source.MetaFields` or :py:class:`~onetick.py.Operation`
            Expression to replace query start time.
            By default, start time is not changed.
            Note that expression in this parameter can't depend on ticks, thus only
            :py:class:`~onetick.py.core.source.MetaFields` and constants can be used.
        end: :py:class:`onetick.py.datetime` or \
               :py:class:`~onetick.py.core.source.MetaFields` or :py:class:`~onetick.py.Operation`
            Expression to replace query end time.
            By default, end time is not changed.
            Note that expression in this parameter can't depend on ticks, thus only
            :py:class:`~onetick.py.core.source.MetaFields` and constants can be used.
        output_timestamp: :py:class:`onetick.py.Operation`
            Expression that produces the timestamp for each tick.
            By default, the following expression is used: ``orig_start + orig_timestamp - start``.
            This expression covers the case when the start time of the query is changed, keeping
            timestamps inside the original time range.
            Note that it doesn't cover all cases: if, for example, the end time was increased,
            you have to handle that yourself.
        propagate_heartbeats: bool
            Controls heartbeat propagation.
        inplace: bool
            The flag controls whether the operation should be applied inplace.
            If ``inplace=True``, the method returns nothing;
            otherwise, it returns a new modified object.

        See also
        --------
        | **MODIFY_QUERY_TIMES** OneTick event processor
        | :py:meth:`onetick.py.Source.time_interval_shift`

        Returns
        -------
        :class:`Source` or ``None``

        Examples
        --------
        >>> start = otp.dt(2022, 3, 2)
        >>> end = otp.dt(2022, 3, 2) + otp.Milli(3)
        >>> data = otp.DataSource('NYSE_TAQ', symbols='AAPL', tick_type='TRD')

        By default, the method does nothing:

        >>> t = data.modify_query_times()
        >>> otp.run(t, start=start, end=end)
                             Time  PRICE  SIZE
        0 2022-03-02 00:00:00.000    1.0   100
        1 2022-03-02 00:00:00.001    1.1   101
        2 2022-03-02 00:00:00.002    1.2   102

        See how the ``_START_TIME`` and ``_END_TIME`` meta fields are changed.
        They are changed *before* ``modify_query_times``:

        >>> t = data.copy()
        >>> t['S_BEFORE'] = t['_START_TIME']
        >>> t['E_BEFORE'] = t['_END_TIME']
        >>> t = t.modify_query_times(start=t['_START_TIME'] + otp.Milli(1),
        ...                          end=t['_END_TIME'] - otp.Milli(1))
        >>> t['S_AFTER'] = t['_START_TIME']
        >>> t['E_AFTER'] = t['_END_TIME']
        >>> otp.run(t, start=start, end=end)
                Time  PRICE  SIZE                S_BEFORE                E_BEFORE    S_AFTER                 E_AFTER
        0 2022-03-02    1.1   101 2022-03-02 00:00:00.001 2022-03-02 00:00:00.002 2022-03-02 2022-03-02 00:00:00.003

        You can decrease the time interval without problems:

        >>> t = data.modify_query_times(start=data['_START_TIME'] + otp.Milli(1),
        ...                             end=data['_END_TIME'] - otp.Milli(1))
        >>> otp.run(t, start=start, end=end)
                Time  PRICE  SIZE
        0 2022-03-02    1.1   101

        Note that the timestamp of the tick was changed by the default expression.
        In this case we can output the original timestamps,
        because they fall into the original time range:

        >>> t = data.modify_query_times(start=data['_START_TIME'] + otp.Milli(1),
        ...                             end=data['_END_TIME'] - otp.Milli(1),
        ...                             output_timestamp=data['TIMESTAMP'])
        >>> otp.run(t, start=start, end=end)
                             Time  PRICE  SIZE
        0 2022-03-02 00:00:00.001    1.1   101

        But it will not work if the new time range is wider than the original:

        >>> t = data.modify_query_times(start=data['_START_TIME'] - otp.Milli(1),
        ...                             output_timestamp=data['TIMESTAMP'])
        >>> otp.run(t, start=start + otp.Milli(1), end=end + otp.Milli(1)) # doctest: +ELLIPSIS
        Traceback (most recent call last):
        Exception...timestamp is falling out of initial start/end time range...

        In this case the default ``output_timestamp`` expression would work just fine:

        >>> t = data.modify_query_times(start=data['_START_TIME'] - otp.Milli(1))
        >>> otp.run(t, start=start + otp.Milli(1), end=end + otp.Milli(1))
                             Time  PRICE  SIZE
        0 2022-03-02 00:00:00.001    1.0   100
        1 2022-03-02 00:00:00.002    1.1   101
        2 2022-03-02 00:00:00.003    1.2   102

        But it doesn't work if, for example, the end time has crossed the borders
        of the original time range.
        In this case a different ``output_timestamp`` expression must be specified:

        >>> t = data.modify_query_times(
        ...     start=data['_START_TIME'] - otp.Milli(2),
        ...     output_timestamp=otp.math.min(data['TIMESTAMP'] + otp.Milli(2), data['_END_TIME'])
        ... )
        >>> otp.run(t, start=start + otp.Milli(2), end=end)
                             Time  PRICE  SIZE
        0 2022-03-02 00:00:00.002    1.0   100
        1 2022-03-02 00:00:00.003    1.1   101
        2 2022-03-02 00:00:00.003    1.2   102

        Remember that the ``start`` and ``end`` parameters can't depend on ticks:

        >>> t = data.copy()
        >>> t['X'] = 12345
        >>> t = t.modify_query_times(start=t['_START_TIME'] + t['X'] - t['X'],
        ...                          end=t['_END_TIME'] - otp.Milli(1))
        >>> otp.run(t, start=start, end=end) # doctest: +ELLIPSIS
        Traceback (most recent call last):
        Exception...parameter must not depend on ticks...

        Constant datetime values can be used as parameters too:

        >>> t = data.modify_query_times(start=start + otp.Milli(1),
        ...                             end=end - otp.Milli(1))
        >>> otp.run(t, start=start, end=end)
                Time  PRICE  SIZE
        0 2022-03-02    1.1   101

        Note that some graph patterns are not allowed when using this method,
        for example, modifying query times for a branch that will be merged later:

        >>> t1, t2 = data[data['PRICE'] > 1.3]
        >>> t2 = t2.modify_query_times(start=start + otp.Milli(1))
        >>> t = otp.merge([t1, t2])
        >>> otp.run(t, start=start, end=end) # doctest: +ELLIPSIS
        Traceback (most recent call last):
        Exception...Invalid graph...time bound to a node...an intermediate node in one of the cycles in graph...
        """
        start = ott.value2str(start) if start is not None else ''
        end = ott.value2str(end) if end is not None else ''
        output_timestamp = ott.value2str(output_timestamp) if output_timestamp is not None else ''
        self.sink(
            otq.ModifyQueryTimes(
                start_time=start,
                end_time=end,
                output_timestamp=output_timestamp,
                propagate_heartbeats=propagate_heartbeats,
            )
        )
        return self

    def time_interval_shift(self, shift, inplace=False):
        """
        Shift the time interval of the query for a source.
        The whole data flow is shifted all the way up to the source of the graph.
        The start and end times of the query are changed for all operations before this method
        and stay the same after it.

        WARNING: ticks' timestamps *are changed* automatically so that they fit into the original time range.
        You will get a different set of ticks from the database,
        but their timestamps will not be the same as in the database:
        they are changed so that they fit into the original query time range.
        See details in :py:meth:`onetick.py.Source.modify_query_times`.

        Parameters
        ----------
        shift: int or :ref:`datetime offset<Datetime offsets>`
            Offset to shift the whole time interval.
            Can be positive or negative.
            Positive value moves time interval into the future, negative -- to the past.
            int values are interpreted as milliseconds.
            Timestamps of the ticks will be changed so they fit into the original query time range
            by subtracting ``shift`` from each timestamp.
        inplace: bool
            The flag controls whether the operation should be applied inplace.
            If ``inplace=True``, the method returns nothing;
            otherwise, it returns a new modified object.

        Returns
        -------
        :class:`Source` or ``None``

        See also
        --------
        | :py:meth:`onetick.py.Source.modify_query_times`
        | :py:meth:`onetick.py.Source.time_interval_change`

        Examples
        --------
        Also see the use case of :py:meth:`time_interval_shift` for calculating
        `Markouts <../../static/getting_started/use_cases.html#point-in-time-benchmarks-bbo-at-different-markouts>`_.

        >>> start = otp.dt(2022, 3, 2)
        >>> end = otp.dt(2022, 3, 2) + otp.Milli(3)
        >>> data = otp.DataSource('NYSE_TAQ', symbols='AAPL', tick_type='TRD')

        Default data:

        >>> otp.run(data, start=start, end=end)
                             Time  PRICE  SIZE
        0 2022-03-02 00:00:00.000    1.0   100
        1 2022-03-02 00:00:00.001    1.1   101
        2 2022-03-02 00:00:00.002    1.2   102

        Get the window for the third tick:

        >>> otp.run(data, start=start + otp.Milli(2), end=start + otp.Milli(3))
                             Time  PRICE  SIZE
        0 2022-03-02 00:00:00.002    1.2   102

        Shifting the time window results in a different set of ticks,
        but their timestamps are changed to fit into the original time range.
        Let's shift the time 2 milliseconds back and thus get the first tick:

        >>> t = data.time_interval_shift(shift=-otp.Milli(2))
        >>> otp.run(t, start=start + otp.Milli(2), end=start + otp.Milli(3))
                             Time  PRICE  SIZE
        0 2022-03-02 00:00:00.002    1.0   100

        Here we are querying an empty time interval, but shifting one second back to get ticks:

        >>> t = data.time_interval_shift(shift=-otp.Second(1))
        >>> otp.run(t, start=start + otp.Second(1), end=end + otp.Second(1))
                             Time  PRICE  SIZE
        0 2022-03-02 00:00:01.000    1.0   100
        1 2022-03-02 00:00:01.001    1.1   101
        2 2022-03-02 00:00:01.002    1.2   102
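
        A hedged sketch (output omitted and not verified here): a positive shift queries
        the interval ``shift`` into the future, assuming data exists there:

        >>> t = data.time_interval_shift(shift=otp.Milli(1))  # doctest: +SKIP
        >>> otp.run(t, start=start, end=end)  # doctest: +SKIP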

        Note that tick generators
        :py:class:`otp.Tick <onetick.py.Tick>` and :py:func:`otp.Ticks <onetick.py.Ticks>`
        are not really affected by this method; they will keep the same timestamps:

        >>> t = otp.Tick(A=1)
        >>> otp.run(t)
                Time  A
        0 2003-12-01  1
        >>> t = t.time_interval_shift(shift=otp.Second(1))
        >>> otp.run(t)
                Time  A
        0 2003-12-01  1
        """
        start = self['_START_TIME'] + shift
        end = self['_END_TIME'] + shift
        # change timestamps so they fit into original time range
        output_timestamp = self['TIMESTAMP'] - shift
        return self.modify_query_times(start=start,
                                       end=end,
                                       output_timestamp=output_timestamp,
                                       inplace=inplace)

    def time_interval_change(self,
                             start_change=0,
                             end_change=0,
                             inplace=False):
        """
        Change the time interval by making it bigger or smaller.
        Timestamps of all ticks that cross the border of the original time range
        will be set to the original start or end time, depending on which border they cross.

        Parameters
        ----------
        start_change: int or :ref:`datetime offset<Datetime offsets>`
            Offset to shift start time.
            Can be positive or negative.
            Positive value moves start time into the future, negative -- to the past.
            int values are interpreted as milliseconds.
        end_change: int or :ref:`datetime offset<Datetime offsets>`
            Offset to shift end time.
            Can be positive or negative.
            Positive value moves end time into the future, negative -- to the past.
            int values are interpreted as milliseconds.
        inplace: bool
            The flag controls whether the operation should be applied inplace.
            If ``inplace=True``, the method returns nothing;
            otherwise, it returns a new modified object.

        Returns
        -------
        :class:`Source` or ``None``

        See also
        --------
        | :py:meth:`onetick.py.Source.modify_query_times`
        | :py:meth:`onetick.py.Source.time_interval_shift`

        Examples
        --------
        >>> start = otp.dt(2022, 3, 2)
        >>> end = otp.dt(2022, 3, 2) + otp.Milli(3)
        >>> data = otp.DataSource('NYSE_TAQ', symbols='AAPL', tick_type='TRD')

        By default, ``time_interval_change()`` does nothing:

        >>> t = data.time_interval_change()
        >>> otp.run(t, start=start, end=end)
                             Time  PRICE  SIZE
        0 2022-03-02 00:00:00.000    1.0   100
        1 2022-03-02 00:00:00.001    1.1   101
        2 2022-03-02 00:00:00.002    1.2   102

        Decreasing the time range does not change ticks' timestamps:

        >>> t = data.time_interval_change(start_change=otp.Milli(1), end_change=-otp.Milli(1))
        >>> otp.run(t, start=start, end=end)
                             Time  PRICE  SIZE
        0 2022-03-02 00:00:00.001    1.1   101

        Increasing the time range changes the timestamps of the ticks that crossed the border.
        In this case the first tick's timestamp is set to the original start time,
        and the third tick's to the original end time:

        >>> t = data.time_interval_change(start_change=-otp.Milli(1), end_change=otp.Milli(1))
        >>> otp.run(t, start=start + otp.Milli(1), end=start + otp.Milli(2))
                             Time  PRICE  SIZE
        0 2022-03-02 00:00:00.001    1.0   100
        1 2022-03-02 00:00:00.001    1.1   101
        2 2022-03-02 00:00:00.002    1.2   102

        Here we are querying an empty time interval, but moving the start time one second back to get ticks:

        >>> t = data.time_interval_change(start_change=-otp.Second(1))
        >>> otp.run(t, start=start + otp.Second(1), end=end + otp.Second(1))
                         Time  PRICE  SIZE
        0 2022-03-02 00:00:01    1.0   100
        1 2022-03-02 00:00:01    1.1   101
        2 2022-03-02 00:00:01    1.2   102
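
        A hedged sketch (output omitted and not verified here) of extending only the end
        of the interval; ticks past the original end time would be clamped to it:

        >>> t = data.time_interval_change(end_change=otp.Second(1))  # doctest: +SKIP
        >>> otp.run(t, start=start, end=end)  # doctest: +SKIP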
        """
        start = self['_START_TIME'] + start_change
        end = self['_END_TIME'] + end_change
        # change ticks' timestamps only if they are out of bounds
        output_timestamp = self['TIMESTAMP']
        output_timestamp = otp.math.min(output_timestamp, self['_END_TIME'])
        output_timestamp = otp.math.max(output_timestamp, self['_START_TIME'])
        return self.modify_query_times(start=start,
                                       end=end,
                                       output_timestamp=output_timestamp,
                                       inplace=inplace) 
    def mean(self, *columns):
        """
        Get the average value of the specified columns.
        Do not confuse it with :py:func:`otp.agg.average <onetick.py.agg.average>`:
        this method computes the average of the columns' values in a single row,
        not an aggregation over all rows.

        Parameters
        ----------
        columns: str, :class:`Column`
            Column names or column objects.
            All columns must have compatible types.

        Returns
        -------
        :class:`~onetick.py.Operation`

        Examples
        --------
        Integers and floating point values can be mixed:

        >>> t = otp.Tick(A=1, B=3.3)
        >>> t['AVG'] = t.mean('A', 'B')
        >>> otp.run(t)
                Time  A    B   AVG
        0 2003-12-01  1  3.3  2.15

        You can get the average for datetime values too:

        >>> t = otp.Tick(START_TIME=otp.dt(2022, 1, 1), END_TIME=otp.dt(2023, 1, 1))
        >>> t['MID_TIME'] = t.mean('START_TIME', 'END_TIME')
        >>> otp.run(t, timezone='GMT')
                Time  START_TIME    END_TIME             MID_TIME
        0 2003-12-01  2022-01-01  2023-01-01  2022-07-02 12:00:00

        Note that things may get confusing for datetime values
        when a timezone with daylight saving time is used:

        >>> t = otp.Tick(START_TIME=otp.dt(2022, 1, 1), END_TIME=otp.dt(2023, 1, 1))
        >>> t['MID_TIME'] = t.mean('START_TIME', 'END_TIME')
        >>> otp.run(t, timezone='EST5EDT')
                Time  START_TIME    END_TIME             MID_TIME
        0 2003-12-01  2022-01-01  2023-01-01  2022-07-02 13:00:00
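
        A hedged sketch based on the validation in this method (not verified here):
        mixing numeric and datetime columns raises a ``ValueError``:

        >>> t = otp.Tick(A=1, T=otp.dt(2022, 1, 1))  # doctest: +SKIP
        >>> t['X'] = t.mean('A', 'T')  # doctest: +SKIP
        Traceback (most recent call last):
          ...
        ValueError: Only int, float and datetime columns are supported. ...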
        """
        dtypes = set()
        for column_name in map(str, columns):
            if column_name not in self.schema:
                raise ValueError(f"There is no '{column_name}' column in the schema")
            dtypes.add(self.schema[column_name])
        if not are_numerics(*dtypes) and not are_time(*dtypes):
            raise ValueError('Only int, float and datetime columns are supported. '
                             'Numeric and datetime columns should not be mixed.')
        op = None
        for column_name in map(str, columns):
            column = self[column_name]
            dtype = self.schema[column_name]
            # promote millisecond-precision times to nanoseconds when precisions are mixed
            if dtype is otp.msectime and otp.nsectime in dtypes:
                column = column.astype(otp.nsectime)
            # datetime values are averaged as integer epoch values
            if are_time(dtype):
                column = column.astype(int)
            # accumulate the sum of all requested columns
            if op is None:
                op = column
            else:
                op += column
        op = op / len(columns)
        dtype = ott.get_type_by_objects(dtypes)
        if are_time(dtype):
            # can't convert float to datetime, converting to int first
            op = op.astype(int)
        op = op.astype(dtype)
        return op

    @inplace_operation
    def show_symbol_name_in_db(self, inplace=False):
        """
        Adds the **SYMBOL_NAME_IN_DB** field to input ticks,
        indicating the symbol name of the tick in the database.
        Parameters
        ----------
        inplace: bool
            The flag controls whether the operation should be applied inplace.
            If ``inplace=True``, the method returns nothing;
            otherwise, it returns a new modified object.

        See also
        --------
        **SHOW_SYMBOL_NAME_IN_DB** OneTick event processor

        Returns
        -------
        :class:`Source` or ``None``

        Examples
        --------
        For example, it can be used to display
        the actual symbol name for a contract (e.g., **ESM23**)
        instead of the artificial continuous name **ES_r_tdi**.
        Notice how the actual symbol name can differ from tick to tick;
        in this case it is different for each month:

        >>> data = otp.DataSource('TDI_FUT', tick_type='TRD')  # doctest: +SKIP
        >>> data = data[['PRICE']]  # doctest: +SKIP
        >>> data = data.first(bucket_interval=31, bucket_units='days')  # doctest: +SKIP
        >>> data['SYMBOL_NAME'] = data.Symbol.name  # doctest: +SKIP
        >>> data = data.show_symbol_name_in_db()  # doctest: +SKIP
        >>> otp.run(data,  # doctest: +SKIP
        ...         symbols='ES_r_tdi', symbol_date=otp.dt(2023, 3, 1),
        ...         start=otp.dt(2023, 3, 1), end=otp.dt(2023, 5, 1))
                             Time    PRICE SYMBOL_NAME SYMBOL_NAME_IN_DB
        0 2023-03-01 00:00:00.549  3976.75    ES_r_tdi             ESH23
        1 2023-04-02 18:00:00.039  4127.00    ES_r_tdi             ESM23
        """
        if 'SYMBOL_NAME_IN_DB' in self.schema:
            raise ValueError("Column 'SYMBOL_NAME_IN_DB' already exists.")
        self.sink(otq.ShowSymbolNameInDb())
        self.schema['SYMBOL_NAME_IN_DB'] = str
        return self

    @inplace_operation
    def throw(self,
              message='', where=1,
              scope='query', error_code=1,
              throw_before_query_execution=False,
              inplace=False):
        """
        Propagates an error or a warning, or throws an exception (depending on the ``scope`` parameter),
        when the condition provided by the ``where`` parameter evaluates to True.
        Throwing an exception aborts query execution,
        propagating an error stops tick propagation, and
        propagating a warning does not change the data processing.

        Parameters
        ----------
        message: str or Operation
            Message that will be thrown in exception or returned in error message.
        where: Operation
            Logical expression that specifies condition when exception or error will be thrown.
        scope: 'query' or 'symbol'
            'query' will throw an exception and 'symbol' will propagate an error or warning
            depending on ``error_code`` parameter.
        error_code: int
            When ``scope='symbol'``, values from interval ``[1, 500]`` indicate warnings
            and values from interval ``[1500, 2000]`` indicate errors.
            Note that tick propagation will not stop when warning is raised,
            and will stop when error is raised.
            This parameter is not used when ``scope='query'``.
        throw_before_query_execution: bool
            If set to ``True``, the exception will be thrown before the execution of the query.
            This option is intended for supplying placeholder queries that must always throw.
        inplace: bool
            The flag controls whether the operation should be applied inplace.
            If ``inplace=True``, the method returns nothing;
            otherwise, it returns a new modified object.

        See also
        --------
        **THROW** OneTick event processor

        Returns
        -------
        :class:`Source` or ``None``

        Examples
        --------
        By default, this method throws an exception with an empty message on the first tick:

        >>> t = otp.Tick(A=1)
        >>> t = t.throw()
        >>> otp.run(t)  # doctest: +ELLIPSIS
        Traceback (most recent call last):
          ...
        Exception: ...: In THROW: ...
          ...

        You can specify the exception message and condition
        (note that the exception now is raised on the second tick):

        >>> t = otp.Ticks(A=[1, 2])
        >>> t = t.throw(message='A is ' + t['A'].apply(str), where=(t['A']==2))
        >>> otp.run(t)  # doctest: +ELLIPSIS
        Traceback (most recent call last):
          ...
        Exception: ...: In THROW: A is 2. ...
          ...

        Note that the exception will not be thrown if there are no ticks:

        >>> t = otp.Empty()
        >>> t = t.throw()
        >>> otp.run(t)
        Empty DataFrame
        Columns: []
        Index: []

        If you need the exception to be thrown always, use the ``throw_before_query_execution`` parameter:

        >>> t = otp.Empty()
        >>> t = t.throw(throw_before_query_execution=True)
        >>> otp.run(t)  # doctest: +ELLIPSIS
        Traceback (most recent call last):
          ...
        Exception: ...: In THROW: ...

        You can throw OneTick errors and warnings instead of exceptions.
        Error codes from 1 to 500 indicate warnings,
        and error codes from 1500 to 2000 indicate errors.
        Tick propagation stops only when an error is raised:

        >>> t = otp.Ticks(A=[1, 2, 3, 4])
        >>> t = t.throw(message='warning A=1', scope='symbol', error_code=2, where=(t['A']==1))
        >>> t = t.throw(message='error A=3', scope='symbol', error_code=1502, where=(t['A']==3))
        >>> otp.run(t)
                             Time  A
        0 2003-12-01 00:00:00.000  1
        1 2003-12-01 00:00:00.001  2

        Right now the only supported interface for getting errors and warnings
        is the ``map`` output structure and its methods:

        >>> otp.run(t, symbols='AAPL', output_structure='map').output('AAPL').error
        [(2, 'warning A=1'), (1502, 'error A=3')]
        """
        # a non-string message is treated as a per-tick expression
        treat_message_as_per_tick_expr = not isinstance(message, str)
        if scope not in {'query', 'symbol'}:
            raise ValueError("Parameter 'scope' can only be set to 'query' or 'symbol'")
        if not (1 <= error_code <= 500 or 1500 <= error_code <= 2000):
            raise ValueError("Parameter 'error_code' can only take values "
                             "from interval [1, 500] for warnings and from interval [1500, 2000] for errors")
        self.sink(
            otq.Throw(
                message=str(message),
                treat_message_as_per_tick_expr=treat_message_as_per_tick_expr,
                where=str(where),
                scope=scope.upper(),
                error_code=error_code,
                throw_before_query_execution=throw_before_query_execution,
            )
        )
        return self


_Source = Source    # Backward compatibility