# Source code for onetick.py.functions

import itertools
import warnings
import inspect
import datetime
import re
from collections import defaultdict, Counter
from functools import singledispatch
from itertools import chain, zip_longest, repeat
from typing import List, Union, Sequence

import onetick.query as otq

from onetick.py.configuration import config
from onetick.py.core.eval_query import _QueryEvalWrapper
from onetick.py.core._source._symbol_param_source import _SymbolParamSource
from onetick.py.core._source.tmp_otq import TmpOtq
from onetick.py.utils import get_type_that_includes, adaptive, default
from onetick.py.types import type2str, date
from onetick.py.sources import query
from onetick.py.core.column import Column
from onetick.py.core.cut_builder import _QCutBuilder, _CutBuilder


# Names exported by ``from onetick.py.functions import *``.
__all__ = ['merge', 'join', 'join_by_time']


def output_type_by_index(sources, index):
    """
    Pick the class that should be used to build the result of a merge/join.

    Parameters
    ----------
    sources: sequence
        Objects participating in the operation.
    index: int or None
        Position in ``sources`` of the object whose class should be reused.

    Returns
    -------
    type
        ``type(sources[index])`` when ``index`` is given, the base ``_Source`` class otherwise.
    """
    if index is not None:
        return type(sources[index])

    # imported locally, as the other functions of this module do
    from onetick.py.core.source import _Source
    return _Source


def merge(sources, align_schema=True, symbols=None, identify_input_ts=False,
          presort=adaptive, concurrency=default, batch_size=default, output_type_index=None):
    """
    Merges ticks from the ``sources`` into a single output ordered by the timestamp

    Parameters
    ----------
    sources : list
        List of sources to merge
    align_schema : bool
        If set to True, then table is added right after merge.
        We recommended to keep True to prevent problems with different tick schemas.
        Default: True
    symbols: str, list of str or functions, :class:`Source`
        Symbol(s) to run the query for passed as a string, a list of strings, or as a "symbols" query which results
        include the ``SYMBOL_NAME`` column. The start/end times for the symbols query will taken from the
        :meth:`run ` params.
        See :ref:`symbols <Symbols>` for more details.
    identify_input_ts: bool
        If set to False, the fields SYMBOL_NAME and TICK_TYPE are not appended to the output ticks.
    presort: bool
        Add the presort EP in case of bound symbols.
        Applicable only when ``symbols`` is not None.
        By default, it is set to True if ``symbols`` are set
        and to False otherwise.
    concurrency: int
        Specifies number of CPU cores to utilize for the ``presort``
        By default, the value from otp.config.default_concurrency is used.
    batch_size: int
        Specifies the query batch size for the ``presort``.
        By default, the value from otp.config.default_batch_size is used.
    output_type_index: int
        Specifies index of source in ``sources`` from which type and properties of output will be taken.
        Useful when merging sources that inherited from :class:`Source`.
        By default, output object type will be :class:`Source`.

    Return
    ------
    :class:`Source` or same class as ``sources[output_type_index]``
        A time series of ticks.

    Examples
    --------

    ``merge`` is used to merge different data sources

    >>> data1 = otp.Ticks(X=[1, 2], Y=['a', 'd'])
    >>> data2 = otp.Ticks(X=[-1, -2], Y=['*', '-'])
    >>> data = otp.funcs.merge([data1, data2])    # OTdirective: snippet-name:merge.as list;
    >>> otp.run(data)
                         Time  X  Y
    0 2003-12-01 00:00:00.000  1  a
    1 2003-12-01 00:00:00.000 -1  *
    2 2003-12-01 00:00:00.001  2  d
    3 2003-12-01 00:00:00.001 -2  -

    Merge series from multiple symbols into one series

    >>> # OTdirective: snippet-name:merge.bound symbols;
    >>> data = otp.Ticks(X=[1])
    >>> data['SYMBOL_NAME'] = data['_SYMBOL_NAME']
    >>> symbols = otp.Ticks(SYMBOL_NAME=['A', 'B'])
    >>> data = otp.merge([data], symbols=symbols)
    >>> data()
            Time  X SYMBOL_NAME
    0 2003-12-01  1           A
    1 2003-12-01  1           B

    Adding symbols param before merge

    >>> symbols = otp.Ticks(SYMBOL_NAME=['S1', 'S2'], param=[1, -1])
    >>> def func(symbol):
    ...     pre = otp.Ticks(X=[1])
    ...     pre["SYMBOL_NAME"] = symbol.name
    ...     pre["PARAM"] = symbol.param
    ...     return pre
    >>> data = otp.funcs.merge([func], symbols=symbols)
    >>> otp.run(data)[['PARAM', 'SYMBOL_NAME']]
       PARAM SYMBOL_NAME
    0      1          S1
    1     -1          S2
    """
    from onetick.py.core.source import _Source

    if not sources:
        raise ValueError("Merge should have one or more inputs")

    output_type = output_type_by_index(sources, output_type_index)

    if presort is adaptive:
        presort = symbols is not None

    # the presort-related options only make sense in particular combinations
    if presort and not symbols:
        warnings.warn("Using the `presort` parameter makes effect only when "
                      "symbols are specified in the `symbols` parameter")
    if concurrency is not default and not presort:
        warnings.warn("Using the `concurrency` parameter makes effect only when "
                      "the `presort` parameter is set to True")
    if batch_size is not default and not presort:
        warnings.warn("Using the `batch_size` parameter makes effect only when "
                      "the `presort` parameter is set to True")

    if concurrency is default:
        # otq.Presort does not support None
        concurrency = '' if config.default_concurrency is None else config.default_concurrency
    if batch_size is default:
        batch_size = config.default_batch_size

    def _base_ep_for_cross_symbol(symbol, tmp_otq):
        # root EP of the result: Presort for presorted bound symbols, plain Merge otherwise
        if presort and symbol:
            root_ep = otq.Presort(batch_size=batch_size, max_concurrency=concurrency)
        else:
            root_ep = otq.Merge(identify_input_ts=identify_input_ts)

        if not symbol:
            return root_ep

        if isinstance(symbol, (query, _QueryEvalWrapper)):
            symbol = symbol.to_eval_string(tmp_otq=tmp_otq)
        elif isinstance(symbol, _Source):
            symbol = symbol._convert_symbol_to_string(symbol, tmp_otq=tmp_otq)
        return root_ep.symbols(symbol)

    def _evaluate_functions_in_sources_list(sources, symbols):
        # replace callables in the list with the sources they produce
        if not isinstance(sources, list):
            sources = [sources]

        evaluated = []
        for item in sources:
            if not isinstance(item, _Source) and callable(item):
                arity = len(inspect.signature(item).parameters)

                if arity == 0:
                    item = item()
                elif arity == 1:
                    if isinstance(symbols, (_Source, _QueryEvalWrapper)):
                        symbol_param = symbols.to_symbol_param()
                    else:
                        symbol_param = _SymbolParamSource()
                    item = item(symbol_param)
                else:
                    raise ValueError(
                        f"It is expected only one parameter from the callback, but {arity} passed"
                    )  # TODO: test this case

            if not isinstance(item, _Source):
                raise ValueError("Source and functions (returning _source) are expected as preprocessors")
            evaluated.append(item)
        return evaluated

    sources = _evaluate_functions_in_sources_list(sources, symbols)

    merged_columns, need_table, used_columns = _collect_merged_columns(False, sources)
    need_table = _is_table_after_merge_needed(need_table, used_columns)

    # we need to store internal graphs somewhere while we create base ep from eval
    intermediate_tmp_otq = TmpOtq()
    base_ep = _base_ep_for_cross_symbol(symbols, tmp_otq=intermediate_tmp_otq)
    result = output_type(node=base_ep, **merged_columns)
    result._tmp_otq.merge(intermediate_tmp_otq)

    __copy_sources_on_merge_or_join(result, sources, symbols, output_type_index=output_type_index)

    if presort and symbols:
        result.sink(otq.Merge(identify_input_ts=identify_input_ts))

    if identify_input_ts:
        # accessing the typed keys declares these columns on the result
        result[("SYMBOL_NAME", str)]
        result[("TICK_TYPE", str)]

    return _add_table_after_merge(align_schema, merged_columns, need_table, result)
