import itertools
import warnings
import inspect
import re
from collections import defaultdict, Counter
from functools import singledispatch
from itertools import chain, zip_longest, repeat
from typing import List, Union
from enum import Enum
import onetick.query as otq
from onetick.py.configuration import config
from onetick.py.core.eval_query import _QueryEvalWrapper
from onetick.py.core._source._symbol_param_source import _SymbolParamSource
from onetick.py.core._source.tmp_otq import TmpOtq
from onetick.py.utils import get_type_that_includes, adaptive, default
import onetick.py.types as ott
from onetick.py.sources import query
from onetick.py.core.column import Column
from onetick.py.core.column_operations.base import Operation
from onetick.py.core.cut_builder import _QCutBuilder, _CutBuilder
__all__ = ['merge', 'join', 'join_by_time', 'apply_query', 'apply', 'cut', 'qcut', 'coalesce', 'corp_actions', 'format']
def output_type_by_index(sources, index):
    if index is None:
        from onetick.py.core.source import _Source
        return _Source
    return type(sources[index])
[docs]def merge(sources, align_schema=True, symbols=None, identify_input_ts=False,
          presort=adaptive, concurrency=default, batch_size=default, output_type_index=None):
    """
    Merges ticks from the ``sources`` into a single output ordered by the timestamp
    Parameters
    ----------
    sources : list
        List of sources to merge
    align_schema : bool
        If set to True, then table is added right after merge.
        We recommended to keep True to prevent problems with
        different tick schemas. Default: True
    symbols: str, list of str or functions, :class:`Source`
        Symbol(s) to run the query for passed as a string, a list of strings, or as a "symbols" query which results
        include the ``SYMBOL_NAME`` column. The start/end times for the
        symbols query will taken from the :meth:`run` params.
        See :ref:`symbols <Symbols>` for more details.
    identify_input_ts: bool
        If set to False, the fields SYMBOL_NAME and TICK_TYPE are not appended to the output ticks.
    presort: bool
        Add the presort EP in case of bound symbols.
        Applicable only when ``symbols`` is not None.
        By default, it is set to True if ``symbols`` are set
        and to False otherwise.
    concurrency: int
        Specifies number of CPU cores to utilize for the ``presort``
        By default, the value from otp.config.default_concurrency is used.
    batch_size: int
        Specifies the query batch size for the ``presort``.
        By default, the value from otp.config.default_batch_size is used.
    output_type_index: int
        Specifies index of source in ``sources`` from which type and properties of output will be taken.
        Useful when merging sources that inherited from :class:`Source`.
        By default, output object type will be :class:`Source`.
    Return
    ------
    :class:`Source` or same class as ``sources[output_type_index]``
        A time series of ticks.
    See also
    --------
    **MERGE** and **PRESORT** OneTick event processors
    Examples
    --------
    ``merge`` is used to merge different data sources
    >>> data1 = otp.Ticks(X=[1, 2], Y=['a', 'd'])
    >>> data2 = otp.Ticks(X=[-1, -2], Y=['*', '-'])
    >>> data = otp.funcs.merge([data1, data2])      # OTdirective: snippet-name:merge.as list;
    >>> otp.run(data)
                         Time  X  Y
    0 2003-12-01 00:00:00.000  1  a
    1 2003-12-01 00:00:00.000 -1  *
    2 2003-12-01 00:00:00.001  2  d
    3 2003-12-01 00:00:00.001 -2  -
    Merge series from multiple symbols into one series
    >>> # OTdirective: snippet-name:merge.bound symbols;
    >>> data = otp.Ticks(X=[1])
    >>> data['SYMBOL_NAME'] = data['_SYMBOL_NAME']
    >>> symbols = otp.Ticks(SYMBOL_NAME=['A', 'B'])
    >>> data = otp.merge([data], symbols=symbols)
    >>> otp.run(data)
            Time  X SYMBOL_NAME
    0 2003-12-01  1           A
    1 2003-12-01  1           B
    Adding symbols param before merge
    >>> symbols = otp.Ticks(SYMBOL_NAME=['S1', 'S2'], param=[1, -1])
    >>> def func(symbol):
    ...     pre = otp.Ticks(X=[1])
    ...     pre["SYMBOL_NAME"] = symbol.name
    ...     pre["PARAM"] = symbol.param
    ...     return pre
    >>> data = otp.funcs.merge([func], symbols=symbols)
    >>> otp.run(data)[['PARAM', 'SYMBOL_NAME']]
       PARAM SYMBOL_NAME
    0      1          S1
    1     -1          S2
    """
    from onetick.py.core.source import _Source
    if not sources:
        raise ValueError("Merge should have one or more inputs")
    output_type = output_type_by_index(sources, output_type_index)
    if presort is adaptive:
        presort = True if symbols is not None else False
    if presort and not symbols:
        warnings.warn("Using the `presort` parameter makes effect only when "
                      "symbols are specified in the `symbols` parameter")
    if concurrency is not default and not presort:
        warnings.warn("Using the `concurrency` parameter makes effect only when "
                      "the `presort` parameter is set to True")
    if batch_size is not default and not presort:
        warnings.warn("Using the `batch_size` parameter makes effect only when "
                      "the `presort` parameter is set to True")
    if concurrency is default:
        concurrency = (
            config.default_concurrency
            if config.default_concurrency is not None
            # otq.Presort does not support None
            else ''
        )
    if batch_size is default:
        batch_size = config.default_batch_size
    def _base_ep_for_cross_symbol(symbol, tmp_otq):
        if presort and symbol:
            base_ep = otq.Presort(batch_size=batch_size, max_concurrency=concurrency)
        else:
            base_ep = otq.Merge(identify_input_ts=identify_input_ts)
        if symbol:
            if isinstance(symbol, (query, _QueryEvalWrapper)):
                symbol = symbol.to_eval_string(tmp_otq=tmp_otq)
            elif isinstance(symbol, _Source):
                symbol = symbol._convert_symbol_to_string(symbol, tmp_otq=tmp_otq)
            base_ep = base_ep.symbols(symbol)
        return base_ep
    def _evaluate_functions_in_sources_list(sources, symbols):
        result = []
        if not isinstance(sources, list):
            sources = [sources]
        for s in sources:
            if not isinstance(s, _Source) and callable(s):
                num_params = len(inspect.signature(s).parameters)
                if num_params == 0:
                    s = s()
                elif num_params == 1:
                    s = s(symbols.to_symbol_param() if isinstance(symbols, (_Source, _QueryEvalWrapper))
                          else _SymbolParamSource())
                else:
                    raise ValueError(
                        f"It is expected only one parameter from the callback, but {num_params} passed"
                    )  # TODO: test this case
            if isinstance(s, _Source):
                result.append(s)
            else:
                raise ValueError("Source and functions (returning _source) are expected as preprocessors")
        return result
    sources = _evaluate_functions_in_sources_list(sources, symbols)
    need_table = False
    merged_columns, need_table, used_columns = _collect_merged_columns(need_table, sources)
    need_table = _is_table_after_merge_needed(need_table, used_columns)
    # we need to store internal graphs somewhere while we create base ep from eval
    intermediate_tmp_otq = TmpOtq()
    result = output_type(node=_base_ep_for_cross_symbol(symbols, tmp_otq=intermediate_tmp_otq), **merged_columns)
    result._tmp_otq.merge(intermediate_tmp_otq)
    __copy_sources_on_merge_or_join(result, sources, symbols, output_type_index=output_type_index)
    if presort and symbols:
        result.sink(otq.Merge(identify_input_ts=identify_input_ts))
    if identify_input_ts:
        result[("SYMBOL_NAME", str)]
        result[("TICK_TYPE", str)]
    result = _add_table_after_merge(align_schema, merged_columns, need_table, result)
    result._fix_varstrings()
    return result 
