import itertools
import warnings
import inspect
import datetime
import re
from collections import defaultdict, Counter
from functools import singledispatch
from itertools import chain, zip_longest, repeat
from typing import List, Union
import onetick.query as otq
from onetick.py.configuration import config
from onetick.py.core.eval_query import _QueryEvalWrapper
from onetick.py.core._source._symbol_param_source import _SymbolParamSource
from onetick.py.core._source.tmp_otq import TmpOtq
from onetick.py.utils import get_type_that_includes, adaptive, default
from onetick.py.types import type2str, date, datetime as otp_datetime
from onetick.py.sources import query
from onetick.py.core.column import Column
from onetick.py.core.cut_builder import _QCutBuilder, _CutBuilder
__all__ = ['merge', 'join', 'join_by_time']
def output_type_by_index(sources, index):
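"""Return the class of ``sources[index]``, or the base ``_Source`` class when ``index`` is None."""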
if index is None:
from onetick.py.core.source import _Source
return _Source
return type(sources[index])
def merge(sources, align_schema=True, symbols=None, identify_input_ts=False,
presort=adaptive, concurrency=default, batch_size=default, output_type_index=None):
"""
Merges ticks from the ``sources`` into a single output ordered by the timestamp
Parameters
----------
sources : list
List of sources to merge
align_schema : bool
If set to True, then a table is added right after the merge.
We recommend keeping it set to True to prevent problems with
different tick schemas. Default: True
symbols: str, list of str or functions, :class:`Source`
Symbol(s) to run the query for, passed as a string, a list of strings, or as a "symbols" query whose results
include the ``SYMBOL_NAME`` column. The start/end times for the
symbols query will be taken from the :meth:`run` params.
See :ref:`symbols <Symbols>` for more details.
identify_input_ts: bool
If set to False, the fields SYMBOL_NAME and TICK_TYPE are not appended to the output ticks.
presort: bool
Add the presort EP in case of bound symbols.
Applicable only when ``symbols`` is not None.
By default, it is set to True if ``symbols`` are set
and to False otherwise.
concurrency: int
Specifies the number of CPU cores to utilize for the ``presort``.
By default, the value from otp.config.default_concurrency is used.
batch_size: int
Specifies the query batch size for the ``presort``.
By default, the value from otp.config.default_batch_size is used.
output_type_index: int
Specifies the index of the source in ``sources`` from which the type and properties of the output will be taken.
Useful when merging sources that inherit from :class:`Source`.
By default, the output object type will be :class:`Source`.
Return
------
:class:`Source` or same class as ``sources[output_type_index]``
A time series of ticks.
See also
--------
**MERGE** and **PRESORT** OneTick event processors
Examples
--------
``merge`` is used to merge different data sources
>>> data1 = otp.Ticks(X=[1, 2], Y=['a', 'd'])
>>> data2 = otp.Ticks(X=[-1, -2], Y=['*', '-'])
>>> data = otp.funcs.merge([data1, data2]) # OTdirective: snippet-name:merge.as list;
>>> otp.run(data)
Time X Y
0 2003-12-01 00:00:00.000 1 a
1 2003-12-01 00:00:00.000 -1 *
2 2003-12-01 00:00:00.001 2 d
3 2003-12-01 00:00:00.001 -2 -
Merge series from multiple symbols into one series
>>> # OTdirective: snippet-name:merge.bound symbols;
>>> data = otp.Ticks(X=[1])
>>> data['SYMBOL_NAME'] = data['_SYMBOL_NAME']
>>> symbols = otp.Ticks(SYMBOL_NAME=['A', 'B'])
>>> data = otp.merge([data], symbols=symbols)
>>> otp.run(data)
Time X SYMBOL_NAME
0 2003-12-01 1 A
1 2003-12-01 1 B
Adding symbols param before merge
>>> symbols = otp.Ticks(SYMBOL_NAME=['S1', 'S2'], param=[1, -1])
>>> def func(symbol):
... pre = otp.Ticks(X=[1])
... pre["SYMBOL_NAME"] = symbol.name
... pre["PARAM"] = symbol.param
... return pre
>>> data = otp.funcs.merge([func], symbols=symbols)
>>> otp.run(data)[['PARAM', 'SYMBOL_NAME']]
PARAM SYMBOL_NAME
0 1 S1
1 -1 S2
"""
from onetick.py.core.source import _Source
if not sources:
raise ValueError("Merge should have one or more inputs")
output_type = output_type_by_index(sources, output_type_index)
if presort is adaptive:
presort = symbols is not None
if presort and not symbols:
warnings.warn("Using the `presort` parameter makes effect only when "
"symbols are specified in the `symbols` parameter")
if concurrency is not default and not presort:
warnings.warn("Using the `concurrency` parameter makes effect only when "
"the `presort` parameter is set to True")
if batch_size is not default and not presort:
warnings.warn("Using the `batch_size` parameter makes effect only when "
"the `presort` parameter is set to True")
if concurrency is default:
concurrency = (
config.default_concurrency
if config.default_concurrency is not None
# otq.Presort does not support None
else ''
)
if batch_size is default:
batch_size = config.default_batch_size
def _base_ep_for_cross_symbol(symbol, tmp_otq):
if presort and symbol:
base_ep = otq.Presort(batch_size=batch_size, max_concurrency=concurrency)
else:
base_ep = otq.Merge(identify_input_ts=identify_input_ts)
if symbol:
if isinstance(symbol, (query, _QueryEvalWrapper)):
symbol = symbol.to_eval_string(tmp_otq=tmp_otq)
elif isinstance(symbol, _Source):
symbol = symbol._convert_symbol_to_string(symbol, tmp_otq=tmp_otq)
base_ep = base_ep.symbols(symbol)
return base_ep
def _evaluate_functions_in_sources_list(sources, symbols):
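"""Call every callable in ``sources`` with no arguments, or with a symbol parameter object
if it accepts one argument, and check that each resulting item is a _Source."""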
result = []
if not isinstance(sources, list):
sources = [sources]
for s in sources:
if not isinstance(s, _Source) and callable(s):
num_params = len(inspect.signature(s).parameters)
if num_params == 0:
s = s()
elif num_params == 1:
s = s(symbols.to_symbol_param() if isinstance(symbols, (_Source, _QueryEvalWrapper))
else _SymbolParamSource())
else:
raise ValueError(
f"Only one parameter is expected in the callback, but {num_params} were passed"
)  # TODO: test this case
if isinstance(s, _Source):
result.append(s)
else:
raise ValueError("Source and functions (returning _source) are expected as preprocessors")
return result
sources = _evaluate_functions_in_sources_list(sources, symbols)
need_table = False
merged_columns, need_table, used_columns = _collect_merged_columns(need_table, sources)
need_table = _is_table_after_merge_needed(need_table, used_columns)
# we need to store internal graphs somewhere while we create base ep from eval
intermediate_tmp_otq = TmpOtq()
result = output_type(node=_base_ep_for_cross_symbol(symbols, tmp_otq=intermediate_tmp_otq), **merged_columns)
result._tmp_otq.merge(intermediate_tmp_otq)
__copy_sources_on_merge_or_join(result, sources, symbols, output_type_index=output_type_index)
if presort and symbols:
result.sink(otq.Merge(identify_input_ts=identify_input_ts))
if identify_input_ts:
result[("SYMBOL_NAME", str)]
result[("TICK_TYPE", str)]
result = _add_table_after_merge(align_schema, merged_columns, need_table, result)
result._fix_varstrings()
return result
def _add_table_after_merge(add_table, merged_columns, need_table, result):
if add_table and need_table:
# a special case, when the add_table parameter is a list of common columns that should
# be added to a final table
# it is used internally
if isinstance(add_table, list):
merged_columns = {key: value for key, value in merged_columns.items() if key in add_table}
if len(merged_columns):
table = otq.Table(
fields=",".join(type2str(dtype) + " " + name for name, dtype in merged_columns.items()),
keep_input_fields=True,
)
result.sink(table)
return result
def __copy_sources_on_merge_or_join(result,
sources,
symbols=None,
names=None,
drop_meta=False,
leading=None,
output_type_index=None,
use_rename_ep=True):
""" copy columns, state vars and other metadata from joining, merging sources
Parameters
----------
result: _Source
Source object constructed as join, merge operation, e.g. result = _Source(otq.Merge(sources))
sources: list of _Source, tuple of _Source
Sources that were joined or merged
symbols:
Symbols to copy
names: list of str or None, tuple of str or None, bool, optional
- If a collection of strings or Nones, then add a passthrough EP with the given name to each source
where the name is specified, and add nothing where the corresponding item in ``names`` is None.
- If True, then autogenerate names in the ``__SRC_{number}__`` format.
- If None or False, then do not add passthrough EPs and do not change node names.
drop_meta : bool, optional
If True, drop the TIMESTAMP and OMDSEQ fields
leading : List of str, Tuple of str, Optional
List of leading source names
output_type_index: int, optional
Specifies the index of the source in `sources` from which the properties of `result` will be taken.
Useful when merging sources that inherit from otp.Source.
use_rename_ep: bool
Use :py:class:`onetick.query.RenameFields` event processor or not.
This event processor can't be used in generic aggregation.
Returns
-------
None
Modifies ``result`` in place
"""
from onetick.py.core.source import _Source
result._copy_state_vars_from(sources)
result._clean_sources_dates() # because it is not a real _source
for source in sources:
result._merge_tmp_otq(source)
if source.get_name():
if not result.get_name():
result.set_name(source.get_name())
if result.get_name() != source.get_name():
warnings.warn(f"Merging/joining sources with different names: '{result.get_name()}' "
f"and '{source.get_name()}'. Some of those names will be lost")
if isinstance(symbols, _Source):
result._merge_tmp_otq(symbols)
names = __copy_and_rename_nodes_on_merge_join(result, names, sources, symbols)
if drop_meta:
to_drop = list(map(lambda x: x + ".TIMESTAMP", names))
to_drop += list(map(lambda x: x + ".OMDSEQ", names))
__rename_leading_omdseq(leading, names, result, sources, use_rename_ep=use_rename_ep)
result.sink(otq.Passthrough(fields=",".join(to_drop), drop_fields=True))
if output_type_index is not None:
result._copy_properties_from(sources[output_type_index])
def __rename_fields(source, mapping, use_rename_ep=True):
"""
Function to rename fields from ``mapping`` in ``source``.
Note that it is a low-level function that doesn't change the Python schema of the ``source``.
Modifies ``source`` inplace, doesn't return anything.
If ``use_rename_ep`` is `True`, then :py:class:`onetick.query.RenameFields` event processor will be used.
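For example, a mapping like ``{'PRICE': 'PX'}`` (field names here are purely
illustrative) renames the field PRICE to PX.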
"""
if use_rename_ep:
source.sink(otq.RenameFields(','.join(f'{k}={v}' for k, v in mapping.items())))
return
# May be needed, because RenameFields ep is not supported in generic aggregation
for old, new in mapping.items():
# RenameFields ignores non-existent fields,
# all this mess is needed to mimic that logic
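# The graph below mimics it: the WhereClause routes ticks where the field is undefined
# to the IF branch unchanged, while the ELSE branch copies the old field into the new
# name and drops the old one; the two branches are then merged back together.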
source.sink(otq.WhereClause(where=f'UNDEFINED("{old}")'))
if_branch_graph = source.node().copy_graph()
if_branch_rules = source.node().copy_rules()
source.sink(otq.AddField(new, old), out_pin='ELSE')
source.sink(otq.Passthrough(old, drop_fields=True))
source.sink(otq.Merge(identify_input_ts=False))
source.source(if_branch_graph)
source.node().add_rules(if_branch_rules)
def __rename_leading_omdseq(leading, names, result, sources, use_rename_ep=True):
if leading is not None:
if len(leading) == 1:
leading = leading.pop()
__rename_fields(result, {f"{leading}.OMDSEQ": "OMDSEQ"}, use_rename_ep=use_rename_ep)
else:
number, indexes = __get_number_and_indexes_of_sources_have_field(sources, "OMDSEQ")
if number == 1:
__rename_fields(result, {f"{names[indexes.pop()]}.OMDSEQ": "OMDSEQ"}, use_rename_ep=use_rename_ep)
elif number:
raise ValueError(
"Several sources were specified as leading and the OMDSEQ field is present in more than "
"one source. The resulting OMDSEQ can't be derived in such a case."
)
def __get_number_and_indexes_of_sources_have_field(sources, field):
number = 0
indexes = []
for s in sources:
if field in s.columns():
indexes.append(number)
number += 1
return number, indexes
def __copy_and_rename_nodes_on_merge_join(result, names, sources, symbols):
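"""Attach each source's graph to ``result``, optionally wrapping it in a named
Passthrough node (the names are used later to reference per-source fields)."""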
# shared eps between sources
eps = defaultdict()
if names is True:
names = [f"__SRC_{n}__" for n in range(len(sources))]
if not names:
names = itertools.repeat(None)
if sources:
for name, src in zip(names, sources):
obj = src
if name:
obj = src.copy()
obj.sink(otq.Passthrough())
obj.node_name(name)
result.source(obj.node().copy_graph(eps))
result.node().add_rules(obj.node().copy_rules())
result._set_sources_dates(obj, copy_symbols=not bool(symbols))
return names
def _is_table_after_merge_needed(need_table, used_columns):
if not need_table:
for key, value in used_columns.items():
if not value:
need_table = True
break
return need_table
def _collect_merged_columns(need_table, sources):
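"""Combine the schemas of all ``sources`` into one, widening a column's type when it
differs between sources. Returns the merged columns, a flag telling whether a table EP
is needed, and a per-column flag showing which of the first source's columns were also
seen in the other sources."""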
merged_columns = sources[0].columns(skip_meta_fields=True)
used_columns = {key: False for key in merged_columns.keys()}
for src in sources[1:]:
for key, value in src.columns(skip_meta_fields=True).items():
if key in merged_columns:
orig_type = merged_columns[key]
try:
merged_dtype, merged_need_table = get_type_that_includes([orig_type, value])
except ValueError as e:
raise ValueError(f"Column '{key}' has different types for "
f"different branches: {orig_type} {value}") from e
need_table |= merged_need_table
merged_columns[key] = merged_dtype
else:
need_table = True
merged_columns[key] = value
if key in used_columns:
used_columns[key] = True
return merged_columns, need_table, used_columns
def concat(sources=None, add_table=True, symbols=None):
""" Deprecated: Merges ticks from the sources into a single output _source ordered by the timestamp
This function is deprecated due the wrong name notation.
Use 'merge' instead.
Parameters
----------
sources : list
List of sources to merge
add_table : bool
If set to True, then a table is added right after the merge.
We recommend keeping it set to True to prevent problems with
different tick schemas. Default: True
Return
------
A new _source that holds a result of the merged sources
"""
warnings.warn("This function is deprecated due the wrong name notation. Use `merge` instead.", DeprecationWarning)
return merge(sources=sources, align_schema=add_table, symbols=symbols)
def join(left, right, on, how="outer", rprefix="RIGHT", keep_fields_not_in_schema=False, output_type_index=None):
"""
Joins two sources ``left`` and ``right`` based on ``on`` condition.
If you want to add a prefix/suffix to all columns in one of the sources, use
:func:`Source.add_prefix` or :func:`Source.add_suffix`
Parameters
----------
left: :class:`Source`
left source to join
right: :class:`Source`
right source to join
on: :py:class:`~onetick.py.Operation` or 'all' or 'same_size' or list of strings
If 'all', joins every tick from ``left`` with every tick from ``right``.
If 'same_size' and the sources are of the same size, joins ticks from the two sources pairwise, otherwise raises an exception.
If it is a list of strings, then ticks with the same values of the ``on`` fields will be joined.
how: 'inner' or 'outer'
Joining type.
Inner join will only produce ticks that matched the ``on`` condition.
Outer join will also produce the ticks from the ``left`` source
that didn't match the condition (so it's basically a left-outer join).
Doesn't matter for ``on='same_size'``.
rprefix: str
The name of the ``right`` data source. It will be added as a prefix to the overlapping columns that come
from ``right`` into the result
keep_fields_not_in_schema: bool
If True, the join function will try to preserve any fields of the original sources that are not in the source schema,
propagating them to the output. This may cause a runtime error if fields are duplicated.
If False, all fields that are not in the schema are removed.
output_type_index: int
Specifies the index of the source in sources from which the type and properties of the output will be taken.
Useful when joining sources that inherit from :class:`Source`.
By default, the output object type will be :class:`Source`.
Note
----
``join`` does some internal optimization in case of using time-based ``on`` conditions. The optimization doesn't apply
if the ``on`` expression has functions in it. So it is recommended to add or subtract a number of
milliseconds (integers) instead.
See examples for more details.
Returns
-------
:class:`Source` or same class as ``[left, right][output_type_index]``
joined data
See also
--------
**JOIN** OneTick event processor
Examples
--------
>>> d1 = otp.Ticks({'ID': [1, 2, 3], 'A': ['a', 'b', 'c']})
>>> d2 = otp.Ticks({'ID': [2, 3, 4], 'B': ['q', 'w', 'e']})
Outer join:
>>> data = otp.join(d1, d2, on=d1['ID'] == d2['ID'], how='outer')
>>> otp.run(data)
Time ID A RIGHT_ID B
0 2003-12-01 00:00:00.000 1 a 0
1 2003-12-01 00:00:00.001 2 b 2 q
2 2003-12-01 00:00:00.002 3 c 3 w
Inner join:
>>> data = otp.join(d1, d2, on=d1['ID'] == d2['ID'], how='inner')
>>> otp.run(data)
Time ID A RIGHT_ID B
0 2003-12-01 00:00:00.001 2 b 2 q
1 2003-12-01 00:00:00.002 3 c 3 w
Join all ticks:
>>> data = otp.join(d1, d2, on='all')
>>> otp.run(data)
Time ID A RIGHT_ID B
0 2003-12-01 00:00:00.000 1 a 2 q
1 2003-12-01 00:00:00.000 1 a 3 w
2 2003-12-01 00:00:00.000 1 a 4 e
3 2003-12-01 00:00:00.001 2 b 2 q
4 2003-12-01 00:00:00.001 2 b 3 w
5 2003-12-01 00:00:00.001 2 b 4 e
6 2003-12-01 00:00:00.002 3 c 2 q
7 2003-12-01 00:00:00.002 3 c 3 w
8 2003-12-01 00:00:00.002 3 c 4 e
Join same size sources:
>>> data = otp.join(d1, d2, on='same_size')
>>> otp.run(data)
Time ID A RIGHT_ID B
0 2003-12-01 00:00:00.000 1 a 2 q
1 2003-12-01 00:00:00.001 2 b 3 w
2 2003-12-01 00:00:00.002 3 c 4 e
Adding prefix to the right source for all columns:
>>> d_right = d2.add_prefix('right_')
>>> data = otp.join(d1, d_right, on=d1['ID'] == d_right['right_ID'])
>>> otp.run(data)
Time ID A right_ID right_B
0 2003-12-01 00:00:00.000 1 a 0
1 2003-12-01 00:00:00.001 2 b 2 q
2 2003-12-01 00:00:00.002 3 c 3 w
This condition will be optimized during run time:
>>> data = otp.join(d1, d2, on=(d1['ID'] == d2['ID']) & (d1['Time'] >= d2['Time']), how='outer')
>>> otp.run(data)
Time ID A RIGHT_ID B
0 2003-12-01 00:00:00.000 1 a 0
1 2003-12-01 00:00:00.001 2 b 2 q
2 2003-12-01 00:00:00.002 3 c 3 w
This condition won't be optimized at run time, since it turns the addition to time into a function call.
So please note, this way of using ``join`` is not recommended.
>>> data = otp.join(d1, d2, on=(d1['ID'] == d2['ID']) & (d1['Time'] >= d2['Time'] + otp.Milli(1)), how='outer')
>>> otp.run(data)
Time ID A RIGHT_ID B
0 2003-12-01 00:00:00.000 1 a 0
1 2003-12-01 00:00:00.001 2 b 2 q
2 2003-12-01 00:00:00.002 3 c 3 w
In such cases (adding/subtracting constants to time), add or subtract the number of milliseconds directly.
This example returns exactly the same result as the previous one, but it will be optimized, so the runtime will be
shorter.
>>> data = otp.join(d1, d2, on=(d1['ID'] == d2['ID']) & (d1['Time'] >= d2['Time'] + 1), how='outer')
>>> otp.run(data)
Time ID A RIGHT_ID B
0 2003-12-01 00:00:00.000 1 a 0
1 2003-12-01 00:00:00.001 2 b 2 q
2 2003-12-01 00:00:00.002 3 c 3 w
``on`` can be list of strings:
>>> left = otp.Ticks(A=[1, 2, 3], B=[4, 6, 7])
>>> right = otp.Ticks(A=[2, 3, 4], B=[6, 9, 8], C=[7, 2, 0])
>>> data = otp.join(left, right, on=['A', 'B'], how='inner')
>>> otp.run(data)
Time A B C
0 2003-12-01 00:00:00.001 2 6 7
"""
output_type = output_type_by_index((left, right), output_type_index)
on_list = []
if isinstance(on, list):
for column in on:
if column not in left.schema:
raise ValueError(f'`{column}` column does not exist in the left source.')
if column not in right.schema:
raise ValueError(f'`{column}` column does not exist in the right source.')
if len(on) == 0:
raise ValueError('`on` parameter can not be empty list.')
on_list = on
on = (left[on_list[0]] == right[on_list[0]])
for column in on_list[1:]:
on = on & (left[column] == right[column])
timezone_hack = None
if re.search(r'\b_TIMEZONE\b', str(on)):
# join does not support using _TIMEZONE pseudo-field in join_criteria,
# replacing it with temporary fields in the branches
timezone_hack = '__TIMEZONE_HACK__'
left[timezone_hack] = left['_TIMEZONE']
right[timezone_hack] = right['_TIMEZONE']
if str(on) == "all":
on = f'1 = 1 or {rprefix}.TIMESTAMP >= 0'
_LEFT_NODE_NAME = "__SRC_LEFT__" # this is internal name
_RIGHT_NODE_NAME = rprefix
initial_left_use_name_for_column_prefix = left.use_name_for_column_prefix()
initial_right_use_name_for_column_prefix = right.use_name_for_column_prefix()
initial_left_source_node_name = left.node_name()
initial_right_source_node_name = right.node_name()
left.use_name_for_column_prefix(True)
left.node_name(_LEFT_NODE_NAME)
# we have to add _source prefix to all column operations
# the `on` expression is written with `right`, so we must modify it; we will restore it later
right.use_name_for_column_prefix(True) # add prefix to every operation with that table in on
right.node_name(_RIGHT_NODE_NAME)
columns_name_set = set()
columns = {}
fields_to_skip_right_source = {'TIMESTAMP'}
for name, dtype in chain(left.columns(skip_meta_fields=True).items(), right.columns(skip_meta_fields=True).items()):
if name in columns_name_set:
columns[_RIGHT_NODE_NAME + "_" + name] = dtype
fields_to_skip_right_source.add(name)
else:
columns[name] = dtype
columns_name_set.add(name)
if how == "outer":
join_type = "LEFT_OUTER"
elif how == "inner":
join_type = "INNER"
else:
raise ValueError("The 'how' parameter has wrong value. Only 'outer' and 'inner' are supported")
if timezone_hack:
on = re.sub(r'\._TIMEZONE\b', f'.{timezone_hack}', str(on))
on = re.sub(r'\b_TIMEZONE\b', f'{_LEFT_NODE_NAME}.{timezone_hack}', str(on))
# ------------------
# create objects
params = {"join_criteria": str(on), "join_type": join_type, "left_source": _LEFT_NODE_NAME}
# return states of sources back
left.use_name_for_column_prefix(initial_left_use_name_for_column_prefix)
left.node_name(initial_left_source_node_name)
right.use_name_for_column_prefix(initial_right_use_name_for_column_prefix)
right.node_name(initial_right_source_node_name)
if str(on) == "same_size":
result = output_type(node=otq.JoinSameSizeTs(), **columns)
else:
result = output_type(node=otq.Join(**params), **columns)
__copy_sources_on_merge_or_join(result, (left, right),
names=(_LEFT_NODE_NAME, _RIGHT_NODE_NAME),
output_type_index=output_type_index)
rename_fields_dict = {}
for lc, rc in zip_longest(left.columns(skip_meta_fields=True), right.columns(skip_meta_fields=True)):
if lc:
rename_fields_dict[f"{_LEFT_NODE_NAME}.{lc}"] = lc
if rc:
if rc not in fields_to_skip_right_source:
rename_fields_dict[f"{_RIGHT_NODE_NAME}.{rc}"] = rc
else:
rename_fields_dict[f"{_RIGHT_NODE_NAME}.{rc}"] = f"{_RIGHT_NODE_NAME}_{rc}"
__rename_fields(result, rename_fields_dict)
result.sink(otq.Passthrough(fields=_LEFT_NODE_NAME + ".TIMESTAMP", drop_fields=True))
items = []
for name, dtype in result.columns(skip_meta_fields=True).items():
items.append(type2str(dtype) + " " + name)
if keep_fields_not_in_schema:
# Here we try to preserve fields of the original sources that were not in the schema
# in their original form. If there's a duplication of fields or any other problem
# at runtime, there is nothing we can do about it at that point
result.sink(otq.Passthrough(fields=_RIGHT_NODE_NAME + ".TIMESTAMP", drop_fields=True))
result.sink(otq.RenameFieldsEp(rename_fields=rf"{_LEFT_NODE_NAME}\.(.*)=\1,{_RIGHT_NODE_NAME}\.(.*)=\1",
use_regex=True))
result.sink(otq.Table(fields=",".join(items), keep_input_fields=True))
else:
result.sink(otq.Table(fields=",".join(items)))
if timezone_hack:
result = result.drop([
field for field in result.schema
if field.endswith(timezone_hack)
])
left.drop(timezone_hack, inplace=True)
right.drop(timezone_hack, inplace=True)
for column in on_list:
result.drop(f'{_RIGHT_NODE_NAME}_{column}', inplace=True)
return result
def join_by_time(sources, how="outer", on=None, policy=None, check_schema=True, leading=0,
match_if_identical_times=None, output_type_index=None, use_rename_ep=True):
"""
Joins ticks from multiple input time series, based on input tick timestamps.
Each tick from the ``leading`` source is joined with the already arrived ticks from the other sources.
>>> leading = otp.Ticks(A=[1, 2], offset=[1, 3])
>>> other = otp.Ticks(B=[1], offset=[2])
>>> otp.join_by_time([leading, other])()
Time A B
0 2003-12-01 00:00:00.001 1 0
1 2003-12-01 00:00:00.003 2 1
If you want to add a prefix/suffix to all columns in one of the sources, use
:func:`Source.add_prefix` or :func:`Source.add_suffix`
Parameters
----------
sources: Collection[:class:`Source`]
The collection of Source objects which will be joined
how: 'outer' or 'inner'
The method of join (inner or outer)
on: Collection[:class:`Column`]
``on`` adds an extra check to the join: only ticks with the same values of the ``on`` fields will be joined
>>> leading = otp.Ticks(A=[1, 2], offset=[1, 3])
>>> other = otp.Ticks(A=[2, 2], B=[1, 2], offset=[0, 2])
>>> otp.join_by_time([leading, other], on=['A'])()
Time A B
0 2003-12-01 00:00:00.001 1 0
1 2003-12-01 00:00:00.003 2 2
policy: 'arrival_order', 'latest_ticks', 'each_for_leader_with_first' or 'each_for_leader_with_latest'
Policy of joining ticks with same timestamps
>>> leading = otp.Ticks(A=[1, 2], offset=[0, 0], OMDSEQ=[0, 3])
>>> other = otp.Ticks(B=[1, 2], offset=[0, 0], OMDSEQ=[2, 4])
Note: in the examples below we assume that all ticks have the same timestamp and arrive in the order shown.
OMDSEQ is a special field that stores the order of ticks with the same timestamp
- ``arrival_order``
output tick generated on arrival of ``leading`` source tick
>>> data = otp.join_by_time([leading, other], policy='arrival_order')
>>> otp.run(data)[['Time', 'A', 'B']]
Time A B
0 2003-12-01 1 0
1 2003-12-01 2 1
- ``latest_ticks``
A tick is generated at the time of expiration of a particular timestamp (when all ticks from all sources
for the current timestamp have arrived). Only the latest tick from the ``leading`` source will be used.
>>> data = otp.join_by_time([leading, other], policy='latest_ticks')
>>> otp.run(data)[['Time', 'A', 'B']]
Time A B
0 2003-12-01 2 2
- ``each_for_leader_with_first``
Each tick from the ``leading`` source will be joined with the first tick from the other sources for the current timestamp
>>> data = otp.join_by_time(
... [leading, other],
... policy='each_for_leader_with_first'
... )
>>> otp.run(data)[['Time', 'A', 'B']]
Time A B
0 2003-12-01 1 1
1 2003-12-01 2 1
- ``each_for_leader_with_latest``
Each tick from the ``leading`` source will be joined with the last tick from the other sources for the current timestamp
>>> data = otp.join_by_time(
... [leading, other],
... policy='each_for_leader_with_latest'
... )
>>> otp.run(data)[['Time', 'A', 'B']]
Time A B
0 2003-12-01 1 2
1 2003-12-01 2 2
check_schema: bool
If True, onetick.py will check that all column names are unambiguous
and that the columns listed in the ``on`` param exist in the sources' schemas.
This can lead to false positive errors
if some event processors were sunk into the Source. To avoid this, set ``check_schema`` to False.
leading: int, 'all', :class:`Source`, list of int, list of :class:`Source`
A list of sources or their indexes. If this parameter is 'all', every source is considered to be leading.
match_if_identical_times: bool
A True value of this parameter causes an output tick to be formed from input ticks with identical timestamps
only. If ``how`` is set to 'outer', default values of fields (``otp.nan``, 0, empty string) are propagated for
sources that did not tick at a given timestamp.
If this parameter is set to True, the default value of the ``policy`` parameter is 'latest_ticks'.
output_type_index: int
Specifies the index of the source in ``sources`` from which the type and properties of the output will be taken.
Useful when joining sources that inherit from :class:`Source`.
By default, the output object type will be :class:`Source`.
use_rename_ep: bool
Use :py:class:`onetick.query.RenameFields` event processor or not.
This event processor can't be used in generic aggregation.
See also
--------
**JOIN_BY_TIME** OneTick event processor
Examples
--------
>>> d1 = otp.Ticks({'A': [1, 2, 3], 'offset': [1, 2, 3]})
>>> d2 = otp.Ticks({'B': [1, 2, 3], 'offset': [1, 2, 4]})
>>> otp.join_by_time([d1, d2])()
Time A B
0 2003-12-01 00:00:00.001 1 0
1 2003-12-01 00:00:00.002 2 1
2 2003-12-01 00:00:00.003 3 2
>>> otp.join_by_time([d1, d2], leading=1)()
Time A B
0 2003-12-01 00:00:00.001 1 1
1 2003-12-01 00:00:00.002 2 2
2 2003-12-01 00:00:00.004 3 3
>>> otp.join_by_time([d1, d2], leading=1, match_if_identical_times=True)()
Time A B
0 2003-12-01 00:00:00.001 1 1
1 2003-12-01 00:00:00.002 2 2
2 2003-12-01 00:00:00.004 0 3
Adding a prefix to the right source for all columns:
>>> otp.join_by_time([d1, d2.add_prefix('right_')])()
Time A right_B
0 2003-12-01 00:00:00.001 1 0
1 2003-12-01 00:00:00.002 2 1
2 2003-12-01 00:00:00.003 3 2
Returns
-------
:class:`Source` or same class as ``sources[output_type_index]``
A time series of ticks.
"""
output_type = output_type_by_index(sources, output_type_index)
join_str_keys = []
# if a key is set, then generalize it, i.e. convert it into a list;
# then remove the keys from the 'columns_count' dict to pass the validation below
if on is not None:
if isinstance(on, list):
pass
elif isinstance(on, Column):
on = [on]
elif isinstance(on, str):
on = [on]
else:
raise TypeError(f"It is not supported to have '{type(on)}' type as a key")
for join_key in on:
dtypes = set()
if check_schema:
for source in sources:
try:
key_type = source.schema[str(join_key)]
except KeyError as e:
raise KeyError(f"Column '{join_key}' not found in source schema {source}") from e
type_name = type2str(key_type)
if type_name == "string[64]":
type_name = "string"
dtypes.add(type_name)
if len(dtypes) > 1:
raise TypeError(f"Column '{join_key}' has different types in sources: {dtypes}")
if isinstance(join_key, Column):
join_str_keys.append(str(join_key))
elif isinstance(join_key, str):
join_str_keys.append(join_key)
if check_schema:
_check_schema_for_join_by_time(join_str_keys, sources)
if how not in ["inner", "outer"]:
raise Exception('Wrong value for the "how" parameter. It is allowed to use "inner" or "outer" values')
join_type = how.upper()
# ------------------
# create objects
params = {"add_source_prefix": False, "join_type": join_type}
leading = _fill_leading_sources_param(leading, params, sources)
if on is not None:
params["join_keys"] = ",".join(join_str_keys)
if policy is not None:
policies = {"arrival_order", "latest_ticks", "each_for_leader_with_first", "each_for_leader_with_latest"}
if policy.lower() not in policies:
raise ValueError("Invalid policy. Only the following ones are allowed: " + ", ".join(policies) + ".")
params["same_timestamp_join_policy"] = policy.upper()
if match_if_identical_times is not None:
params["match_if_identical_times"] = match_if_identical_times
columns = {name: dtype for src in sources for name, dtype in src.columns(skip_meta_fields=True).items()}
result = output_type(node=otq.JoinByTime(**params), **columns)
__copy_sources_on_merge_or_join(result, sources,
names=True,
drop_meta=True,
leading=leading,
output_type_index=output_type_index,
use_rename_ep=use_rename_ep)
if how == "outer":
items = []
for name, dtype in result.columns(skip_meta_fields=True).items():
items.append(type2str(dtype) + " " + name)
result.sink(otq.Table(fields=",".join(items), keep_input_fields=True))
return result
@singledispatch
def _fill_leading_sources_param(leading, params, sources):
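"""Fill params["leading_sources"] according to ``leading`` (a _Source, 'all', an int index,
or a list/tuple of these, handled by the registered overloads below) and return the
leading source identifiers."""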
from onetick.py.core.source import _Source
if isinstance(leading, _Source): # TODO: PY-104 Get rid of circular dependencies in code to avoid local import
result = f"__SRC_{__find_by_id(sources, leading)}__"
params["leading_sources"] = result
result = [result]
elif leading == "all": # all sources are leading which is specified by empty string
params["leading_sources"] = ""
result = []
else:
raise ValueError(
"wrong leading param was specified, please use any of int, 'all' literal, " "list of int, list of _Source"
)
return result
@_fill_leading_sources_param.register(int)
def _(leading, params, sources):
if leading < 0:
leading = len(sources) + leading
if 0 <= leading < len(sources):
result = f"__SRC_{leading}__"
params["leading_sources"] = result
return [result]
else:
raise ValueError(
f"leading source index should be in range(-len(sources), len(sources)), " f"but {leading} was specified."
)
@_fill_leading_sources_param.register(list) # type: ignore # _ already defined above
@_fill_leading_sources_param.register(tuple)
def _(leading, params, sources):
if len(leading) > len(sources):
raise ValueError("Number of leading sources can't be bigger number of sources")
elif len(leading) == len(sources):
warnings.warn(
"You've specified leading and source lists of the same size; you can specify leading='all' instead"
)
if isinstance(leading[0], int):
result = leading
else:
result = [__find_by_id(sources, lead) for lead in leading]
indexes = ",".join(f"__SRC_{i}__" for i in result)
params["leading_sources"] = indexes
return result
def __find_by_id(collection, item):
for index, s in enumerate(collection):
if s is item:
return index
raise ValueError("The leading source should be in join sources list")
def _check_schema_for_join_by_time(join_str_keys, sources):
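"""Raise if any column other than the join keys and OMDSEQ is present in more than one source."""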
# check that there aren't matching columns
columns_count = Counter()
for src in sources:
columns_count.update(src.columns(skip_meta_fields=True).keys())
for join_key in join_str_keys:
del columns_count[join_key]
matched = [k for k, value in columns_count.items() if value > 1]
if "OMDSEQ" in matched:
# OMDSEQ behaves like the TIMESTAMP field
matched.remove("OMDSEQ")
if len(matched):
raise Exception(f"There are matched columns between sources: {','.join(matched)}")
def apply_query(query,
in_sources=None,
output_pins=None,
shared_state_variables_list=None,
output_type_index=None,
**params):
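"""Apply an .otq query (given as a path or an otp.query object) to ``in_sources``.
Each input source is connected to its input pin of the nested query; the result is a
single source (or a tuple of sources when there is more than one output pin)."""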
from onetick.py.sources import query as otp_query
output_type = output_type_by_index(in_sources, output_type_index)
output_pins = output_pins if output_pins else []
in_sources = in_sources if in_sources else {}
shared_state_variables_list = shared_state_variables_list if shared_state_variables_list else []
if isinstance(query, str):
# it seems that path is passed
query = otp_query(query, **params)
elif isinstance(query, otp_query):
if params:
query.update_params(**params)
# elif
columns = {}
for src in in_sources.values():
columns.update(src.columns(skip_meta_fields=True))
str_params = query.str_params
shared_state_variables = ",".join(shared_state_variables_list)
inputs_need_unbound_symbols = {in_pin: src._is_unbound_required() for in_pin, src in in_sources.items()}
if query.graph_info.has_unbound_if_pinned(inputs_need_unbound_symbols):
symbol = adaptive
else:
symbol = None
nested_src = output_type(
node=otq.NestedOtq(query.path, str_params, shared_state_variables=shared_state_variables),
_has_output=len(output_pins) > 0,
_symbols=symbol,
**columns,
)
eps = defaultdict()
for in_pin, src in in_sources.items():
nested_src.source(src.node().copy_graph(eps), in_pin)
nested_src.node().add_rules(src.node().copy_rules())
nested_src._set_sources_dates(src)
nested_src._merge_tmp_otq(src)
if len(output_pins) == 0:
# output_pins = ['OUT']
return nested_src
if len(output_pins) > 1:
result = []
for out_pin in output_pins:
res_src = nested_src.copy()
res_src.node().out_pin(out_pin)
# NOTE: need to comment out this node
res_src.sink(otq.Passthrough())
# apply config customization
query.config.apply(out_pin, res_src)
result.append(res_src)
return tuple(result)
else:
# TODO: move setting out_pin on the creating step of nested_src
# It seems as not working now, because seems .copy() of _Source doesnt
# copy out_pin reference, need to check
nested_src.node().out_pin(output_pins[0])
# apply config customization
query.config.apply(output_pins[0], nested_src)
return nested_src
def apply(query, *args, **kwargs):
return apply_query(query.path, *args, **kwargs, **query.params)
def cut(column: 'Column', bins: Union[int, List[float]], labels: List[str] = None):
"""
Bin values into discrete intervals (mimics :pandas:`pandas.cut`).
Parameters
----------
column: :py:class:`~onetick.py.Column`
Column with numeric data used to build bins.
bins: int or List[float]
When List[float], defines the bin edges.
When int, defines the number of equal-width bins in the range of the column's values.
labels: List[str]
Labels used to name resulting bins.
If not set, bins are numeric intervals like (5.0000000000, 7.5000000000].
Return
------
object that can be set to :py:class:`~onetick.py.Column` via :py:meth:`~onetick.py.Source.__setitem__`
Examples
--------
>>> # OTdirective: snippet-name: Source.functions.cut;
>>> data = otp.Ticks({"X": [9, 8, 5, 6, 7, 0, ]})
>>> data['bin'] = otp.cut(data['X'], bins=3, labels=['a', 'b', 'c'])
>>> otp.run(data)[['X', 'bin']]
X bin
0 9 c
1 8 c
2 5 b
3 6 b
4 7 c
5 0 a
"""
src = column.obj_ref
return _CutBuilder(src, column, bins, labels=labels)
def qcut(column: 'Column', q: Union[int, List[float]], labels: List[str] = None):
"""
Quantile-based discretization function (mimics :pandas:`pandas.qcut`).
Parameters
----------
column: :py:class:`~onetick.py.Column`
Column with numeric data used to build bins.
q: int or List[float]
When List[float], an array of quantiles, e.g. [0, .25, .5, .75, 1.] for quartiles.
When int, the number of quantiles: 10 for deciles, 4 for quartiles, etc.
labels: List[str]
Labels used to name resulting bins.
If not set, bins are numeric intervals like (5.0000000000, 7.5000000000].
Return
------
object that can be set to :py:class:`~onetick.py.Column` via :py:meth:`~onetick.py.Source.__setitem__`
Examples
--------
>>> # OTdirective: snippet-name: Source.functions.qcut;
>>> data = otp.Ticks({"X": [10, 3, 5, 6, 7, 1]})
>>> data['bin'] = otp.qcut(data['X'], q=3, labels=['a', 'b', 'c'])
>>> otp.run(data)[['X', 'bin']]
X bin
0 10 c
1 3 a
2 5 b
3 6 b
4 7 c
5 1 a
"""
# TODO when q is a List[float] like [0, .25, .5, .75, 1.]
src = column.obj_ref
return _QCutBuilder(src, column, q, labels=labels)
def coalesce(sources, max_source_delay: float = 0.0, output_type_index: int = None):
"""
Used to fill the gaps in one time series with the ticks from one or several other time series.
This event processor considers ticks that arrive from several sources at the same time as being the same,
allowing for possible delay across the sources when determining whether the ticks are the same.
When the same tick arrives from several sources, it is only propagated from the source
that has the highest priority among those sources.
Input ticks do not necessarily have the same structure - they can have different fields.
In order to distinguish time series, the event processor adds the SYMBOL_NAME field.
A SOURCE field is also added to each tick that lacks it, to identify the source from which the tick came.
Hence, one must avoid adding a SOURCE field in event processors positioned after COALESCE.
Parameters
----------
sources: list of :class:`Source`
List of the sources to coalesce. This list is also treated as the priority order:
the first member of the list has the highest priority when determining whether ticks are the same.
max_source_delay: float
The maximum time in seconds by which a tick from one input time series
can arrive later than the same tick from another time series.
output_type_index: int
Specifies the index of the source in ``sources`` from which the type and properties of the output will be taken.
Useful when merging sources that inherit from :class:`Source`.
By default, the output object type will be :class:`Source`.
Return
------
:class:`Source`
A time series of ticks.
See also
--------
**COALESCE** OneTick event processor
Examples
--------
If ticks from different sources have the same time,
only the tick from source with the highest priority will be propagated.
>>> data1 = otp.Ticks(A=[1, 2])
>>> data2 = otp.Ticks(A=[3, 4])
>>> data = otp.coalesce([data2, data1])
>>> otp.run(data)[['Time', 'A']]
Time A
0 2003-12-01 00:00:00.000 3
1 2003-12-01 00:00:00.001 4
We can use ``max_source_delay`` parameter to expand time interval in which
ticks are considered to have the "same time".
>>> data1 = otp.Ticks({
... 'A': [1, 2, 3],
... 'offset': [0, 3000, 6000],
... })
>>> data2 = otp.Ticks({
... 'A': [4, 5, 6],
... # 4 is delayed by less than one second from 1
... # 5 is delayed by one second from 2
... # 6 is delayed by more than one second from 3
... 'offset': [999, 4000, 7001],
... })
>>> data = otp.coalesce([data2, data1], max_source_delay=1)
>>> otp.run(data)[['Time', 'A']]
Time A
0 2003-12-01 00:00:00.999 4
1 2003-12-01 00:00:04.000 5
2 2003-12-01 00:00:06.000 3
3 2003-12-01 00:00:07.001 6
"""
if not sources:
raise ValueError("Coalesce should have one or more inputs")
output_type = output_type_by_index(sources, output_type_index)
# change node names for sources, COALESCE ep needs them
new_node_names = [
f'__COALESCE_SRC_{i}__' for i, source in enumerate(sources, start=1)
]
node = otq.Coalesce(
priority_order=','.join(new_node_names),
max_source_delay=max_source_delay,
)
columns = {
# these fields will be added by COALESCE ep
'SYMBOL_NAME': str,
'TICK_TYPE': str,
}
for source in sources:
for name in ['SYMBOL_NAME', 'TICK_TYPE']:
if name in source.schema:
raise ValueError(f"Field with name '{name}' is already present in the source. "
'Please, rename or delete that field prior to invoking coalesce().')
shared_columns = set(source.schema).intersection(columns)
for name in shared_columns:
type_1, type_2 = source.schema[name], columns[name]
if type_1 != type_2:
raise ValueError(f"Conflicting types for field '{name}' in different sources: {type_1}, {type_2}")
columns.update(source.schema)
# TODO: do we need field SOURCE (especially when node names are auto-generated)?
# this field will be added by COALESCE if it's not presented in sources
columns.setdefault('SOURCE', str)
result = output_type(node, **columns)
__copy_sources_on_merge_or_join(result, sources, names=new_node_names, output_type_index=output_type_index)
return result
def corp_actions(source,
adjustment_date: Union[date, otp_datetime, int, str, None] = None,
adjustment_date_tz: str = default,
fields=None,
adjust_rule="PRICE",
apply_split: bool = True,
apply_spinoff: bool = False,
apply_cash_dividend: bool = False,
apply_stock_dividend: bool = False,
apply_security_splice: bool = False,
apply_others: str = "",
apply_all: bool = False,
):
"""Adjusts values using corporate actions information loaded into OneTick
from the reference data file. To use it, location of reference database must
be specified via OneTick configuration.
Parameters
----------
source : onetick.py.Source
Source object adjusted by corporate actions information.
adjustment_date : otp.date, otp.datetime, int, str, None, optional
The date as of which the values are adjusted. `int` format is YYYYMMDD.
If it is not set, the values are adjusted as of the end date in the query.
All corporate actions of the types specified in the parameters
that lie between the tick timestamp and the adjustment date will be applied to each tick.
Notice that the adjustment date is affected neither by _SYMBOL_PARAM._PARAM_END_TIME
nor by the apply_times_daily setting.
When the parameter is in YYYYMMDD format, the time is assumed to be 17:00:00 GMT.
By default None
adjustment_date_tz : str, optional
Timezone for the adjustment date; by default the global `otp.config.tz` value is used.
The local timezone can't be used, so in that case the parameter is set to 'GMT'.
When `adjustment_date` is in YYYYMMDD format, the `adjustment_date_tz` parameter is set to 'GMT'.
fields : str, optional
A comma-separated list of fields to be adjusted. If this parameter is not set,
some default adjustments will take place if appropriately named fields exist in the tick:
- If the ADJUST_RULE parameter is set to PRICE, and the PRICE field is present,
it will get adjusted. If the fields ASK_PRICE or BID_PRICE are present, they will get adjusted.
If fields ASK_VALUE or BID_VALUE are present, they will get adjusted
- If the ADJUST_RULE parameter is set to SIZE, and the SIZE field is present,
it will get adjusted. If the fields ASK_SIZE or BID_SIZE are present, they will get adjusted.
If fields ASK_VALUE or BID_VALUE are present, they will get adjusted.
By default None
adjust_rule : str, optional
When set to PRICE, adjustments are applied under the assumption that fields to be adjusted contain prices
(adjustment direction is determined appropriately).
When set to SIZE, adjustments are applied under the assumption that fields contain sizes
(adjustment direction is opposite to that when the parameter's value is PRICE).
By default "PRICE"
apply_split : bool, optional
If true, adjustments for splits are applied, by default `True`
apply_spinoff : bool, optional
If true, adjustments for spin-offs are applied, by default `False`
apply_cash_dividend : bool, optional
If true, adjustments for cash dividends are applied, by default `False`
apply_stock_dividend : bool, optional
If true, adjustments for stock dividends are applied, by default `False`
apply_security_splice : bool, optional
If true, adjustments for security splices are applied, by default `False`
apply_others : str, optional
A comma-separated list of names of custom adjustment types to apply, by default ""
apply_all : bool, optional
If true, applies all types of adjustments, both built-in and custom, by default `False`
Returns
-------
onetick.py.Source
A new source object with applied adjustments.
See also
--------
**CORP_ACTIONS** OneTick event processor
Examples
--------
>>> src = otp.DataSource('NYSE_TAQ',
... tick_type='TRD',
... start=otp.dt(2022, 5, 20, 9, 30),
... end=otp.dt(2022, 5, 26, 16))
>>> df = otp.run(src, symbols='MKD', symbol_date=otp.date(2022, 5, 22))
>>> df["PRICE"][0]
0.0911
>>> src = otp.corp_actions(src,
... adjustment_date=otp.date(2022, 5, 22),
... fields="PRICE",)
>>> df = otp.run(src, symbols='MKD', symbol_date=otp.date(2022, 5, 22))
>>> df["PRICE"][0]
1.36649931675
"""
if not isinstance(adjustment_date, (date, int, type(None))):
adjustment_date = date(adjustment_date)
if isinstance(adjustment_date, date):
adjustment_date = int(adjustment_date.to_str())
adjustment_date_tz_is_default = False
if adjustment_date_tz is default:
adjustment_date_tz_is_default = True
adjustment_date_tz = config.tz
if not adjustment_date_tz:
warnings.warn("Local timezone can't be used in parameter 'adjustment_date_tz', setting to 'GMT'.")
adjustment_date_tz = 'GMT'
if isinstance(adjustment_date, (date, int, str)) and adjustment_date_tz != 'GMT':
adjustment_date_tz = 'GMT'
if not adjustment_date_tz_is_default:
warnings.warn("`adjustment_date_tz` was changed to 'GMT' since "
"it is the only valid value when `adjustment_date` is in YYYYMMDD format.")
source.sink(otq.CorpActions(
adjustment_date=adjustment_date,
adjustment_date_tz=adjustment_date_tz,
fields=fields,
adjust_rule=adjust_rule,
apply_split=apply_split,
apply_spinoff=apply_spinoff,
apply_cash_dividend=apply_cash_dividend,
apply_stock_dividend=apply_stock_dividend,
apply_security_splice=apply_security_splice,
apply_others=apply_others,
apply_all=apply_all,
))
return source
def save_sources_to_single_file(sources,
file_path=None,
file_suffix='',
start=None,
end=None,
start_time_expression=None,
end_time_expression=None,
timezone=None,
running_query_flag=None):
"""
Save onetick.py.Source objects to a single file.
Parameters
----------
sources: dict or list
dict of names -> sources or list of sources to save into a single file.
If it's a list, then the names will be autogenerated.
Each source can be a :class:`otp.Source` object or a dictionary with these allowed parameters:
{
'source': otp.Source,
'start': datetime(2022, 1, 1), # optional
'end': datetime(2022, 1, 2), # optional
'symbols': otp.Source or otp.Symbols, # optional
}
file_path: str, optional
Path to the file where all sources will be saved.
If not set, sources will be saved to a temporary file and its name will be returned.
file_suffix: str
Only used if ``file_path`` is not set.
This suffix will be added to the name of a generated query file.
start: datetime, optional
start time for the resulting query file
end: datetime, optional
end time for the resulting query file
start_time_expression: str, optional
start time expression for the resulting query file
end_time_expression: str, optional
end time expression for the resulting query file
timezone: str, optional
timezone for the resulting query file
running_query_flag: bool, optional
running query flag for the resulting query file
Returns
-------
If `sources` is a list, returns a list of full query paths (path_to_file::query_name)
with autogenerated names corresponding to each source from `sources`.
If `sources` is a dict, then the path to the query file is returned.
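Examples
--------
A minimal usage sketch (the suffix is illustrative; the generated path and query
names depend on the temporary-file machinery):
>>> src = otp.Ticks(A=[1, 2])  # doctest: +SKIP
>>> paths = save_sources_to_single_file([src], file_suffix='_example.otq')  # doctest: +SKIP
>>> # `paths` is a list like ['<generated_file>.otq::<generated_query_name>']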
"""
if isinstance(sources, dict):
names = sources.keys()
sources = sources.values()
query_names = None
else:
names = repeat(None)
query_names = []
tmp_otq = TmpOtq()
for name, source in zip(names, sources):
query_start = query_end = query_symbols = None
if isinstance(source, dict):
query_start = source.get('start')
query_end = source.get('end')
query_symbols = source.get('symbols')
source = source['source']
query_name = source._store_in_tmp_otq(tmp_otq,
name=name,
start=query_start,
end=query_end,
symbols=query_symbols)
if query_names is not None:
query_names.append(query_name)
file_path = tmp_otq.save_to_file(
file_path=file_path,
file_suffix=file_suffix,
start=start,
end=end,
start_time_expression=start_time_expression,
end_time_expression=end_time_expression,
timezone=timezone,
running_query_flag=running_query_flag,
)
if query_names is not None:
return [f'{file_path}::{query_name}' for query_name in query_names]
return file_path