def _add_table_after_merge(add_table, merged_columns, need_table, result):
    """
    Sink a Table ep that aligns the schema of ``result`` after a merge, when required.

    Parameters
    ----------
    add_table: bool or list of str
        Falsy value disables the table. A list restricts the table to the listed column names
        (this form is used internally).
    merged_columns: dict
        Mapping of column name to type collected from the merged sources.
    need_table: bool
        Whether the merged sources have schemas that differ.
    result: _Source
        Source to sink the table to.

    Returns
    -------
    The same ``result`` object.
    """
    if add_table and need_table:
        # a special case, when the add_table parameter is a list of common columns that should
        # be added to a final table
        # it is used internally
        if isinstance(add_table, list):
            merged_columns = {key: value for key, value in merged_columns.items() if key in add_table}

        if len(merged_columns):
            table = otq.Table(
                fields=",".join(type2str(dtype) + " " + name for name, dtype in merged_columns.items()),
                keep_input_fields=True,
            )
            result.sink(table)

    return result


def __copy_sources_on_merge_or_join(result,
                                    sources,
                                    symbols=None,
                                    names=None,
                                    drop_meta=False,
                                    leading=None,
                                    output_type_index=None,
                                    use_rename_ep=True):
    """
    copy columns, state vars and other metadata from joining, merging sources

    Parameters
    ----------
    result: _Source
        Source object constructed as join, merge operation, e.g. result = _Source(otq.Merge(sources))
    sources: list of _Source, tuple of _Source
        Sources were joined, merged
    symbols:
        Symbols to copy
    names: list of str or None, tuple of str or None, bool, optional
        - If collection of string or None than add passthrough eps with such name to `sources` if name is specify
            or do not add anything if corresponding item in names is None.
        - If True, than autogenerate such names in __SRC_{number}__ format
        - If None, False than do not add passthrough eps and do not change node names.
    drop_meta : bool, optional
        If True drop TIMESTAMP and OMDSEQ field
    leading : List of str, Tuple of str, Optional
        List of leading sources names
    output_type_index: int, optional
        Specifies index of source in `sources` from which properties of `result` will be taken.
        Useful when merging sources that inherited from otp.Source.
    use_rename_ep: bool
        Use RenameFields ep or not. This ep can't be used in generic aggregation.

    Returns
    -------
        None
        Modify result directly
    """
    from onetick.py.core.source import _Source

    result._copy_state_vars_from(sources)
    result._clean_sources_dates()  # because it is not a real _source

    for source in sources:
        result._merge_tmp_otq(source)
        if source.get_name():
            if not result.get_name():
                result.set_name(source.get_name())
            if result.get_name() != source.get_name():
                warnings.warn(f"Merging/joining sources with different names: '{result.get_name()}' "
                              f"and '{source.get_name()}'. Some of those names will be lost")

    if isinstance(symbols, _Source):
        result._merge_tmp_otq(symbols)

    names = __copy_and_rename_nodes_on_merge_join(result, names, sources, symbols)

    if drop_meta:
        to_drop = [name + ".TIMESTAMP" for name in names]
        to_drop += [name + ".OMDSEQ" for name in names]
        # the leading OMDSEQ has to be renamed before the prefixed meta fields are dropped
        __rename_leading_omdseq(leading, names, result, sources, use_rename_ep=use_rename_ep)
        result.sink(otq.Passthrough(fields=",".join(to_drop), drop_fields=True))

    if output_type_index is not None:
        result._copy_properties_from(sources[output_type_index])


def __rename_fields(source, mapping, use_rename_ep=True):
    """
    Function to rename fields from ``mapping`` in ``source``.
    Note that it is a low-level function that doesn't change python schema of the ``source``.
    Modifies ``source`` inplace, doesn't return anything.
    If ``use_rename_ep`` is `True`, then RenameFields ep will be used.
    """
    if use_rename_ep:
        source.sink(otq.RenameFields(','.join(f'{k}={v}' for k, v in mapping.items())))
        return

    # May be needed, because RenameFields ep is not supported in generic aggregation
    for old, new in mapping.items():
        # RenameFields ignores non-existent fields,
        # all this mess is needed to mimic that logic
        source.sink(otq.WhereClause(where=f'UNDEFINED("{old}")'))
        if_branch_graph = source.node().copy_graph()
        if_branch_rules = source.node().copy_rules()
        source.sink(otq.AddField(new, old), out_pin='ELSE')
        source.sink(otq.Passthrough(old, drop_fields=True))
        source.sink(otq.Merge(identify_input_ts=False))
        source.source(if_branch_graph)
        source.node().add_rules(if_branch_rules)


def __rename_leading_omdseq(leading, names, result, sources, use_rename_ep=True):
    """
    Rename the prefixed OMDSEQ field of the leading source back to plain ``OMDSEQ``.

    Parameters
    ----------
    leading: list or tuple of str or int, or None
        Leading sources, given either by node name or by position in ``names``.
        The collection is only read, never modified. Nothing is done when it is None.
    names: sequence of str
        Node names of the joined sources, in the order of ``sources``.
    result: _Source
        Joined source the rename is applied to.
    sources: sequence of _Source
        Joined sources; inspected for the OMDSEQ column when there is not exactly one leading source.
    use_rename_ep: bool
        Passed through to the field renaming helper.

    Raises
    ------
    ValueError
        When there is not exactly one leading source and more than one source has OMDSEQ.
    """
    if leading is None:
        return

    if len(leading) == 1:
        # read the element without popping: popping emptied the caller's list and failed for tuples
        lead = leading[0]
        if isinstance(lead, int):
            # an index is not a field prefix, translate it into the node name
            lead = names[lead]
        __rename_fields(result, {f"{lead}.OMDSEQ": "OMDSEQ"}, use_rename_ep=use_rename_ep)
        return

    number, indexes = __get_number_and_indexes_of_sources_have_field(sources, "OMDSEQ")
    if number == 1:
        __rename_fields(result, {f"{names[indexes[0]]}.OMDSEQ": "OMDSEQ"}, use_rename_ep=use_rename_ep)
    elif number:
        raise ValueError(
            "Several sources was specified as leading and OMDSEQ field is presented in more than "
            "one source. Resulted OMDSEQ can't be derived in such case."
        )