def _add_table_after_merge(add_table, merged_columns, need_table, result):
    if add_table and need_table:
        # a special case, when the add_table parameter is a list of common columns that should
        # be added to a final table
        # it is used internally
        if isinstance(add_table, list):
            merged_columns = {key: value for key, value in merged_columns.items() if key in add_table}
        if len(merged_columns):
            table = otq.Table(
                fields=",".join(ott.type2str(dtype) + " " + name for name, dtype in merged_columns.items()),
                keep_input_fields=True,
            )
            result.sink(table)
    return result
def __copy_sources_on_merge_or_join(result,
                                    sources,
                                    symbols=None,
                                    names=None,
                                    drop_meta=False,
                                    leading=None,
                                    output_type_index=None,
                                    use_rename_ep=True):
    """ copy columns, state vars and other metadata from joining, merging sources
    Parameters
    ----------
    result: _Source
        Source object constructed as join, merge operation, e.g. result = _Source(otq.Merge(sources))
    sources: list of _Source, tuple of _Source
        Sources were joined, merged
    symbols:
        Symbols to copy
    names: list of str or None, tuple of str or None, bool, optional
        - If collection of string or None than add passthrough eps with such name to `sources` if name is specify
            or do not add anything if corresponding item in names is None.
        - If True, than autogenerate such names in __SRC_{number}__ format
        - If None, False than do not add passthrough eps and do not change node names.
    drop_meta : bool, optional
        If True drop TIMESTAMP and OMDSEQ field
    leading : List of str, Tuple of str, Optional
        List of leading sources names
    output_type_index: int, optional
        Specifies index of source in `sources` from which properties of `result` will be taken.
        Useful when merging sources that inherited from otp.Source.
    use_rename_ep: bool
        Use :py:class:`onetick.query.RenameFields` event processor or not.
        This event processor can't be used in generic aggregation.
    Returns
    -------
        None
        Modify result directly
    """
    from onetick.py.core.source import _Source
    result._copy_state_vars_from(sources)
    result._clean_sources_dates()  # because it is not a real _source
    for source in sources:
        result._merge_tmp_otq(source)
        if source.get_name():
            if not result.get_name():
                result.set_name(source.get_name())
            if result.get_name() != source.get_name():
                warnings.warn(f"Merging/joining sources with different names: '{result.get_name()}' "
                              f"and '{source.get_name()}'. Some of those names will be lost")
    if isinstance(symbols, _Source):
        result._merge_tmp_otq(symbols)
    names = __copy_and_rename_nodes_on_merge_join(result, names, sources, symbols)
    if drop_meta:
        to_drop = list(map(lambda x: x + ".TIMESTAMP", names))
        to_drop += list(map(lambda x: x + ".OMDSEQ", names))
        __rename_leading_omdseq(leading, names, result, sources, use_rename_ep=use_rename_ep)
        result.sink(otq.Passthrough(fields=",".join(to_drop), drop_fields=True))
    if output_type_index is not None:
        result._copy_properties_from(sources[output_type_index])
def __rename_fields(source, mapping, use_rename_ep=True):
    """
    Function to rename fields from ``mapping`` in ``source``.
    Note that it is a low-level function that doesn't change python schema of the ``source``.
    Modifies ``source`` inplace, doesn't return anything.
    If ``use_rename_ep`` is `True`, then :py:class:`onetick.query.RenameFields` event processor will be used.
    """
    if use_rename_ep:
        source.sink(otq.RenameFields(','.join(f'{k}={v}' for k, v in mapping.items())))
        return
    # May be needed, because RenameFields ep is not supported in generic aggregation
    for old, new in mapping.items():
        # RenameFields ignores non-existent fields,
        # all this mess is needed to mimic that logic
        source.sink(otq.WhereClause(where=f'UNDEFINED("{old}")'))
        if_branch_graph = source.node().copy_graph()
        if_branch_rules = source.node().copy_rules()
        source.sink(otq.AddField(new, old), out_pin='ELSE')
        source.sink(otq.Passthrough(old, drop_fields=True))
        source.sink(otq.Merge(identify_input_ts=False))
        source.source(if_branch_graph)
        source.node().add_rules(if_branch_rules)
def __rename_leading_omdseq(leading, names, result, sources, use_rename_ep=True):
    if leading is not None:
        if len(leading) == 1:
            leading = leading.pop()
            __rename_fields(result, {f"{leading}.OMDSEQ": "OMDSEQ"}, use_rename_ep=use_rename_ep)
        else:
            number, indexes = __get_number_and_indexes_of_sources_have_field(sources, "OMDSEQ")
            if number == 1:
                __rename_fields(result, {f"{names[indexes.pop()]}.OMDSEQ": "OMDSEQ"}, use_rename_ep=use_rename_ep)
            elif number:
                raise ValueError(
                    "Several sources was specified as leading and OMDSEQ field is presented in more than "
                    "one source. Resulted OMDSEQ can't be derived in such case."
                )
def __get_number_and_indexes_of_sources_have_field(sources, field):
    number = 0
    indexes = []
    for s in sources:
        if field in s.columns():
            indexes.append(number)
            number += 1
    return number, indexes
def __copy_and_rename_nodes_on_merge_join(result, names, sources, symbols):
    # shared eps between sources
    eps = defaultdict()
    if names is True:
        names = [f"__SRC_{n}__" for n in range(len(sources))]
    if not names:
        names = itertools.repeat(None)
    if sources:
        for name, src in zip(names, sources):
            obj = src
            if name:
                obj = src.copy()
                obj.sink(otq.Passthrough())
                obj.node_name(name)
            result.source(obj.node().copy_graph(eps))
            result.node().add_rules(obj.node().copy_rules())
            result._set_sources_dates(obj, copy_symbols=not bool(symbols))
    return names
def _is_table_after_merge_needed(need_table, used_columns):
    if not need_table:
        for key, value in used_columns.items():
            if not value:
                need_table = True
                break
    return need_table
def _collect_merged_columns(need_table, sources):
    merged_columns = sources[0].columns(skip_meta_fields=True)
    used_columns = {key: False for key in merged_columns.keys()}
    for src in sources[1:]:
        for key, value in src.columns(skip_meta_fields=True).items():
            if key in merged_columns:
                orig_type = merged_columns[key]
                try:
                    merged_dtype, merged_need_table = get_type_that_includes([orig_type, value])
                except ValueError as e:
                    raise ValueError(f"Column '{key}' has different types for "
                                     f"different branches: {orig_type} {value}") from e
                need_table |= merged_need_table
                merged_columns[key] = merged_dtype
            else:
                need_table = True
                merged_columns[key] = value
            if key in used_columns:
                used_columns[key] = True
    return merged_columns, need_table, used_columns
def concat(sources=None, add_table=True, symbols=None):
    """ Deprecated: Merges ticks from the sources into a single output _source ordered by the timestamp
    This function is deprecated due the wrong name notation.
    Use 'merge' instead.
    Parameters
    ----------
    sources : list
        List of sources to merge
    align_schema : bool
        If set to True, then table is added right after merge.
        We recommended to keep True to prevent problems with
        different tick schemas. Default: True
    Return
    ------
    A new _source that holds a result of the merged sources
    """
    warnings.warn("This function is deprecated due the wrong name notation. Use `merge` instead.", DeprecationWarning)
    return merge(sources=sources, align_schema=add_table, symbols=symbols)
[docs]def join(left, right, on, how="outer", rprefix="RIGHT", keep_fields_not_in_schema=False, output_type_index=None):
    """
    Joins two sources ``left`` and ``right`` based on ``on`` condition.
    In case you willing to add prefix/suffix to all columns in one of the sources you should use
    :func:`Source.add_prefix` or :func:`Source.add_suffix`
    Parameters
    ----------
    left: :class:`Source`
        left source to join
    right: :class:`Source`
        right source to join
    on: :py:class:`~onetick.py.Operation` or 'all' or 'same_size' or list of strings
        If 'all' joins every tick from ``left`` with every tick from ``right``.
        If 'same_size' and size of sources are same, joins ticks from two sources directly, else raises exception.
        If it is list of strings, then ticks with same ``on`` fields will be joined.
    how: 'inner' or 'outer'
        Joining type.
        Inner join will only produce ticks that matched the ``on`` condition.
        Outer join will also produce the ticks from the ``left`` source
        that didn't match the condition (so it's basically a left-outer join).
        Doesn't matter for ``on='same_size'``.
    rprefix: str
        The name of ``right`` data source. It will be added as prefix to overlapping columns arrived
        from right to result
    keep_fields_not_in_schema: bool
        If True - join function will try to preserve any fields of original sources that are not in the source schema,
        propagating them to output. This means a possibility of runtime error if fields are duplicating.
        If False, will remove all fields that are not in schema.
    output_type_index: int
        Specifies index of source in sources from which type and properties of output will be taken.
        Useful when joining sources that inherited from :class:`Source`.
        By default output object type will be :class:`Source`.
    Note
    ----
    ``join`` does some internal optimization in case of using time-based ``on`` conditions. Optimization doesn't apply
    if ``on`` expression has functions in it. So it is recommended to use addition/subtraction number of
    milliseconds (integers).
    See examples for more details.
    Returns
    -------
    :class:`Source` or same class as ``[left, right][output_type_index]``
        joined data
    See also
    --------
    **JOIN** OneTick event processor
    Examples
    --------
    >>> d1 = otp.Ticks({'ID': [1, 2, 3], 'A': ['a', 'b', 'c']})
    >>> d2 = otp.Ticks({'ID': [2, 3, 4], 'B': ['q', 'w', 'e']})
    Outer join:
    >>> data = otp.join(d1, d2, on=d1['ID'] == d2['ID'], how='outer')
    >>> otp.run(data)
                         Time  ID  A  RIGHT_ID  B
    0 2003-12-01 00:00:00.000   1  a         0
    1 2003-12-01 00:00:00.001   2  b         2  q
    2 2003-12-01 00:00:00.002   3  c         3  w
    Inner join:
    >>> data = otp.join(d1, d2, on=d1['ID'] == d2['ID'], how='inner')
    >>> otp.run(data)
                         Time  ID  A  RIGHT_ID  B
    0 2003-12-01 00:00:00.001   2  b         2  q
    1 2003-12-01 00:00:00.002   3  c         3  w
    Join all ticks:
    >>> data = otp.join(d1, d2, on='all')
    >>> otp.run(data)
                         Time  ID  A  RIGHT_ID  B
    0 2003-12-01 00:00:00.000   1  a         2  q
    1 2003-12-01 00:00:00.000   1  a         3  w
    2 2003-12-01 00:00:00.000   1  a         4  e
    3 2003-12-01 00:00:00.001   2  b         2  q
    4 2003-12-01 00:00:00.001   2  b         3  w
    5 2003-12-01 00:00:00.001   2  b         4  e
    6 2003-12-01 00:00:00.002   3  c         2  q
    7 2003-12-01 00:00:00.002   3  c         3  w
    8 2003-12-01 00:00:00.002   3  c         4  e
    Join same size sources:
    >>> data = otp.join(d1, d2, on='same_size')
    >>> otp.run(data)
                         Time  ID  A  RIGHT_ID  B
    0 2003-12-01 00:00:00.000   1  a         2  q
    1 2003-12-01 00:00:00.001   2  b         3  w
    2 2003-12-01 00:00:00.002   3  c         4  e
    Adding prefix to the right source for all columns:
    >>> d_right = d2.add_prefix('right_')
    >>> data = otp.join(d1, d_right, on=d1['ID'] == d_right['right_ID'])
    >>> otp.run(data)
                         Time  ID  A  right_ID  right_B
    0 2003-12-01 00:00:00.000   1  a         0
    1 2003-12-01 00:00:00.001   2  b         2        q
    2 2003-12-01 00:00:00.002   3  c         3        w
    This condition will be optimized during run time:
    >>> data = otp.join(d1, d2, on=(d1['ID'] == d2['ID']) & (d1['Time'] >= d2['Time']), how='outer')
    >>> otp.run(data)
                         Time  ID  A  RIGHT_ID  B
    0 2003-12-01 00:00:00.000   1  a         0
    1 2003-12-01 00:00:00.001   2  b         2  q
    2 2003-12-01 00:00:00.002   3  c         3  w
    This condition won't be optimized during run time since in transforms addition to time into function.
    So please note, this way of using ``join`` is not recommended.
    >>> data = otp.join(d1, d2, on=(d1['ID'] == d2['ID']) & (d1['Time'] >= d2['Time'] + otp.Milli(1)), how='outer')
    >>> otp.run(data)
                         Time  ID  A  RIGHT_ID  B
    0 2003-12-01 00:00:00.000   1  a         0
    1 2003-12-01 00:00:00.001   2  b         2  q
    2 2003-12-01 00:00:00.002   3  c         3  w
    In such cases (adding/subtracting constants to time) adding/subtraction number of milliseconds should be done.
    This example will return exactly the same result as previous one, but it will be optimized, so runtime will be
    shorter.
    >>> data = otp.join(d1, d2, on=(d1['ID'] == d2['ID']) & (d1['Time'] >= d2['Time'] + 1), how='outer')
    >>> otp.run(data)
                             Time  ID  A  RIGHT_ID  B
        0 2003-12-01 00:00:00.000   1  a         0
        1 2003-12-01 00:00:00.001   2  b         2  q
        2 2003-12-01 00:00:00.002   3  c         3  w
    ``on`` can be list of strings:
    >>> left = otp.Ticks(A=[1, 2, 3], B=[4, 6, 7])
    >>> right = otp.Ticks(A=[2, 3, 4], B=[6, 9, 8], C=[7, 2, 0])
    >>> data = otp.join(left, right, on=['A', 'B'], how='inner')
    >>> otp.run(data)
                             Time  A  B  C
        0 2003-12-01 00:00:00.001  2  6  7
    """
    output_type = output_type_by_index((left, right), output_type_index)
    on_list = []
    if isinstance(on, list):
        for column in on:
            if column not in left.schema:
                raise ValueError(f'`{column}` column does not exist in the left source.')
            if column not in right.schema:
                raise ValueError(f'`{column}` column does not exist in the right source.')
        if len(on) == 0:
            raise ValueError('`on` parameter can not be empty list.')
        on_list = on
        on = (left[on_list[0]] == right[on_list[0]])
        for column in on_list[1:]:
            on = on & (left[column] == right[column])
    timezone_hack = None
    if re.search(r'\b_TIMEZONE\b', str(on)):
        # join does not support using _TIMEZONE pseudo-field in join_criteria,
        # replacing it with temporary fields in the branches
        timezone_hack = '__TIMEZONE_HACK__'
        left[timezone_hack] = left['_TIMEZONE']
        right[timezone_hack] = right['_TIMEZONE']
    if str(on) == "all":
        on = f'1 = 1 or {rprefix}.TIMESTAMP >= 0'
    _LEFT_NODE_NAME = "__SRC_LEFT__"  # this is internal name
    _RIGHT_NODE_NAME = rprefix
    initial_left_use_name_for_column_prefix = left.use_name_for_column_prefix()
    initial_right_use_name_for_column_prefix = right.use_name_for_column_prefix()
    initial_left_source_node_name = left.node_name()
    initial_right_source_node_name = right.node_name()
    left.use_name_for_column_prefix(True)
    left.node_name(_LEFT_NODE_NAME)
    # we have to add _source prefix to all column operations
    # `on` expression is written with right, so we should modify it, we will restore it later
    right.use_name_for_column_prefix(True)  # add prefix to every operation with that table in on
    right.node_name(_RIGHT_NODE_NAME)
    columns_name_set = set()
    columns = {}
    fields_to_skip_right_source = {'TIMESTAMP'}
    for name, dtype in chain(left.columns(skip_meta_fields=True).items(), right.columns(skip_meta_fields=True).items()):
        if name in columns_name_set:
            columns[_RIGHT_NODE_NAME + "_" + name] = dtype
            fields_to_skip_right_source.add(name)
        else:
            columns[name] = dtype
            columns_name_set.add(name)
    if how == "outer":
        join_type = "LEFT_OUTER"
    elif how == "inner":
        join_type = "INNER"
    else:
        raise ValueError("The 'how' parameter has wrong value. Only 'outer' and 'inner' are supported")
    if timezone_hack:
        on = re.sub(r'\._TIMEZONE\b', f'.{timezone_hack}', str(on))
        on = re.sub(r'\b_TIMEZONE\b', f'{_LEFT_NODE_NAME}.{timezone_hack}', str(on))
    # ------------------
    # create objects
    params = {"join_criteria": str(on), "join_type": join_type, "left_source": _LEFT_NODE_NAME}
    # return states of sources back
    left.use_name_for_column_prefix(initial_left_use_name_for_column_prefix)
    left.node_name(initial_left_source_node_name)
    right.use_name_for_column_prefix(initial_right_use_name_for_column_prefix)
    right.node_name(initial_right_source_node_name)
    if str(on) == "same_size":
        result = output_type(node=otq.JoinSameSizeTs(), **columns)
    else:
        result = output_type(node=otq.Join(**params), **columns)
    __copy_sources_on_merge_or_join(result, (left, right),
                                    names=(_LEFT_NODE_NAME, _RIGHT_NODE_NAME),
                                    output_type_index=output_type_index)
    rename_fields_dict = {}
    for lc, rc in zip_longest(left.columns(skip_meta_fields=True), right.columns(skip_meta_fields=True)):
        if lc:
            rename_fields_dict[f"{_LEFT_NODE_NAME}.{lc}"] = lc
        if rc:
            if rc not in fields_to_skip_right_source:
                rename_fields_dict[f"{_RIGHT_NODE_NAME}.{rc}"] = rc
            else:
                rename_fields_dict[f"{_RIGHT_NODE_NAME}.{rc}"] = f"{_RIGHT_NODE_NAME}_{rc}"
    __rename_fields(result, rename_fields_dict)
    result.sink(otq.Passthrough(fields=_LEFT_NODE_NAME + ".TIMESTAMP", drop_fields=True))
    items = []
    for name, dtype in result.columns(skip_meta_fields=True).items():
        items.append(ott.type2str(dtype) + " " + name)
    if keep_fields_not_in_schema:
        # Here we try to preserve fields of original sources that were not in schema
        # in their original form. If there's a duplication of fields or any other problem
        # in runtime, we'll be able to do nothing
        result.sink(otq.Passthrough(fields=_RIGHT_NODE_NAME + ".TIMESTAMP", drop_fields=True))
        result.sink(otq.RenameFieldsEp(rename_fields=rf"{_LEFT_NODE_NAME}\.(.*)=\1,{_RIGHT_NODE_NAME}\.(.*)=\1",
                                       use_regex=True))
        result.sink(otq.Table(fields=",".join(items), keep_input_fields=True))
    else:
        result.sink(otq.Table(fields=",".join(items)))
    if timezone_hack:
        result = result.drop([
            field for field in result.schema
            if field.endswith(timezone_hack)
        ])
        left.drop(timezone_hack, inplace=True)
        right.drop(timezone_hack, inplace=True)
    for column in on_list:
        result.drop(f'{_RIGHT_NODE_NAME}_{column}', inplace=True)
    return result 