def __get_number_and_indexes_of_sources_have_field(sources, field):
    """
    Find the sources whose columns contain ``field``.

    Returns
    -------
    tuple of (int, list of int)
        Number of matching sources and their positions in ``sources``.
    """
    # positions must come from enumerate: counting only the matches gives wrong
    # indexes as soon as a source without the field precedes one that has it
    indexes = [position for position, src in enumerate(sources) if field in src.columns()]
    return len(indexes), indexes


def __copy_and_rename_nodes_on_merge_join(result, names, sources, symbols):
    """
    Attach copies of the ``sources`` graphs to ``result``, optionally giving their nodes names.

    ``names`` follows the convention described in ``__copy_sources_on_merge_or_join``.
    Returns the names that were actually used (an endless iterator of None when no names were requested).
    """
    # shared eps between sources
    eps = defaultdict()
    if names is True:
        names = [f"__SRC_{n}__" for n in range(len(sources))]
    if not names:
        names = itertools.repeat(None)
    if sources:
        for name, src in zip(names, sources):
            obj = src
            if name:
                # the name is put on a passthrough sunk to a copy, so the original source stays untouched
                obj = src.copy()
                obj.sink(otq.Passthrough())
                obj.node_name(name)

            result.source(obj.node().copy_graph(eps))
            result.node().add_rules(obj.node().copy_rules())
            result._set_sources_dates(obj, copy_symbols=not bool(symbols))
    return names


def _is_table_after_merge_needed(need_table, used_columns):
    """
    Tell whether a table is needed after merge.

    A table is needed when ``need_table`` is already set, or when some column of the first source
    (a key of ``used_columns`` mapped to a falsy value) is absent from all the other sources.
    """
    if not need_table and not all(used_columns.values()):
        return True
    return need_table


def _collect_merged_columns(need_table, sources):
    """
    Build the common schema of the merged ``sources``.

    Parameters
    ----------
    need_table: bool
        Initial value of the flag telling that the schemas differ.
    sources: sequence of _Source
        Sources being merged, at least one.

    Returns
    -------
    tuple of (dict, bool, dict)
        Merged mapping of column name to type, the updated ``need_table`` flag and a mapping telling
        for every column of the first source whether it was met in any other source.

    Raises
    ------
    ValueError
        When a column has types that can't be combined in different sources.
    """
    merged_columns = sources[0].columns(skip_meta_fields=True)
    used_columns = {key: False for key in merged_columns.keys()}
    for src in sources[1:]:
        for key, value in src.columns(skip_meta_fields=True).items():
            if key in merged_columns:
                orig_type = merged_columns[key]
                try:
                    merged_dtype, merged_need_table = get_type_that_includes([orig_type, value])
                except ValueError as e:
                    raise ValueError(f"Column '{key}' has different types for "
                                     f"different branches: {orig_type} {value}") from e

                need_table |= merged_need_table
                merged_columns[key] = merged_dtype
            else:
                need_table = True
                merged_columns[key] = value

            if key in used_columns:
                used_columns[key] = True

    return merged_columns, need_table, used_columns


def concat(sources=None, add_table=True, symbols=None):
    """
    Deprecated: Merges ticks from the sources into a single output _source ordered by the timestamp

    This function is deprecated due the wrong name notation. Use 'merge' instead.

    Parameters
    ----------
    sources : list
        List of sources to merge
    add_table : bool
        If set to True, then table is added right after merge.
        We recommended to keep True to prevent problems with different tick schemas.
        Default: True
        Passed to ``merge`` as ``align_schema``.
    symbols:
        Passed to ``merge`` as ``symbols``.

    Return
    ------
    A new _source that holds a result of the merged sources
    """
    warnings.warn("This function is deprecated due the wrong name notation. Use `merge` instead.",
                  DeprecationWarning)
    return merge(sources=sources, align_schema=add_table, symbols=symbols)