[docs]def join_by_time(sources, how="outer", on=None, policy=None, check_schema=True, leading=0,
                 match_if_identical_times=None, output_type_index=None, use_rename_ep=True):
    """
    Joins ticks from multiple input time series, based on input tick timestamps.
    ``leading`` source tick joined with already arrived ticks from other sources.
    >>> leading = otp.Ticks(A=[1, 2], offset=[1, 3])
    >>> other = otp.Ticks(B=[1], offset=[2])
    >>> otp.join_by_time([leading, other])()
                         Time  A  B
    0 2003-12-01 00:00:00.001  1  0
    1 2003-12-01 00:00:00.003  2  1
    In case you willing to add prefix/suffix to all columns in one of the sources you should use
    :func:`Source.add_prefix` or :func:`Source.add_suffix`
    Parameters
    ----------
    sources: Collection[:class:`Source`]
        The collection of Source objects which will be joined
    how: 'outer' or 'inner'
        The method of join (inner or outer)
    on: Collection[:class:`Column`]
        ``on`` add an extra check to join - only ticks with same ``on`` fields will be joined
        >>> leading = otp.Ticks(A=[1, 2], offset=[1, 3])
        >>> other = otp.Ticks(A=[2, 2], B=[1, 2], offset=[0, 2])
        >>> otp.join_by_time([leading, other], on=['A'])()
                             Time  A  B
        0 2003-12-01 00:00:00.001  1  0
        1 2003-12-01 00:00:00.003  2  2
    policy: 'arrival_order', 'latest_ticks', 'each_for_leader_with_first' or 'each_for_leader_with_latest'
        Policy of joining ticks with same timestamps
        >>> leading = otp.Ticks(A=[1, 2], offset=[0, 0], OMDSEQ=[0, 3])
        >>> other = otp.Ticks(B=[1, 2], offset=[0, 0], OMDSEQ=[2, 4])
        Note: in the examples below we assume that all ticks have same timestamps, but order of ticks as in example.
        OMDSEQ is a special field that store order of ticks with same timestamp
        - ``arrival_order``
          output tick generated on arrival of ``leading`` source tick
        >>> data = otp.join_by_time([leading, other], policy='arrival_order')
        >>> otp.run(data)[['Time', 'A', 'B']]
                Time  A  B
        0 2003-12-01  1  0
        1 2003-12-01  2  1
        - ``latest_ticks``
          Tick generated at the time of expiration of a particular timestamp (when all ticks from all sources
          for current timestamp arrived). Only latest tick from ``leading`` source will be used.
        >>> data = otp.join_by_time([leading, other], policy='latest_ticks')
        >>> otp.run(data)[['Time', 'A', 'B']]
                Time  A  B
        0 2003-12-01  2  2
        - ``each_for_leader_with_first``
          Each tick from ``leading`` source will be joined with first tick from other sources for current timestamp
        >>> data = otp.join_by_time(
        ...     [leading, other],
        ...     policy='each_for_leader_with_first'
        ... )
        >>> otp.run(data)[['Time', 'A', 'B']]
                Time  A  B
        0 2003-12-01  1  1
        1 2003-12-01  2  1
        - ``each_for_leader_with_latest``
          Each tick from ``leading`` source will be joined with last tick from other sources for current timestamp
        >>> data = otp.join_by_time(
        ...     [leading, other],
        ...     policy='each_for_leader_with_latest'
        ... )
        >>> otp.run(data)[['Time', 'A', 'B']]
                Time  A  B
        0 2003-12-01  1  2
        1 2003-12-01  2  2
    check_schema: bool
        If True onetick.py will check that all columns names are unambiguous
        and columns listed in `on` param are exists in sources schema.
        Which can lead to false positive error
        in case of some event processors were sink to Source. To avoid this set check_scheme to False.
    leading: int, 'all', :class:`Source`, list of int, list of :class:`Source`
        A list sources or their indexes. If this parameter is 'all', every source is considered to be leading.
    match_if_identical_times: bool
        A True value of this parameter causes an output tick to be formed from input ticks with identical timestamps
        only. If ``on`` is set to 'outer', default values of fields (``otp.nan``, 0, empty string) are propagated for
        sources that did not tick at a given timestamp.
        If this parameter is set to True, the default value of ``policy`` parameter is set to 'latest_ticks'.
    output_type_index: int
        Specifies index of source in ``sources`` from which type and properties of output will be taken.
        Useful when joining sources that inherited from :class:`Source`.
        By default output object type will be :class:`Source`.
    use_rename_ep: bool
        Use :py:class:`onetick.query.RenameFields` event processor or not.
        This event processor can't be used in generic aggregation.
    See also
    --------
    **JOIN_BY_TIME** OneTick event processor
    Examples
    --------
    >>> d1 = otp.Ticks({'A': [1, 2, 3], 'offset': [1, 2, 3]})
    >>> d2 = otp.Ticks({'B': [1, 2, 3], 'offset': [1, 2, 4]})
    >>> otp.join_by_time([d1, d2])()
                         Time  A  B
    0 2003-12-01 00:00:00.001  1  0
    1 2003-12-01 00:00:00.002  2  1
    2 2003-12-01 00:00:00.003  3  2
    >>> otp.join_by_time([d1, d2], leading=1)()
                         Time  A  B
    0 2003-12-01 00:00:00.001  1  1
    1 2003-12-01 00:00:00.002  2  2
    2 2003-12-01 00:00:00.004  3  3
    >>> otp.join_by_time([d1, d2], leading=1, match_if_identical_times=True)()
                         Time  A  B
    0 2003-12-01 00:00:00.001  1  1
    1 2003-12-01 00:00:00.002  2  2
    2 2003-12-01 00:00:00.004  0  3
    Adding prefix to right source for all columns:
    >>> otp.join_by_time([d1, d2.add_prefix('right_')])()
                         Time  A  right_B
    0 2003-12-01 00:00:00.001  1        0
    1 2003-12-01 00:00:00.002  2        1
    2 2003-12-01 00:00:00.003  3        2
    Returns
    -------
    :class:`Source` or same class as ``sources[output_type_index]``
        A time series of ticks.
    """
    output_type = output_type_by_index(sources, output_type_index)
    join_str_keys = []
    # if key is set, then generalize it, ie convert into list;
    # then remove keys from 'columns_count' dict to pass validation after
    if on is not None:
        if isinstance(on, list):
            pass
        elif isinstance(on, Column):
            on = [on]
        elif isinstance(on, str):
            on = [on]
        else:
            raise TypeError(f"It is not supported to have '{type(on)}' type as a key")
        for join_key in on:
            dtypes = set()
            if check_schema:
                for source in sources:
                    try:
                        key_type = source.schema[str(join_key)]
                    except KeyError as e:
                        raise KeyError(f"Column '{join_key}' not found in source schema {source}") from e
                    type_name = ott.type2str(key_type)
                    if type_name == "string[64]":
                        type_name = "string"
                    dtypes.add(type_name)
                if len(dtypes) > 1:
                    raise TypeError(f"Column '{join_key}' has different types in sources: {dtypes}")
            if isinstance(join_key, Column):
                join_str_keys.append(str(join_key))
            elif isinstance(join_key, str):
                join_str_keys.append(join_key)
    if check_schema:
        _check_schema_for_join_by_time(join_str_keys, sources)
    if how not in ["inner", "outer"]:
        raise Exception('Wrong value for the "how" parameter. It is allowed to use "inner" or "outer" values')
    join_type = how.upper()
    # ------------------
    # create objects
    params = {"add_source_prefix": False, "join_type": join_type}
    leading = _fill_leading_sources_param(leading, params, sources)
    if on is not None:
        params["join_keys"] = ",".join(join_str_keys)
    if policy is not None:
        policies = {"arrival_order", "latest_ticks", "each_for_leader_with_first", "each_for_leader_with_latest"}
        if policy.lower() not in policies:
            raise ValueError("Invalid policy. Only the following ones are allowed: " + ", ".join(policies) + ".")
        params["same_timestamp_join_policy"] = policy.upper()
    if match_if_identical_times is not None:
        params["match_if_identical_times"] = match_if_identical_times
    columns = {name: dtype for src in sources for name, dtype in src.columns(skip_meta_fields=True).items()}
    result = output_type(node=otq.JoinByTime(**params), **columns)
    __copy_sources_on_merge_or_join(result, sources,
                                    names=True,
                                    drop_meta=True,
                                    leading=leading,
                                    output_type_index=output_type_index,
                                    use_rename_ep=use_rename_ep)
    if how == "outer":
        items = []
        for name, dtype in result.columns(skip_meta_fields=True).items():
            items.append(ott.type2str(dtype) + " " + name)
        result.sink(otq.Table(fields=",".join(items), keep_input_fields=True))
    return result 
@singledispatch
def _fill_leading_sources_param(leading, params, sources):
    from onetick.py.core.source import _Source
    if isinstance(leading, _Source):  # TODO: PY-104 Get rid of circular dependencies in code to avoid local import
        result = f"__SRC_{__find_by_id(sources, leading)}__"
        params["leading_sources"] = result
        result = [result]
    elif leading == "all":  # all sources are leading which is specified by empty string
        params["leading_sources"] = ""
        result = []
    else:
        raise ValueError(
            "wrong leading param was specified, please use any of int, 'all' literal, " "list of int, list of _Source"
        )
    return result
@_fill_leading_sources_param.register(int)
def _(leading, params, sources):
    if leading < 0:
        leading = len(sources) + leading
    if 0 <= leading < len(sources):
        result = f"__SRC_{leading}__"
        params["leading_sources"] = result
        return [result]
    else:
        raise ValueError(
            f"leading source index should be in range(-len(source), len(source)), " f"but {leading} was specified."
        )
@_fill_leading_sources_param.register(list)  # type: ignore  # _ already defined above
@_fill_leading_sources_param.register(tuple)
def _(leading, params, sources):
    if len(leading) > len(sources):
        raise ValueError("Number of leading sources can't be bigger number of sources")
    elif len(leading) == len(sources):
        Warning(
            "You've specified leading and source lists of the same size, you can specify leading='all' " "instead of it"
        )
    if isinstance(leading[0], int):
        result = leading
    else:
        result = [__find_by_id(sources, lead) for lead in leading]
    indexes = ",".join(f"__SRC_{i}__" for i in result)
    params["leading_sources"] = indexes
    return result