def join(left, right, on, how="outer", rprefix="RIGHT", keep_fields_not_in_schema=False, output_type_index=None):
    """
    Joins two sources ``left`` and ``right`` based on ``on`` condition.

    In case you willing to add prefix/suffix to all columns in one of the sources you should use
    :func:`Source.add_prefix` or :func:`Source.add_suffix`

    Parameters
    ----------
    left: :class:`Source`
        left source to join
    right: :class:`Source`
        right source to join
    on: :py:class:`~onetick.py.Operation` or 'all' or 'same_size'
        If 'all' joins every tick from ``left`` with every tick from ``right``.
        If 'same_size' and size of sources are same, joins ticks from two sources directly, else raises exception.
    how: 'inner' or 'outer'
        joining type
        Doesn't matter for 'all' and 'same_size'
    rprefix: str
        The name of ``right`` data source. It will be added as prefix to overlapping columns arrived
        from right to result
    keep_fields_not_in_schema: bool
        If True - join function will try to preserve any fields of original sources that are not in the source schema,
        propagating them to output. This means a possibility of runtime error if fields are duplicating.
        If False, will remove all fields that are not in schema.
    output_type_index: int
        Specifies index of source in sources from which type and properties of output will be taken.
        Useful when joining sources that inherited from otp.Source.
        By default output object type will be otp.Source.

    Returns
    -------
    :class:`Source` or same class as ``[left, right][output_type_index]``
        joined data

    Raises
    ------
    ValueError
        If ``how`` is neither 'outer' nor 'inner'. The check is done before the sources are touched.

    Examples
    --------
    >>> d1 = otp.Ticks({'ID': [1, 2, 3], 'A': ['a', 'b', 'c']})
    >>> d2 = otp.Ticks({'ID': [2, 3, 4], 'B': ['q', 'w', 'e']})
    >>> otp.join(d1, d2, on=d1['ID'] == d2['ID'])()
                         Time  ID  A  RIGHT_ID  B
    0 2003-12-01 00:00:00.000   1  a         0
    1 2003-12-01 00:00:00.001   2  b         2  q
    2 2003-12-01 00:00:00.002   3  c         3  w

    >>> otp.join(d1, d2, on=d1['ID'] == d2['ID'], how='inner')()
                         Time  ID  A  RIGHT_ID  B
    0 2003-12-01 00:00:00.001   2  b         2  q
    1 2003-12-01 00:00:00.002   3  c         3  w

    Adding prefix to right source for all columns:

    >>> d2 = d2.add_prefix('right_')
    >>> otp.join(d1, d2, on=d1['ID'] == d2['right_ID'])()
                         Time  ID  A  right_ID right_B
    0 2003-12-01 00:00:00.000   1  a         0
    1 2003-12-01 00:00:00.001   2  b         2       q
    2 2003-12-01 00:00:00.002   3  c         3       w
    """
    output_type = output_type_by_index((left, right), output_type_index)

    # validate `how` up front, so that a wrong value is reported before
    # any of the caller's sources is modified
    if how == "outer":
        join_type = "LEFT_OUTER"
    elif how == "inner":
        join_type = "INNER"
    else:
        raise ValueError("The 'how' parameter has wrong value. Only 'outer' and 'inner' are supported")

    timezone_hack = None
    if re.search(r'\b_TIMEZONE\b', str(on)):
        # join does not support using _TIMEZONE pseudo-field in join_criteria,
        # replacing it with temporary fields in the branches
        timezone_hack = '__TIMEZONE_HACK__'
        left[timezone_hack] = left['_TIMEZONE']
        right[timezone_hack] = right['_TIMEZONE']

    if str(on) == "all":
        on = f'1 = 1 or {rprefix}.TIMESTAMP >= 0'

    _LEFT_NODE_NAME = "__SRC_LEFT__"  # this is internal name
    _RIGHT_NODE_NAME = rprefix

    initial_left_use_name_for_column_prefix = left.use_name_for_column_prefix()
    initial_right_use_name_for_column_prefix = right.use_name_for_column_prefix()
    initial_left_source_node_name = left.node_name()
    initial_right_source_node_name = right.node_name()

    try:
        # we have to add _source prefix to all column operations
        left.use_name_for_column_prefix(True)
        left.node_name(_LEFT_NODE_NAME)

        # `on` expression is written with right, so we should modify it, we will restore it later
        right.use_name_for_column_prefix(True)  # add prefix to every operation with that table in on
        right.node_name(_RIGHT_NODE_NAME)

        columns_name_set = set()
        columns = {}
        fields_to_skip_right_source = {'TIMESTAMP'}
        for name, dtype in chain(left.columns(skip_meta_fields=True).items(),
                                 right.columns(skip_meta_fields=True).items()):
            if name in columns_name_set:
                # second occurrence of the name: the right source column gets the prefix
                columns[_RIGHT_NODE_NAME + "_" + name] = dtype
                fields_to_skip_right_source.add(name)
            else:
                columns[name] = dtype
                columns_name_set.add(name)

        if timezone_hack:
            on = re.sub(r'\._TIMEZONE\b', f'.{timezone_hack}', str(on))
            on = re.sub(r'\b_TIMEZONE\b', f'{_LEFT_NODE_NAME}.{timezone_hack}', str(on))

        # ------------------
        # create objects
        params = {"join_criteria": str(on), "join_type": join_type, "left_source": _LEFT_NODE_NAME}
    finally:
        # return states of sources back, even if something above has failed
        left.use_name_for_column_prefix(initial_left_use_name_for_column_prefix)
        left.node_name(initial_left_source_node_name)
        right.use_name_for_column_prefix(initial_right_use_name_for_column_prefix)
        right.node_name(initial_right_source_node_name)

    if str(on) == "same_size":
        result = output_type(node=otq.JoinSameSizeTs(), **columns)
    else:
        result = output_type(node=otq.Join(**params), **columns)

    __copy_sources_on_merge_or_join(result, (left, right),
                                    names=(_LEFT_NODE_NAME, _RIGHT_NODE_NAME),
                                    output_type_index=output_type_index)

    rename_fields_dict = {}
    for lc, rc in zip_longest(left.columns(skip_meta_fields=True), right.columns(skip_meta_fields=True)):
        if lc:
            rename_fields_dict[f"{_LEFT_NODE_NAME}.{lc}"] = lc
        if rc:
            if rc not in fields_to_skip_right_source:
                rename_fields_dict[f"{_RIGHT_NODE_NAME}.{rc}"] = rc
            else:
                rename_fields_dict[f"{_RIGHT_NODE_NAME}.{rc}"] = f"{_RIGHT_NODE_NAME}_{rc}"

    __rename_fields(result, rename_fields_dict)
    result.sink(otq.Passthrough(fields=_LEFT_NODE_NAME + ".TIMESTAMP", drop_fields=True))

    items = [type2str(dtype) + " " + name for name, dtype in result.columns(skip_meta_fields=True).items()]

    if keep_fields_not_in_schema:
        # Here we try to preserve fields of original sources that were not in schema
        # in their original form. If there's a duplication of fields or any other problem
        # in runtime, we'll be able to do nothing
        result.sink(otq.Passthrough(fields=_RIGHT_NODE_NAME + ".TIMESTAMP", drop_fields=True))
        result.sink(otq.RenameFieldsEp(rename_fields=rf"{_LEFT_NODE_NAME}\.(.*)=\1,{_RIGHT_NODE_NAME}\.(.*)=\1",
                                       use_regex=True))
        result.sink(otq.Table(fields=",".join(items), keep_input_fields=True))
    else:
        result.sink(otq.Table(fields=",".join(items)))

    if timezone_hack:
        result = result.drop([
            field for field in result.schema
            if field.endswith(timezone_hack)
        ])

    return result
def join_by_time(sources, how="outer", on=None, policy=None, check_schema=True, leading=0,
                 match_if_identical_times=None, output_type_index=None, use_rename_ep=True):
    """
    Joins ticks from multiple input time series, based on input tick timestamps.

    ``leading`` source tick joined with already arrived ticks from other sources.

    >>> leading = otp.Ticks(A=[1, 2], offset=[1, 3])
    >>> other = otp.Ticks(B=[1], offset=[2])
    >>> otp.join_by_time([leading, other])()
                         Time  A  B
    0 2003-12-01 00:00:00.001  1  0
    1 2003-12-01 00:00:00.003  2  1

    In case you willing to add prefix/suffix to all columns in one of the sources you should use
    :func:`Source.add_prefix` or :func:`Source.add_suffix`

    Parameters
    ----------
    sources: Collection[:class:`Source`]
        The collection of Source objects which will be joined
    how: 'outer' or 'inner'
        The method of join (inner or outer)
    on: Collection[:class:`Column`]
        ``on`` add an extra check to join - only ticks with same ``on`` fields will be joined

        >>> leading = otp.Ticks(A=[1, 2], offset=[1, 3])
        >>> other = otp.Ticks(A=[2, 2], B=[1, 2], offset=[0, 2])
        >>> otp.join_by_time([leading, other], on=['A'])()
                             Time  A  B
        0 2003-12-01 00:00:00.001  1  0
        1 2003-12-01 00:00:00.003  2  2

    policy: 'ARRIVAL_ORDER', 'LATEST_TICKS', 'EACH_FOR_LEADER_WITH_FIRST' or 'EACH_FOR_LEADER_WITH_LATEST'
        Policy of joining ticks with same timestamps

        >>> leading = otp.Ticks(A=[1, 2], offset=[0, 0], OMDSEQ=[0, 3])
        >>> other = otp.Ticks(B=[1, 2], offset=[0, 0], OMDSEQ=[2, 4])

        Note: in the examples below we assume that all ticks have same timestamps, but order of ticks as in example.
        OMDSEQ is a special field that store order of ticks with same timestamp

        - ``ARRIVAL_ORDER``
          output tick generated on arrival of ``leading`` source tick

        >>> data = otp.join_by_time([leading, other], policy='ARRIVAL_ORDER')
        >>> data()[['Time', 'A', 'B']]
                Time  A  B
        0 2003-12-01  1  0
        1 2003-12-01  2  1

        - ``LATEST_TICKS``
          Tick generated at the time of expiration of a particular timestamp (when all ticks from all sources
          for current timestamp arrived). Only latest tick from ``leading`` source will be used.

        >>> data = otp.join_by_time([leading, other], policy='LATEST_TICKS')
        >>> data()[['Time', 'A', 'B']]
                Time  A  B
        0 2003-12-01  2  2

        - ``EACH_FOR_LEADER_WITH_FIRST``
          Each tick from ``leading`` source will be joined with first tick from other sources for current timestamp

        >>> data = otp.join_by_time(
        ...     [leading, other],
        ...     policy='EACH_FOR_LEADER_WITH_FIRST'
        ... )
        >>> data()[['Time', 'A', 'B']]
                Time  A  B
        0 2003-12-01  1  1
        1 2003-12-01  2  1

        - ``EACH_FOR_LEADER_WITH_LATEST``
          Each tick from ``leading`` source will be joined with last tick from other sources for current timestamp

        >>> data = otp.join_by_time(
        ...     [leading, other],
        ...     policy='EACH_FOR_LEADER_WITH_LATEST'
        ... )
        >>> data()[['Time', 'A', 'B']]
                Time  A  B
        0 2003-12-01  1  2
        1 2003-12-01  2  2

    check_schema: bool
        If True onetick.py will check that all columns names are unambiguous. Which can lead to false positive error
        in case of some event processors were sink to Source. To avoid this set check_schema to False.
    leading: int, 'all', :class:`Source`, list of int, list of :class:`Source`
        A list sources or their indexes. If this parameter is 'all', every source is considered to be leading.
    match_if_identical_times: bool
        A True value of this parameter causes an output tick to be formed from input ticks with identical timestamps
        only.
        If ``how`` is set to 'outer', default values of fields (``otp.nan``, 0, empty string) are propagated for
        sources that did not tick at a given timestamp.
        If this parameter is set to True, the default value of ``policy`` parameter is set to 'LATEST_TICKS'.
    output_type_index: int
        Specifies index of source in ``sources`` from which type and properties of output will be taken.
        Useful when joining sources that inherited from :class:`Source`.
        By default output object type will be :class:`Source`.
    use_rename_ep: bool
        Use RenameFields ep or not. This ep can't be used in generic aggregation.

    Examples
    --------
    >>> d1 = otp.Ticks({'A': [1, 2, 3], 'offset': [1, 2, 3]})
    >>> d2 = otp.Ticks({'B': [1, 2, 3], 'offset': [1, 2, 4]})
    >>> otp.join_by_time([d1, d2])()
                         Time  A  B
    0 2003-12-01 00:00:00.001  1  0
    1 2003-12-01 00:00:00.002  2  1
    2 2003-12-01 00:00:00.003  3  2

    >>> otp.join_by_time([d1, d2], leading=1)()
                         Time  A  B
    0 2003-12-01 00:00:00.001  1  1
    1 2003-12-01 00:00:00.002  2  2
    2 2003-12-01 00:00:00.004  3  3

    >>> otp.join_by_time([d1, d2], leading=1, match_if_identical_times=True)()
                         Time  A  B
    0 2003-12-01 00:00:00.001  1  1
    1 2003-12-01 00:00:00.002  2  2
    2 2003-12-01 00:00:00.004  0  3

    Adding prefix to right source for all columns:

    >>> otp.join_by_time([d1, d2.add_prefix('right_')])()
                         Time  A  right_B
    0 2003-12-01 00:00:00.001  1        0
    1 2003-12-01 00:00:00.002  2        1
    2 2003-12-01 00:00:00.003  3        2

    Returns
    -------
    :class:`Source` or same class as ``sources[output_type_index]``
        A time series of ticks.
    """
    output_type = output_type_by_index(sources, output_type_index)

    # if key is set, then generalize it, ie convert into list;
    # then remove keys from 'columns_count' dict to pass validation after
    join_str_keys = []
    if on is not None:
        if not isinstance(on, list):
            if not isinstance(on, (Column, str)):
                raise TypeError(f"It is not supported to have '{type(on)}' type as a key")
            on = [on]

        for key in on:
            if isinstance(key, Column):
                join_str_keys.append(str(key))
            elif isinstance(key, str):
                join_str_keys.append(key)

    if check_schema:
        _check_schema_for_join_by_time(join_str_keys, sources)

    if how not in ("inner", "outer"):
        raise Exception('Wrong value for the "how" parameter. It is allowed to use "inner" or "outer" values')

    # ------------------
    # create objects
    params = {"add_source_prefix": False, "join_type": how.upper()}
    leading = _fill_leading_sources_param(leading, params, sources)

    if on is not None:
        params["join_keys"] = ",".join(join_str_keys)

    if policy is not None:
        policies = {"ARRIVAL_ORDER", "LATEST_TICKS", "EACH_FOR_LEADER_WITH_FIRST", "EACH_FOR_LEADER_WITH_LATEST"}
        policy = policy.upper()
        if policy not in policies:
            raise ValueError("Invalid policy. Only the following ones are allowed: " + ", ".join(policies) + ".")
        params["same_timestamp_join_policy"] = policy

    if match_if_identical_times is not None:
        params["match_if_identical_times"] = match_if_identical_times

    # schema of the result is the union of the schemas of all sources
    columns = {}
    for src in sources:
        for name, dtype in src.columns(skip_meta_fields=True).items():
            columns[name] = dtype

    result = output_type(node=otq.JoinByTime(**params), **columns)

    __copy_sources_on_merge_or_join(result, sources,
                                    names=True,
                                    drop_meta=True,
                                    leading=leading,
                                    output_type_index=output_type_index,
                                    use_rename_ep=use_rename_ep)

    if how == "outer":
        # the fields of the result schema are declared with a Table ep
        fields = ",".join(
            type2str(dtype) + " " + name
            for name, dtype in result.columns(skip_meta_fields=True).items()
        )
        result.sink(otq.Table(fields=fields, keep_input_fields=True))

    return result
@singledispatch
def _fill_leading_sources_param(leading, params, sources):
    """
    Translate the ``leading`` argument of a join into the ``leading_sources`` EP parameter.

    This is the fallback overload: it is used when ``leading`` is neither an ``int``
    nor a ``list``/``tuple`` (those types have dedicated overloads registered below).

    Parameters
    ----------
    leading:
        Either a source object that is a member of ``sources`` or the literal ``"all"``.
    params: dict
        Dictionary of EP parameters; the key ``"leading_sources"`` is set in place.
    sources:
        Collection of the joined sources, used to resolve a source object to its position.

    Returns
    -------
    list
        ``["__SRC_<i>__"]`` for a single source object, or an empty list for ``"all"``.

    Raises
    ------
    ValueError
        If ``leading`` is a source that is not in ``sources`` or has an unsupported value.
    """
    from onetick.py.core.source import _Source

    if isinstance(leading, _Source):  # TODO: PY-104 Get rid of circular dependencies in code to avoid local import
        result = f"__SRC_{__find_by_id(sources, leading)}__"
        params["leading_sources"] = result
        result = [result]
    elif leading == "all":  # all sources are leading which is specified by empty string
        params["leading_sources"] = ""
        result = []
    else:
        raise ValueError(
            "wrong leading param was specified, please use any of int, 'all' literal, "
            "list of int, list of _Source"
        )
    return result