def __find_by_id(collection, item):
    for index, s in enumerate(collection):
        if s is item:
            return index
    raise ValueError("The leading source should be in join sources list")
def _check_schema_for_join_by_time(join_str_keys, sources):
    # check that there aren't matching columns
    columns_count = Counter()
    for src in sources:
        columns_count.update(src.columns(skip_meta_fields=True).keys())
    for join_key in join_str_keys:
        del columns_count[join_key]
    matched = [k for k, value in columns_count.items() if value > 1]
    if "OMDSEQ" in matched:
        # OMDSEQ behaves like the TIMESTAMP field
        matched.remove("OMDSEQ")
    if len(matched):
        raise Exception(f"There are matched columns between sources: {','.join(matched)}")
def apply_query(query,
                in_sources=None,
                output_pins=None,
                shared_state_variables_list=None,
                output_type_index=None,
                **params):
    from onetick.py.sources import query as otp_query
    output_type = output_type_by_index(in_sources, output_type_index)
    output_pins = output_pins if output_pins else []
    in_sources = in_sources if in_sources else {}
    shared_state_variables_list = shared_state_variables_list if shared_state_variables_list else []
    if isinstance(query, str):
        # it seems that path is passed
        query = otp_query(query, **params)
    elif isinstance(query, otp_query):
        if params:
            query.update_params(**params)
    # elif
    columns = {}
    for src in in_sources.values():
        columns.update(src.columns(skip_meta_fields=True))
    str_params = query.str_params
    shared_state_variables = ",".join(shared_state_variables_list)
    inputs_need_unbound_symbols = {in_pin: src._is_unbound_required() for in_pin, src in in_sources.items()}
    if query.graph_info.has_unbound_if_pinned(inputs_need_unbound_symbols):
        symbol = adaptive
    else:
        symbol = None
    nested_src = output_type(
        node=otq.NestedOtq(query.path, str_params, shared_state_variables=shared_state_variables),
        _has_output=len(output_pins) > 0,
        _symbols=symbol,
        **columns,
    )
    eps = defaultdict()
    for in_pin, src in in_sources.items():
        nested_src.source(src.node().copy_graph(eps), in_pin)
        nested_src.node().add_rules(src.node().copy_rules())
        nested_src._set_sources_dates(src)
        nested_src._merge_tmp_otq(src)
    if len(output_pins) == 0:
        # output_pins = ['OUT']
        return nested_src
    if len(output_pins) > 1:
        result = []
        for out_pin in output_pins:
            res_src = nested_src.copy()
            res_src.node().out_pin(out_pin)
            # NOTE: need to comment out this node
            res_src.sink(otq.Passthrough())
            # apply config customization
            query.config.apply(out_pin, res_src)
            result.append(res_src)
        return tuple(result)
    else:
        # TODO: move setting out_pin on the creating step of nested_src
        # It seems as not working now, because seems .copy() of _Source doesnt
        # copy out_pin reference, need to check
        nested_src.node().out_pin(output_pins[0])
        # apply config customization
        query.config.apply(output_pins[0], nested_src)
        return nested_src
def apply(query, *args, **kwargs):
    return apply_query(query.path, *args, **kwargs, **query.params)
[docs]def cut(column: 'Column', bins: Union[int, List[float]], labels: List[str] = None):
    """
    Bin values into discrete intervals (mimics :pandas:`pandas.cut`).
    Parameters
    ----------
    column: :py:class:`~onetick.py.Column`
        Column with numeric data used to build bins.
    bins: int or List[float]
        When List[float] - defines the bin edges.
        When int - Defines the number of equal-width bins in the range of x.
    labels: List[str]
        Labels used to name resulting bins.
        If not set, bins are numeric intervals like (5.0000000000, 7.5000000000].
    Return
    ------
    object that can be set to :py:class:`~onetick.py.Column` via :py:meth:`~onetick.py.Source.__setitem__`
    Examples
    --------
    >>> # OTdirective: snippet-name: Source.functions.cut;
    >>> data = otp.Ticks({"X": [9, 8, 5, 6, 7, 0, ]})
    >>> data['bin'] = otp.cut(data['X'], bins=3, labels=['a', 'b', 'c'])
    >>> otp.run(data)[['X', 'bin']]
       X bin
    0  9   c
    1  8   c
    2  5   b
    3  6   b
    4  7   c
    5  0   a
    """
    src = column.obj_ref
    return _CutBuilder(src, column, bins, labels=labels) 
[docs]def qcut(column: 'Column', q: Union[int, List[float]], labels: List[str] = None):
    """
    Quantile-based discretization function (mimics :pandas:`pandas.qcut`).
    Parameters
    ----------
    column: :py:class:`~onetick.py.Column`
        Column with numeric data used to build bins.
    q: int or List[float]
        When List[float] - array of quantiles, e.g. [0, .25, .5, .75, 1.] for quartiles.
        When int - Number of quantiles. 10 for deciles, 4 for quartiles, etc.
    labels: List[str]
        Labels used to name resulting bins.
        If not set, bins are numeric intervals like (5.0000000000, 7.5000000000].
    Return
    ------
    object that can be set to :py:class:`~onetick.py.Column` via :py:meth:`~onetick.py.Source.__setitem__`
    Examples
    --------
    >>> # OTdirective: snippet-name: Source.functions.qcut;
    >>> data = otp.Ticks({"X": [10, 3, 5, 6, 7, 1]})
    >>> data['bin'] = otp.qcut(data['X'], q=3, labels=['a', 'b', 'c'])
    >>> otp.run(data)[['X', 'bin']]
        X bin
    0  10   c
    1   3   a
    2   5   b
    3   6   b
    4   7   c
    5   1   a
    """
    # TODO when q is a List[float] like [0, .25, .5, .75, 1.]
    src = column.obj_ref
    return _QCutBuilder(src, column, q, labels=labels) 