@_fill_leading_sources_param.register(int)
def _(leading, params, sources):
    """
    Overload for an integer ``leading``: the position of the leading source in ``sources``.

    Negative values are counted from the end of ``sources``. Sets ``params["leading_sources"]``
    and returns a one-element list with the ``__SRC_<i>__`` name.

    Raises
    ------
    ValueError
        If the (normalized) index is outside of ``sources``.
    """
    if leading < 0:
        leading = len(sources) + leading
    if 0 <= leading < len(sources):
        result = f"__SRC_{leading}__"
        params["leading_sources"] = result
        return [result]
    else:
        raise ValueError(
            f"leading source index should be in range(-len(source), len(source)), "
            f"but {leading} was specified."
        )


@_fill_leading_sources_param.register(list)  # type: ignore  # _ already defined above
@_fill_leading_sources_param.register(tuple)
def _(leading, params, sources):
    """
    Overload for a ``list`` or ``tuple`` of leading sources.

    Only the first element is inspected to decide whether the collection holds integer
    positions or source objects; source objects are resolved to positions by identity.
    Sets ``params["leading_sources"]`` to the comma-separated ``__SRC_<i>__`` names.

    Returns
    -------
    ``leading`` itself when it holds integers, otherwise a list of resolved positions.

    Raises
    ------
    ValueError
        If there are more leading sources than sources, or a source object
        is not a member of ``sources``.
    """
    if len(leading) > len(sources):
        raise ValueError("Number of leading sources can't be bigger number of sources")
    elif len(leading) == len(sources):
        # the original code only instantiated ``Warning`` and threw it away,
        # so the hint never reached the user; emit it for real
        warnings.warn(
            "You've specified leading and source lists of the same size, you can specify leading='all' "
            "instead of it"
        )
    if isinstance(leading[0], int):
        result = leading
    else:
        result = [__find_by_id(sources, lead) for lead in leading]
    indexes = ",".join(f"__SRC_{i}__" for i in result)
    params["leading_sources"] = indexes
    return result


def __find_by_id(collection, item):
    """
    Return the position of ``item`` in ``collection``, comparing by identity (``is``).

    Raises
    ------
    ValueError
        If ``item`` is not one of the objects in ``collection``.
    """
    for index, s in enumerate(collection):
        if s is item:
            return index
    raise ValueError("The leading source should be in join sources list")


def _check_schema_for_join_by_time(join_str_keys, sources):
    """
    Verify that the joined sources do not share column names.

    Columns listed in ``join_str_keys`` and the ``OMDSEQ`` column are excluded from the check.

    Raises
    ------
    Exception
        If any other column name is present in more than one source.
    """
    # check that there aren't matching columns
    columns_count = Counter()
    for src in sources:
        columns_count.update(src.columns(skip_meta_fields=True).keys())
    # join keys are expected to be shared; deleting a missing key from a Counter is a no-op
    for join_key in join_str_keys:
        del columns_count[join_key]
    matched = [k for k, value in columns_count.items() if value > 1]
    if "OMDSEQ" in matched:
        # OMDSEQ behaves like the TIMESTAMP field
        matched.remove("OMDSEQ")
    if len(matched):
        raise Exception(f"There are matched columns between sources: {','.join(matched)}")


def apply_query(query,
                in_sources=None,
                output_pins=None,
                shared_state_variables_list=None,
                output_type_index=None,
                **params):
    """
    Build a source that wraps ``query`` into a NESTED_OTQ node and connect ``in_sources`` to it.

    Parameters
    ----------
    query:
        Either a string (treated as a path and wrapped into ``onetick.py.sources.query``
        together with ``params``) or an ``onetick.py.sources.query`` object
        (in which case ``params``, if any, update its parameters).
    in_sources: dict, optional
        Mapping of input pin name to the source connected to that pin.
    output_pins: list, optional
        Names of the output pins of the query to expose.
    shared_state_variables_list: list of str, optional
        Names joined with commas and passed as ``shared_state_variables`` of the node.
    output_type_index: optional
        Passed with the original ``in_sources`` to ``output_type_by_index`` to choose
        the class of the result.
    params:
        Parameters of the query.

    Returns
    -------
    A single source when ``output_pins`` is empty or has one element,
    otherwise a tuple of sources, one per output pin in the given order.
    """
    from onetick.py.sources import query as otp_query

    # NOTE: the output type is resolved before ``in_sources`` is defaulted to a dict
    output_type = output_type_by_index(in_sources, output_type_index)
    output_pins = output_pins if output_pins else []
    in_sources = in_sources if in_sources else {}
    shared_state_variables_list = shared_state_variables_list if shared_state_variables_list else []
    if isinstance(query, str):
        # it seems that path is passed
        query = otp_query(query, **params)
    elif isinstance(query, otp_query):
        if params:
            query.update_params(**params)
    # elif

    # the schema of the result is the union of the schemas of all inputs
    columns = {}
    for src in in_sources.values():
        columns.update(src.columns(skip_meta_fields=True))
    str_params = query.str_params
    shared_state_variables = ",".join(shared_state_variables_list)
    inputs_need_unbound_symbols = {in_pin: src._is_unbound_required() for in_pin, src in in_sources.items()}
    if query.graph_info.has_unbound_if_pinned(inputs_need_unbound_symbols):
        symbol = adaptive
    else:
        symbol = None
    nested_src = output_type(
        node=otq.NestedOtq(query.path, str_params, shared_state_variables=shared_state_variables),
        _has_output=len(output_pins) > 0,
        _symbols=symbol,
        **columns,
    )
    # the same ``eps`` mapping is passed to every ``copy_graph`` call
    eps = defaultdict()
    for in_pin, src in in_sources.items():
        nested_src.source(src.node().copy_graph(eps), in_pin)
        nested_src.node().add_rules(src.node().copy_rules())
        nested_src._set_sources_dates(src)
        nested_src._merge_tmp_otq(src)
    if len(output_pins) == 0:
        # output_pins = ['OUT']
        return nested_src
    if len(output_pins) > 1:
        result = []
        for out_pin in output_pins:
            res_src = nested_src.copy()
            res_src.node().out_pin(out_pin)
            # NOTE: need to comment out this node
            res_src.sink(otq.Passthrough())
            # apply config customization
            query.config.apply(out_pin, res_src)
            result.append(res_src)
        return tuple(result)
    else:
        # TODO: move setting out_pin on the creating step of nested_src
        # It seems as not working now, because seems .copy() of _Source doesnt
        # copy out_pin reference, need to check
        nested_src.node().out_pin(output_pins[0])
        # apply config customization
        query.config.apply(output_pins[0], nested_src)
        return nested_src


def apply(query, *args, **kwargs):
    """
    Shortcut for :func:`apply_query` that takes the path and the parameters from ``query``.

    ``query.path`` is passed as the query, ``query.params`` are passed as additional
    keyword parameters; all other arguments are forwarded unchanged.
    """
    return apply_query(query.path, *args, **kwargs, **query.params)