[docs]def coalesce(sources, max_source_delay: float = 0.0, output_type_index: int = None):
    """
    Used to fill the gaps in one time series with the ticks from one or several other time series.
    This event processor considers ticks that arrive from several sources at the same time as being the same,
    allowing for possible delay across the sources when determining whether the ticks are the same.
    When the same tick arrives from several sources, it is only propagated from the source
    that has the highest priority among those sources.
    Input ticks do not necessarily have the same structure - they can have different fields.
    In order to distinguish time series the event processor adds the SYMBOL_NAME field.
    Also SOURCE field is added to each tick which lacks it to identify the source from which the tick is coming.
    Hence, one must avoid adding SOURCE field in event processors positioned after COALSECE.
    Parameters
    ----------
    sources: list of :class:`Source`
        List of the sources to coalesce. Also, this list is treated as priority order.
        First member of the list has the highest priority when determining whether ticks are the same.
    max_source_delay: float
        The maximum time in seconds by which a tick from one input time series
        can arrive later than the same tick from another time series.
    output_type_index: int
        Specifies index of source in ``sources`` from which type and properties of output will be taken.
        Useful when merging sources that inherited from :class:`Source`.
        By default, output object type will be :class:`Source`.
    Return
    ------
    :class:`Source`
        A time series of ticks.
    See also
    --------
    **COALESCE** OneTick event processor
    Examples
    --------
    If ticks from different sources have the same time,
    only the tick from source with the highest priority will be propagated.
    >>> data1 = otp.Ticks(A=[1, 2])
    >>> data2 = otp.Ticks(A=[3, 4])
    >>> data = otp.coalesce([data2, data1])
    >>> otp.run(data)[['Time', 'A']]
                         Time A
    0 2003-12-01 00:00:00.000 3
    1 2003-12-01 00:00:00.001 4
    We can use ``max_source_delay`` parameter to expand time interval in which
    ticks are considered to have the "same time".
    >>> data1 = otp.Ticks({
    ...     'A': [1, 2, 3],
    ...     'offset': [0, 3000, 6000],
    ... })
    >>> data2 = otp.Ticks({
    ...     'A': [4, 5, 6],
    ...     # 4 is delayed by less than one second from 1
    ...     # 5 is delayed by one second from 2
    ...     # 6 is delayed by more than one second from 3
    ...     'offset': [999, 4000, 7001],
    ... })
    >>> data = otp.coalesce([data2, data1], max_source_delay=1)
    >>> otp.run(data)[['Time', 'A']]
                         Time  A
    0 2003-12-01 00:00:00.999  4
    1 2003-12-01 00:00:04.000  5
    2 2003-12-01 00:00:06.000  3
    3 2003-12-01 00:00:07.001  6
    """
    if not sources:
        raise ValueError("Coalesce should have one or more inputs")
    output_type = output_type_by_index(sources, output_type_index)
    # change node names for sources, COALESCE ep needs them
    new_node_names = [
        f'__COALESCE_SRC_{i}__' for i, source in enumerate(sources, start=1)
    ]
    node = otq.Coalesce(
        priority_order=','.join(new_node_names),
        max_source_delay=max_source_delay,
    )
    columns = {
        # these fields will be added by COALESCE ep
        'SYMBOL_NAME': str,
        'TICK_TYPE': str,
    }
    for source in sources:
        for name in ['SYMBOL_NAME', 'TICK_TYPE']:
            if name in source.schema:
                raise ValueError(f"Field with name '{name}' is already present in the source. "
                                 'Please, rename or delete that field prior to invoking coalesce().')
        shared_columns = set(source.schema).intersection(columns)
        for name in shared_columns:
            type_1, type_2 = source.schema[name], columns[name]
            if type_1 != type_2:
                raise ValueError(f"Conflicting types for field '{name}' in different sources: {type_1}, {type_2}")
        columns.update(source.schema)
    # TODO: do we need field SOURCE (especially when node names are auto-generated)?
    # this field will be added by COALESCE if it's not presented in sources
    columns.setdefault('SOURCE', str)
    result = output_type(node, **columns)
    __copy_sources_on_merge_or_join(result, sources, names=new_node_names, output_type_index=output_type_index)
    return result 
[docs]def corp_actions(source,
                 adjustment_date: Union[ott.date, ott.datetime, int, str, None] = None,
                 adjustment_date_tz: str = default,
                 fields=None,
                 adjust_rule="PRICE",
                 apply_split: bool = True,
                 apply_spinoff: bool = False,
                 apply_cash_dividend: bool = False,
                 apply_stock_dividend: bool = False,
                 apply_security_splice: bool = False,
                 apply_others: str = "",
                 apply_all: bool = False,
                 ):
    """Adjusts values using corporate actions information loaded into OneTick
    from the reference data file. To use it, location of reference database must
    be specified via OneTick configuration.
    Parameters
    ----------
    source : onetick.py.Source
        Source object adjusted by corporate actions information.
    adjustment_date : otp.date, otp.datetime, int, str, None, optional
        The date as of which the values are adjusted. `int` format is YYYYMMDD.
        If it is not set, the values are adjusted as of the end date in the query.
        All corporate actions of the types specified in the parameters
        that lie between the tick timestamp and the adjustment date will be applied to each tick.
        Notice that the adjustment date is not affected neither by _SYMBOL_PARAM._PARAM_END_TIME
        nor by the apply_times_daily setting.
        When parameter is in YYYYMMDD format, the time is assumed to be 17:00:00 GMT.
        By default None
    adjustment_date_tz : str, optional
        Timezone for adjustment date, by default global `otp.config.tz` value used.
        Local timezone can't be used so in this case parameter is set to 'GMT'.
        When `adjustment_date` is in YYYYMMDD format, `adjustment_date_tz` parameter is set to 'GMT'.
    fields : str, optional
        A comma-separated list of fields to be adjusted. If this parameter is not set,
        some default adjustments will take place if appropriately named fields exist in the tick:
        - If the ADJUST_RULE parameter is set to PRICE, and the PRICE field is present,
          it will get adjusted. If the fields ASK_PRICE or BID_PRICE are present, they will get adjusted.
          If fields ASK_VALUE or BID_VALUE are present, they will get adjusted
        - If the ADJUST_RULE parameter is set to SIZE, and the SIZE field is present,
          it will get adjusted. If the fields ASK_SIZE or BID_SIZE are present, they will get adjusted.
          If fields ASK_VALUE or BID_VALUE are present, they will get adjusted.
        By default None
    adjust_rule : str, optional
        When set to PRICE, adjustments are applied under the assumption that fields to be adjusted contain prices
        (adjustment direction is determined appropriately).
        When set to SIZE, adjustments are applied under the assumption that fields contain sizes
        (adjustment direction is opposite to that when the parameter's value is PRICE).
        By default "PRICE"
    apply_split : bool, optional
        If true, adjustments for splits are applied, by default `True`
    apply_spinoff : bool, optional
        If true, adjustments for spin-offs are applied, by default `False`
    apply_cash_dividend : bool, optional
        If true, adjustments for cash dividends are applied, by default `False`
    apply_stock_dividend : bool, optional
        If true, adjustments for stock dividends are applied, by default `False`
    apply_security_splice : bool, optional
        If true, adjustments for security splices are applied, by default `False`
    apply_others : str, optional
        A comma-separated list of names of custom adjustment types to apply, by default ""
    apply_all : bool, optional
        If true, applies all types of adjustments, both built-in and custom, by default `False`
    Returns
    -------
    onetick.py.Source
        A new source object with applied adjustments.
    See also
    --------
    **CORP_ACTIONS** OneTick event processor
    Examples
    --------
    >>> src = otp.DataSource('NYSE_TAQ',
    ...                      tick_type='TRD',
    ...                      start=otp.dt(2022, 5, 20, 9, 30),
    ...                      end=otp.dt(2022, 5, 26, 16))
    >>> df = otp.run(src, symbols='MKD', symbol_date=otp.date(2022, 5, 22))
    >>> df["PRICE"][0]
    0.0911
    >>> src = otp.corp_actions(src,
    ...                        adjustment_date=otp.date(2022, 5, 22),
    ...                        fields="PRICE",)
    >>> df = otp.run(src, symbols='MKD', symbol_date=otp.date(2022, 5, 22))
    >>> df["PRICE"][0]
    1.36649931675
    """
    if not isinstance(adjustment_date, (ott.date, int, type(None))):
        adjustment_date = ott.date(adjustment_date)
    if isinstance(adjustment_date, ott.date):
        adjustment_date = int(adjustment_date.to_str())
    adjustment_date_tz_is_default = False
    if adjustment_date_tz is default:
        adjustment_date_tz_is_default = True
        adjustment_date_tz = config.tz
    if not adjustment_date_tz:
        warnings.warn("Local timezone can't be used in parameter 'adjustment_date_tz', setting to 'GMT'.")
        adjustment_date_tz = 'GMT'
    if isinstance(adjustment_date, (ott.date, int, str)) and adjustment_date_tz != 'GMT':
        adjustment_date_tz = 'GMT'
        if not adjustment_date_tz_is_default:
            warnings.warn("`adjustment_date_tz` was changed to 'GMT' since "
                          "it is the only valid value when `adjustment_date` is in YYYYMMDD format.")
    source.sink(otq.CorpActions(
        adjustment_date=adjustment_date,
        adjustment_date_tz=adjustment_date_tz,
        fields=fields,
        adjust_rule=adjust_rule,
        apply_split=apply_split,
        apply_spinoff=apply_spinoff,
        apply_cash_dividend=apply_cash_dividend,
        apply_stock_dividend=apply_stock_dividend,
        apply_security_splice=apply_security_splice,
        apply_others=apply_others,
        apply_all=apply_all,
    ))
    return source 
def save_sources_to_single_file(sources,
                                file_path=None,
                                file_suffix='',
                                start=None,
                                end=None,
                                start_time_expression=None,
                                end_time_expression=None,
                                timezone=None,
                                running_query_flag=None):
    """
    Save onetick.py.Source objects to the single file.
    Parameters
    ----------
    sources: dict or list
        dict of names -> sources or list of sources to merge into single file.
        If it's the list then names will be autogenerated.
        Source can be :class:`otp.Source` object or dictionary with these allowed parameters:
        {
            'source': otp.Source,
            'start': datetime(2022, 1, 1),         # optional
            'end': datetime(2022, 1, 2),           # optional
            'symbols': otp.Source or otp.Symbols,  # optional
        }
    file_path: str, optional
        Path to the file where all sources will be saved.
        If not set, sources will be saved to temporary file and its name will be returned.
    file_suffix: str
        Only used if ``file_path`` is not set.
        This suffix will be added to the name of a generated query file.
    start: datetime, optional
        start time for the resulting query file
    end: datetime, optional
        end time for the resulting query file
    start_time_expression: str, optional
        start time expression for the resulting query file
    end_time_expression: str, optional
        end time expression for the resulting query file
    timezone: str, optional
        timezone for the resulting query file
    running_query_flag: bool, optional
        running query flag for the resulting query file
    Returns
    -------
        If `sources` is list then returns list of full query paths (path_to_file::query_name)
        with autogenerated names corresponding to each source from `sources`.
        If `sources` is dict then the path to the query file is returned.
    """
    if isinstance(sources, dict):
        names = sources.keys()
        sources = sources.values()
        query_names = None
    else:
        names = repeat(None)
        query_names = []
    tmp_otq = TmpOtq()
    for name, source in zip(names, sources):
        query_start = query_end = query_symbols = None
        if isinstance(source, dict):
            query_start = source.get('start')
            query_end = source.get('end')
            query_symbols = source.get('symbols')
            source = source['source']
        query_name = source._store_in_tmp_otq(tmp_otq,
                                              name=name,
                                              start=query_start,
                                              end=query_end,
                                              symbols=query_symbols)
        if query_names is not None:
            query_names.append(query_name)
    file_path = tmp_otq.save_to_file(
        file_path=file_path,
        file_suffix=file_suffix,
        start=start,
        end=end,
        start_time_expression=start_time_expression,
        end_time_expression=end_time_expression,
        timezone=timezone,
        running_query_flag=running_query_flag,
    )
    if query_names is not None:
        return [f'{file_path}::{query_name}' for query_name in query_names]
    return file_path
class _FormatType(Enum):
    POSITIONAL = 1
    OMITTED_POSITIONAL = 2
    KEY_WORD = 3
def _add_element(cur_res, element, format_spec_additional=None):
    if isinstance(element, Operation):
        if format_spec_additional is None:
            cur_res += element.apply(str)
        elif issubclass(element.dtype, float) and re.fullmatch(r'\.\d+f', format_spec_additional):
            # float has strange behavior when precision=0
            decimal_elem = element.apply(ott.decimal)
            cur_res += decimal_elem.decimal.str(re.findall(r'\d+', format_spec_additional)[0])
        elif issubclass(element.dtype, (ott.nsectime, ott.msectime)):
            cur_res += element.dt.strftime(format_spec_additional)
        else:
            raise ValueError(f'Unsupported formatting `{format_spec_additional}` for field type {element.dtype}')
    else:
        if format_spec_additional is None:
            cur_res += str(element)
        elif isinstance(element, float):
            formatting = f'{{:{format_spec_additional}}}'
            cur_res += formatting.format(element)
        else:
            raise ValueError(f'Unsupported formatting `{format_spec_additional}` for literal {type(element)}')
    return cur_res
def _validate_format_line(format_line: str):
    open_brackets_num = 0
    close_brackets_num = 0
    for symbol in format_line:
        if symbol == '{':
            open_brackets_num += 1
        if symbol == '}':
            close_brackets_num += 1
        if open_brackets_num > close_brackets_num + 1:
            raise ValueError("'{' appeared before previous '{' was closed")
        if open_brackets_num < close_brackets_num:
            raise ValueError("Single '}' encountered in format string")
    if open_brackets_num != close_brackets_num:
        raise ValueError("Single '{' encountered in format string")
def _get_format_type(format_array: List[str]) -> _FormatType:
    if len(format_array) < 2:
        return _FormatType.OMITTED_POSITIONAL
    format_spec_array = format_array[1::2]
    uses_positional = False
    uses_omitted_positional = False
    uses_key_word = False
    for format_spec in format_spec_array:
        format_spec_param = format_spec.split(':')[0]
        if not format_spec_param:
            uses_omitted_positional = True
        elif format_spec_param[0].isdigit():
            if not format_spec_param.isnumeric():
                raise ValueError(f'Incorrect positional argument: `{format_spec_param}`')
            uses_positional = True
        elif format_spec_param[0].isalpha():
            if not re.fullmatch('[a-zA-Z0-9_]+', format_spec_param):
                raise ValueError(f'Incorrect key word argument: `{format_spec_param}`')
            uses_key_word = True
        else:
            raise ValueError(f'Unrecognised format specification: `{format_spec_param}`')
    if uses_positional and not (uses_omitted_positional or uses_key_word):
        return _FormatType.POSITIONAL
    if uses_omitted_positional and not (uses_positional or uses_key_word):
        return _FormatType.OMITTED_POSITIONAL
    if uses_key_word and not (uses_positional or uses_omitted_positional):
        return _FormatType.KEY_WORD
    raise ValueError("Format string has mixed type of referring to arguments which is not allowed")