def cut(column: 'Column', bins: Union[int, List[float]], labels: List[str] = None):
    """
    Split the values of a column into discrete intervals (mimics :pandas:`pandas.cut`).

    Parameters
    ----------
    column: :py:class:`~onetick.py.Column`
        Column with numeric data that is distributed between the bins.
    bins: int or List[float]
        When List[float] - the edges of the bins.
        When int - the number of equal-width bins in the range of x.
    labels: List[str]
        Names given to the resulting bins.
        If not set, bins are numeric intervals like (5.0000000000, 7.5000000000].

    Return
    ------
    object that can be set to :py:class:`~onetick.py.Column` via :py:meth:`~onetick.py.Source.__setitem__`

    Examples
    --------
    >>> # OTdirective: snippet-name: Source.functions.cut;
    >>> data = otp.Ticks({"X": [9, 8, 5, 6, 7, 0, ]})
    >>> data['bin'] = otp.cut(data['X'], bins=3, labels=['a', 'b', 'c'])
    >>> data.to_df()[['X', 'bin']]
       X bin
    0  9   c
    1  8   c
    2  5   b
    3  6   b
    4  7   c
    5  0   a
    """
    # the builder gets the object referenced by the column as its first argument
    return _CutBuilder(column.obj_ref, column, bins, labels=labels)
def qcut(column: 'Column', q: Union[int, List[float]], labels: List[str] = None):
    """
    Split the values of a column into quantile-based intervals (mimics :pandas:`pandas.qcut`).

    Parameters
    ----------
    column: :py:class:`~onetick.py.Column`
        Column with numeric data that is distributed between the bins.
    q: int or List[float]
        When List[float] - array of quantiles, e.g. [0, .25, .5, .75, 1.] for quartiles.
        When int - number of quantiles. 10 for deciles, 4 for quartiles, etc.
    labels: List[str]
        Names given to the resulting bins.
        If not set, bins are numeric intervals like (5.0000000000, 7.5000000000].

    Return
    ------
    object that can be set to :py:class:`~onetick.py.Column` via :py:meth:`~onetick.py.Source.__setitem__`

    Examples
    --------
    >>> # OTdirective: snippet-name: Source.functions.qcut;
    >>> data = otp.Ticks({"X": [10, 3, 5, 6, 7, 1]})
    >>> data['bin'] = otp.qcut(data['X'], q=3, labels=['a', 'b', 'c'])
    >>> data.to_df()[['X', 'bin']]
        X bin
    0  10   c
    1   3   a
    2   5   b
    3   6   b
    4   7   c
    5   1   a
    """
    # TODO when q is a List[float] like [0, .25, .5, .75, 1.]
    # the builder gets the object referenced by the column as its first argument
    return _QCutBuilder(column.obj_ref, column, q, labels=labels)
def coalesce(sources, max_source_delay: float = 0.0, output_type_index: int = None):
    """
    Fills the gaps in one time series with the ticks from one or several other time series.

    Ticks that arrive from several sources at the same time are considered to be the same tick,
    allowing for possible delay across the sources when determining whether the ticks are the same.
    When the same tick arrives from several sources, it is only propagated from the source
    that has the highest priority among those sources.

    Input ticks do not necessarily have the same structure - they can have different fields.

    In order to distinguish time series the event processor adds the SYMBOL_NAME field.
    Also SOURCE field is added to each tick which lacks it to identify the source from which the tick is coming.
    Hence, one must avoid adding SOURCE field in event processors positioned after COALESCE.

    Parameters
    ----------
    sources: list of :class:`Source`
        List of the sources to coalesce. Also, this list is treated as priority order.
        First member of the list has the highest priority when determining whether ticks are the same.
    max_source_delay: float
        The maximum time in seconds by which a tick from one input time series
        can arrive later than the same tick from another time series.
    output_type_index: int
        Specifies index of source in ``sources`` from which type and properties of output will be taken.
        Useful when merging sources that inherited from :class:`Source`.
        By default, output object type will be :class:`Source`.

    Return
    ------
    :class:`Source`
        A time series of ticks.

    Examples
    --------
    If ticks from different sources have the same time,
    only the tick from source with the highest priority will be propagated.

    >>> data1 = otp.Ticks(A=[1, 2])
    >>> data2 = otp.Ticks(A=[3, 4])
    >>> data = otp.coalesce([data2, data1])
    >>> otp.run(data)[['Time', 'A']]
                         Time  A
    0 2003-12-01 00:00:00.000  3
    1 2003-12-01 00:00:00.001  4

    We can use ``max_source_delay`` parameter to expand time interval in which
    ticks are considered to have the "same time".

    >>> data1 = otp.Ticks({
    ...     'A': [1, 2, 3],
    ...     'offset': [0, 3000, 6000],
    ... })
    >>> data2 = otp.Ticks({
    ...     'A': [4, 5, 6],
    ...     # 4 is delayed by less than one second from 1
    ...     # 5 is delayed by one second from 2
    ...     # 6 is delayed by more than one second from 3
    ...     'offset': [999, 4000, 7001],
    ... })
    >>> data = otp.coalesce([data2, data1], max_source_delay=1)
    >>> otp.run(data)[['Time', 'A']]
                         Time  A
    0 2003-12-01 00:00:00.999  4
    1 2003-12-01 00:00:04.000  5
    2 2003-12-01 00:00:06.000  3
    3 2003-12-01 00:00:07.001  6
    """
    if not sources:
        raise ValueError("Coalesce should have one or more inputs")

    result_cls = output_type_by_index(sources, output_type_index)

    # COALESCE ep refers to its inputs by node name, so a name is generated per input (counted from 1)
    node_names = [f'__COALESCE_SRC_{position}__' for position, _ in enumerate(sources, start=1)]

    coalesce_ep = otq.Coalesce(
        priority_order=','.join(node_names),
        max_source_delay=max_source_delay,
    )

    # these fields will be added by COALESCE ep, so inputs must not already carry them
    reserved_fields = ('SYMBOL_NAME', 'TICK_TYPE')
    schema = dict.fromkeys(reserved_fields, str)

    for src in sources:
        for field in reserved_fields:
            if field in src.schema:
                raise ValueError(f"Field with name '{field}' is already present in the source. "
                                 'Please, rename or delete that field prior to invoking coalesce().')
        # a field seen in an earlier source must keep its type
        for field in set(src.schema).intersection(schema):
            src_type = src.schema[field]
            known_type = schema[field]
            if src_type != known_type:
                raise ValueError(f"Conflicting types for field '{field}' in different sources: "
                                 f"{src_type}, {known_type}")
        schema.update(src.schema)

    # TODO: do we need field SOURCE (especially when node names are auto-generated)?
    # this field will be added by COALESCE if it's not presented in sources
    if 'SOURCE' not in schema:
        schema['SOURCE'] = str

    coalesced = result_cls(coalesce_ep, **schema)
    __copy_sources_on_merge_or_join(coalesced, sources,
                                    names=node_names,
                                    output_type_index=output_type_index)
    return coalesced
def corp_actions(source,
                 adjustment_date: Union[date, int, str, None] = None,
                 adjustment_date_tz: str = default,
                 fields=None,
                 adjust_rule="PRICE",
                 apply_split: bool = True,
                 apply_spinoff: bool = False,
                 apply_cash_dividend: bool = False,
                 apply_stock_dividend: bool = False,
                 apply_security_splice: bool = False,
                 apply_others: str = "",
                 apply_all: bool = False,
                 ):
    """Adjusts values using corporate actions information loaded into OneTick
    from the reference data file. To use it, location of reference database must
    be specified via OneTick configuration.

    Parameters
    ----------
    source : onetick.py.Source
        Source object adjusted by corporate actions information.
    adjustment_date : otp.date, int, str, None, optional
        The date as of which the values are adjusted.
        `int` format is YYYYMMDD.
        Any value that is not an `otp.date`, an `int` or `None` is converted with `otp.date` first.
        If it is not set, the values are adjusted as of the end date in the query.
        All corporate actions of the types specified in the parameters
        that lie between the tick timestamp and the adjustment date will be applied to each tick.
        Notice that the adjustment date is not affected neither by _SYMBOL_PARAM._PARAM_END_TIME nor by the
        apply_times_daily setting.
        By default None
    adjustment_date_tz : str, optional
        Timezone for adjustment date, by default global `otp.config.tz` value used.
        Local timezone can't be used so in this case parameter is set to 'GMT' and a warning is issued.
    fields : str, optional
        A comma-separated list of fields to be adjusted. If this parameter is not set,
        some default adjustments will take place if appropriately named fields exist in the tick:

        - If the ADJUST_RULE parameter is set to PRICE, and the PRICE field is present,
          it will get adjusted. If the fields ASK_PRICE or BID_PRICE are present, they will get adjusted.
          If fields ASK_VALUE or BID_VALUE are present, they will get adjusted

        - If the ADJUST_RULE parameter is set to SIZE, and the SIZE field is present,
          it will get adjusted. If the fields ASK_SIZE or BID_SIZE are present, they will get adjusted.
          If fields ASK_VALUE or BID_VALUE are present, they will get adjusted.

        By default None
    adjust_rule : str, optional
        When set to PRICE, adjustments are applied under the assumption that fields to be adjusted contain prices
        (adjustment direction is determined appropriately).
        When set to SIZE, adjustments are applied under the assumption that fields contain sizes
        (adjustment direction is opposite to that when the parameter's value is PRICE).
        By default "PRICE"
    apply_split : bool, optional
        If true, adjustments for splits are applied, by default `True`
    apply_spinoff : bool, optional
        If true, adjustments for spin-offs are applied, by default `False`
    apply_cash_dividend : bool, optional
        If true, adjustments for cash dividends are applied, by default `False`
    apply_stock_dividend : bool, optional
        If true, adjustments for stock dividends are applied, by default `False`
    apply_security_splice : bool, optional
        If true, adjustments for security splices are applied, by default `False`
    apply_others : str, optional
        A comma-separated list of names of custom adjustment types to apply, by default ""
    apply_all : bool, optional
        If true, applies all types of adjustments, both built-in and custom, by default `False`

    Returns
    -------
    onetick.py.Source
        The ``source`` object that was passed in, after the CorpActions event processor was sunk into it.

    Examples
    --------
    >>> src = otp.DataSource('NYSE_TAQ',
    ...                      tick_type='TRD',
    ...                      start=otp.dt(2022, 5, 20, 9, 30),
    ...                      end=otp.dt(2022, 5, 26, 16))
    >>> df = otp.run(src, symbols='MKD', symbol_date=otp.date(2022, 5, 22))
    >>> df["PRICE"][0]
    0.0911
    >>> src = otp.functions.corp_actions(src,
    ...                                  adjustment_date=otp.date(2022, 5, 22),
    ...                                  fields="PRICE",)
    >>> df = otp.run(src, symbols='MKD', symbol_date=otp.date(2022, 5, 22))
    >>> df["PRICE"][0]
    1.36649931675
    """
    # the EP is given either nothing or an integer date, so everything else goes through otp.date
    passes_as_is = adjustment_date is None or isinstance(adjustment_date, (date, int))
    if not passes_as_is:
        adjustment_date = date(adjustment_date)
    if isinstance(adjustment_date, date):
        adjustment_date = int(adjustment_date.to_str())

    if adjustment_date_tz is default:
        adjustment_date_tz = config.tz
    if not adjustment_date_tz:
        # an empty timezone would mean the local one, which is not accepted here
        warnings.warn("Local timezone can't be used in parameter 'adjustment_date_tz', setting to 'GMT'.")
        adjustment_date_tz = 'GMT'

    ep_params = {
        'adjustment_date': adjustment_date,
        'adjustment_date_tz': adjustment_date_tz,
        'fields': fields,
        'adjust_rule': adjust_rule,
        'apply_split': apply_split,
        'apply_spinoff': apply_spinoff,
        'apply_cash_dividend': apply_cash_dividend,
        'apply_stock_dividend': apply_stock_dividend,
        'apply_security_splice': apply_security_splice,
        'apply_others': apply_others,
        'apply_all': apply_all,
    }
    source.sink(otq.CorpActions(**ep_params))
    return source
def save_sources_to_single_file(sources,
                                file_path=None,
                                file_suffix='',
                                start=None,
                                end=None,
                                start_time_expression=None,
                                end_time_expression=None,
                                timezone=None,
                                running_query_flag=None):
    """
    Store several onetick.py.Source objects as queries of one file.

    Parameters
    ----------
    sources: dict or list
        dict of names -> sources or list of sources to merge into single file.
        If it's the list then names will be autogenerated.
        Source can be :class:`otp.Source` object or dictionary with these allowed parameters:
        {
            'source': otp.Source,
            'start': datetime(2022, 1, 1),         # optional
            'end': datetime(2022, 1, 2),           # optional
            'symbols': otp.Source or otp.Symbols,  # optional
        }
    file_path: str, optional
        Path to the file where all sources will be saved.
        If not set, sources will be saved to temporary file and its name will be returned.
    file_suffix: str
        Only used if ``file_path`` is not set.
        This suffix will be added to the name of a generated query file.
    start: datetime, optional
        start time for the resulting query file
    end: datetime, optional
        end time for the resulting query file
    start_time_expression: str, optional
        start time expression for the resulting query file
    end_time_expression: str, optional
        end time expression for the resulting query file
    timezone: str, optional
        timezone for the resulting query file
    running_query_flag: bool, optional
        running query flag for the resulting query file

    Returns
    -------
    If `sources` is list then returns list of full query paths (path_to_file::query_name)
    with autogenerated names corresponding to each source from `sources`.
    If `sources` is dict then the path to the query file is returned.
    """
    names_given = isinstance(sources, dict)
    if names_given:
        names, sources = sources.keys(), sources.values()
    else:
        # no explicit names: ``None`` is passed as the name of every source
        names = repeat(None)

    stored_names = []
    tmp_otq = TmpOtq()
    for name, entry in zip(names, sources):
        query_start = None
        query_end = None
        query_symbols = None
        if isinstance(entry, dict):
            # per-source settings are taken from the dictionary form of the entry
            query_start = entry.get('start')
            query_end = entry.get('end')
            query_symbols = entry.get('symbols')
            entry = entry['source']
        stored_name = entry._store_in_tmp_otq(tmp_otq,
                                              name=name,
                                              start=query_start,
                                              end=query_end,
                                              symbols=query_symbols)
        if not names_given:
            stored_names.append(stored_name)

    file_path = tmp_otq.save_to_file(
        file_path=file_path,
        file_suffix=file_suffix,
        start=start,
        end=end,
        start_time_expression=start_time_expression,
        end_time_expression=end_time_expression,
        timezone=timezone,
        running_query_flag=running_query_flag,
    )

    if names_given:
        return file_path
    return [f'{file_path}::{query_name}' for query_name in stored_names]