import inspect
import datetime
import warnings
from typing import Union, List, Optional, Dict, Any, Callable
from collections import defaultdict
import onetick.query as otq
import pandas as pd
import pyomd
from onetick import py as otp
from onetick.py import utils, configuration
from onetick.py.core.column_operations.base import _Operation
from onetick.py.types import time2nsectime
from onetick.py.core.source import _is_dict_required
from onetick.lib.instance import OneTickLib
[docs]def run(query: Union[Callable, Dict, otp.Source, otp.MultiOutputSource, otp.query, str, otq.EpBase, otq.GraphQuery,
otq.ChainQuery, otq.Chainlet],
*,
symbols: Union[List[Union[str, otq.Symbol]], otp.Source, str, None] = None,
start: Union[datetime.datetime, otp.datetime, pyomd.timeval_t, None] = utils.adaptive,
end: Union[datetime.datetime, otp.datetime, pyomd.timeval_t, None] = utils.adaptive,
start_time_expression: Optional[str] = None,
end_time_expression: Optional[str] = None,
timezone=utils.default, # type: ignore
context=utils.default, # type: ignore
username: Optional[str] = None,
alternative_username: Optional[str] = None,
password: Optional[str] = None,
batch_size: Optional[int] = utils.default,
running: Optional[bool] = False,
query_properties: Optional[pyomd.QueryProperties] = None,
concurrency: Optional[int] = utils.default,
apply_times_daily: Optional[int] = None,
symbol_date: Union[datetime.datetime, int, None] = None,
query_params: Optional[Dict[str, Any]] = None,
time_as_nsec: bool = True,
treat_byte_arrays_as_strings: bool = True,
output_matrix_per_field: bool = False,
output_structure: Optional[str] = None,
return_utc_times: Optional[bool] = None,
connection=None,
callback=None,
svg_path=None,
use_connection_pool: bool = False,
node_name: Union[str, List[str], None] = None,
require_dict: bool = False,
max_expected_ticks_per_symbol: Optional[int] = None):
"""
Executes a query and returns its result.
Parameters
----------
query: :py:class:`onetick.py.Source`, otq.Ep, otq.Graph, otq.GraphQuery, otq.ChainQuery, str, otq.Chainlet
Query to execute can be source, path of the query on a disk or onetick.query graph or event processor.
For running OTQ files, it represents the path (including filename) to the OTQ file to run a single query within
the file. If more than one query is present, then the query to be run must be specified
(that is, ``'path_to_file/otq_file.otq::query_to_run'``).
symbols: str, list of str, list of otq.Symbol, :py:class:`onetick.py.Source`, pd.DataFrame, optional
Symbol(s) to run the query for passed as a string, a list of strings, a pd.DataFrame with the ``SYMBOL_NAME``
column, or as a "symbols" query which results include the ``SYMBOL_NAME`` column. The start/end times for the
symbols query will taken from the params below. See :ref:`symbols <Symbols>` for more details.
start: datetime.datetime, :py:class:`onetick.py.datetime`, :py:class:`pyomd.timeval_t`, optional
The start time of the query. If datetime.datetime was pass timezone of object is ignored by Onetick,
therefore we suggest using otp.datetime objects only as an argument.
onetick.py uses otp.config.default_start_time as default value,
if you don't want to specify start time, e.g. to use saved time of the query,
then you should specify None value.
See also timezone argument.
end: datetime.datetime, :py:class:`onetick.py.datetime`, :py:class:`pyomd.timeval_t`, optional
The end time of the query. If datetime.datetime was pass timezone of object is ignored by Onetick,
therefore we suggest using otp.datetime objects only as an argument.
See also timezone argument.
onetick.py uses otp.config.default_end_time as default value,
if you don't want to specify end time, e.g. to use saved time of the query,
then you should specify None value.
start_time_expression: str, optional
Start time onetick expression of the query. If specified, it will take precedence over start.
Supported only if query is Source, Graph or Event Processor.
end_time_expression: str, optional
End time onetick expression of the query. If specified, it will take precedence over end.
Supported only if query is Source, Graph or Event Processor.
timezone: str, optional
The timezone of start and end times, as well as of the output timestamps. It has higher priority then timezone
of start and end parameters. If parameter is omitted timestamps of ticks will be formatted with
the default timezone.
context: str (defaults to otp.config.default_context), optional
Allows specification of different instances of OneTick tick_servers to connect to
username
The username to make the connection.
By default the user which executed the process is used.
alternative_username: str
The username used for authentication.
Needs to be set only when the tick server is configured to use password-based authentication.
By default, ``otp.config.default_auth_username`` is used.
password: str, optional
The password used for authentication.
Needs to be set only when the tick server is configured to use password-based authentication.
Note: not supported and ignored on older OneTick versions.
By default, ``otp.config.default_password`` is used.
batch_size: int
number of symbols to run in one batch.
By default, the value from otp.config.default_batch_size is used.
running: bool, optional
Indicates whether a query is CEP or not. Default is `False`.
query_properties: :py:class:`pyomd.QueryProperties`, optional
Query properties, such as ONE_TO_MANY_POLICY, ALLOW_GRAPH_REUSE, etc
concurrency: int, optional
The maximum number of CPU cores to use to process the query.
By default, the value from otp.config.default_concurrency is used.
apply_times_daily: bool
Runs the query for every day in the ``start``-``end`` time range,
using the time components of ``start`` and ``end`` times.
symbol_date:
The symbol date used to look up symbology mapping information in the reference database,
expressed as datetime object or integer of YYYYMMDD format
query_params: dict
Parameters of the query.
time_as_nsec: bool
Outputs timestamps up to nanoseconds granularity
(defaults to False: by default we output timestamps in microseconds granularity)
treat_byte_arrays_as_strings: bool
Outputs byte arrays as strings (defaults to True)
output_matrix_per_field: bool
Changes output format to list of matrices per field.
output_structure: otp.Source.OutputStructure, optional
Structure (type) of the result. Supported values are:
- `df` (default) - the result is returned as pandas.DataFrame
or dict[symbol: pandas.Dataframe] in case of using multiple symbols or first stage query.
- `map` - the result is returned as SymbolNumpyResultMap.
- `list` - the result is returned as list.
return_utc_times: bool
If True Return times in UTC timezone and in local timezone otherwise
connection: :py:class:`pyomd.Connection`
The connection to be used for discovering nested .otq files
callback: :py:class:`onetick.py.CallbackBase`
Class with callback methods.
If set, the output of the query should be controlled with callbacks
and this function returns nothing.
svg_path
use_connection_pool
node_name: str, List[str], optional
Name of the output node to select result from. If query graph has several output nodes, you can specify the name
of the node to choose result from. If node_name was specified, query should be presented by path on the disk
and output_structure should be `df`
require_dict: bool
If set to True, result will be forced to be a dictionary even if it's returned for a single symbol
max_expected_ticks_per_symbol: int
Expected maximum number of ticks per symbol (used for performance optimizations).
By default, ``otp.config.max_expected_ticks_per_symbol`` is used.
Note
----
It is possible to log currently executed symbol. For that `otp.config.log_symbol` should be set to `True`
(it can be set via `OTP_LOG_SYMBOL` env var). Note, in this case otp.run does not produce the output so
it should be used only for debugging purposes.
Returns
-------
result, list, dict, :pandas:`pandas.DataFrame`, None
result of the query
Examples
--------
Running :py:class:`onetick.py.Source` and setting start and end times
>>> data = otp.Tick(A=1)
>>> otp.run(data, start=otp.dt(2003, 12, 2), end=otp.dt(2003, 12, 4))
Time A
0 2003-12-02 1
Running otq.Ep and passing query parameters
>>> ep = otq.TickGenerator(bucket_interval=0, fields='long A = $X').tick_type('TT')
>>> otp.run(ep, symbols='LOCAL::', query_params={'X': 1})
Time A
0 2003-12-04 1
Running in callback mode
>>> class Callback(otp.CallbackBase):
... def __init__(self):
... self.result = None
... def process_tick(self, tick, time):
... self.result = tick
>>> data = otp.Tick(A=1)
>>> callback = Callback()
>>> otp.run(data, callback=callback)
>>> callback.result
{'A': 1}
"""
_ = OneTickLib()
if timezone is utils.default:
timezone = configuration.config.tz
if context is utils.default:
context = configuration.config.context
if concurrency is utils.default:
concurrency = configuration.config.default_concurrency
if batch_size is utils.default:
batch_size = configuration.config.default_batch_size
if query_properties is None:
query_properties = pyomd.QueryProperties()
str_qp = query_properties.convert_to_name_value_pairs_string().c_str()
if not next(filter(lambda k: k == 'USE_FT', map(lambda pair: pair.split('=')[0], str_qp.split(','))), False):
query_properties.set_property_value('USE_FT', otp.config.default_fault_tolerance)
if isinstance(start, _Operation) and start_time_expression is None:
start_time_expression = str(start)
start = utils.adaptive
if isinstance(end, _Operation) and end_time_expression is None:
end_time_expression = str(end)
end = utils.adaptive
if inspect.ismethod(query) or inspect.isfunction(query):
t_s = None
if isinstance(symbols, otp.Source):
t_s = symbols
if isinstance(symbols, otp.query):
t_s = otp.Query(symbols)
if isinstance(symbols, str):
t_s = otp.Tick(SYMBOL_NAME=symbols)
if isinstance(symbols, list):
t_s = otp.Ticks(SYMBOL_NAME=symbols)
if isinstance(t_s, otp.Source):
query = query(t_s.to_symbol_param()) # type: ignore
query, query_params = _preprocess_otp_query(query, query_params)
# If query is an otp.Source object, then it can deal with otp.datetime and pd.Timestamp types
if callback is None and otp.config.log_symbol:
callback = LogCallback(query)
output_mode = otq.QueryOutputMode.numpy
if callback is not None:
output_mode = otq.QueryOutputMode.callback
output_structure, output_structure_for_otq = _process_output_structure(output_structure)
if symbol_date:
# otq.run supports only strings and datetime.date
symbol_date = otp.date(symbol_date).to_str()
require_dict = require_dict or _is_dict_required(symbols)
# converting symbols properly
if isinstance(symbols, otp.Source):
# check if SYMBOL_NAME is in schema, or if schema contains only one field
if ('SYMBOL_NAME' not in symbols.columns(skip_meta_fields=True).keys()) and \
len(symbols.columns(skip_meta_fields=True)) != 1:
warnings.warn('Using as a symbol list a source without "SYMBOL_NAME" field '
'and with more than one field! This won\'t work unless the schema is incomplete')
symbols = symbols._convert_symbol_to_string(
symbol=symbols,
tmp_otq=query._tmp_otq if isinstance(query, otp.Source) else None,
start=start,
end=end,
timezone=timezone
)
if isinstance(symbols, str):
symbols = [symbols]
if isinstance(symbols, pd.DataFrame):
symbols = utils.get_symbol_list_from_df(symbols)
if isinstance(query, dict):
# we assume it's a dictionary of sources for the MultiOutputSource object
query = otp.MultiOutputSource(query)
if isinstance(query, otp.Source) or isinstance(query, otp.MultiOutputSource):
start = None if start is utils.adaptive else start
end = None if end is utils.adaptive else end
start, end = _get_start_end(start, end, timezone, use_pyomd_timeval=False) # TODO: undstnd why nsec not supptd
param_upd = query._prepare_for_execution(symbols=symbols, start=start, end=end,
timezone=timezone,
start_time_expression=start_time_expression,
end_time_expression=end_time_expression,
require_dict=require_dict,
running_query_flag=running,
node_name=node_name, has_output=None)
for key, value in param_upd.items():
# here we want to make sure we substituted all params from the passed dict,
# so we raise an error if an unknown parameter is passed in the dict
if key == 'query': query = value # noqa: E701
elif key == 'symbols': symbols = value # noqa: E701
elif key == 'start': start = value # noqa: E701
elif key == 'end': end = value # noqa: E701
elif key == 'start_time_expression': start_time_expression = value # noqa: E701
elif key == 'end_time_expression': end_time_expression = value # noqa: E701
elif key == 'require_dict': require_dict = value # noqa: E701
elif key == 'node_name': node_name = value # noqa: E701
elif key == 'time_as_nsec': time_as_nsec = value # noqa: E701
else: raise ValueError('Unknown parameter returned!') # noqa: E701
elif isinstance(query, (otq.graph_components.EpBase, otq.Chainlet)):
query = otq.Graph(query)
start, end = _get_start_end(start, end, timezone)
# if file name is not in single quotes, then put it in single quotes
if isinstance(query, str):
if not query[0] == "'" and not query[-1] == "'":
# callback mode doesn't like single quotes
if output_mode != otq.QueryOutputMode.callback:
query = f"'{query}'"
# authentication
alternative_username = alternative_username or otp.config.default_auth_username
password = password or otp.config.default_password
kwargs = {}
if password is not None:
version = '20221111120000'
if otp.__build__ < version:
warnings.warn(f"otp.run parameter 'password' is not supported on OneTick version '{otp.__build__}'"
f"It is supported by version starting form {version}.")
else:
kwargs['password'] = password
max_expected_ticks_per_symbol = max_expected_ticks_per_symbol or otp.config.max_expected_ticks_per_symbol
version = '20220714120000'
if otp.__build__ < version:
warnings.warn("otp.run parameter 'max_expected_ticks_per_symbol'"
f" is not supported on OneTick version '{otp.__build__}'"
f"It is supported by version starting form {version}.")
else:
kwargs['max_expected_ticks_per_symbol'] = max_expected_ticks_per_symbol
result = otq.run(query, symbols=symbols, start=start, end=end, context=context, username=username,
timezone=timezone, start_time_expression=start_time_expression,
end_time_expression=end_time_expression,
alternative_username=alternative_username, batch_size=batch_size,
running_query_flag=running, query_properties=query_properties,
max_concurrency=concurrency, apply_times_daily=apply_times_daily, symbol_date=symbol_date,
query_params=query_params, time_as_nsec=time_as_nsec,
treat_byte_arrays_as_strings=treat_byte_arrays_as_strings,
output_mode=output_mode,
output_matrix_per_field=output_matrix_per_field, output_structure=output_structure_for_otq,
return_utc_times=return_utc_times, connection=connection,
callback=callback, svg_path=svg_path, use_connection_pool=use_connection_pool, **kwargs)
if output_mode == otq.QueryOutputMode.callback:
return result
# node_names should be either a list of node names or None
if isinstance(node_name, str):
node_names = [node_name]
else:
node_names = node_name
return _format_call_output(result, output_structure=output_structure,
require_dict=require_dict, node_names=node_names)
def _filter_returned_map_by_node(result, node_names):
"""
Here, result has the following format: {symbol: {node_name: data}}
We need to filter by correct node_name
"""
# TODO: implement filtering by node_name in a way
# that no information from SymbolNumpyResultMap object is lost
return result
# if not node_name:
# return result
#
# res = {}
# for symbol, nodes_dict in result.items():
# res[symbol] = {}
# for node, data in nodes_dict.items():
# if node == node_name:
# res[symbol][node] = data
# return res
def _filter_returned_list_by_node(result, node_names):
"""
Here, result has the following format: [(symbol, data_1, data_2, node_name)]
We need to filter by correct node_names
"""
if not node_names:
return result
node_found = False
res = []
empty_result = True
for symbol, data_1, data_2, node in result:
if data_1:
empty_result = False
if node in node_names:
node_found = True
res.append((symbol, data_1, data_2, node))
if not empty_result and not node_found:
# TODO: Do we even want to raise it?
raise Exception(f'No passed node name(s) were found in the results. Passed node names were: {node_names}')
return res
def _form_dict_from_list(data_list, node_names=None):
"""
Here, data_list has the following format: [(symbol, data_1, data_2, node_name)]
We need to create the following result:
either {symbol: pd.DataFrame(data_1)} if there is only one result per symbol
or {symbol: [pd.DataFrame(data_1)]} if there are multiple results for symbol for a single node_name
or {symbol: {node_name: pd.DataFrame(data_1)}} if there are single results for multiple node names for a symbol
or {symbol: {node_name: [pd.DataFrame(data_1)]}} if there are multiple results for multiple node names for a symbol
"""
def reduce_list(lst):
if len(lst) == 1:
return lst[0][1]
elif node_names and len(node_names) == 1:
return list(map(lambda i: i[1], lst))
else:
return lst
def form_node_name_dict(lst):
"""
lst is a lit of (node, dataframe)
"""
d = defaultdict(list)
for node, df in lst:
d[node].append(df)
for node in d.keys(): # noqa
if len(d[node]) == 1:
d[node] = d[node][0]
if len(d) == 1:
d = list(d.values())[0]
else: # converting defaultdict to regular dict
d = dict(d)
return d
def get_dataframe(data):
return pd.DataFrame({col_name: col_value for col_name, col_value in data})
symbols_dict = defaultdict(list)
for symbol, data, _, node in data_list:
df = get_dataframe(data)
list_item = (node, df)
symbols_dict[symbol].append(list_item)
for symbol, lst in symbols_dict.items():
symbols_dict[symbol] = form_node_name_dict(lst)
return dict(symbols_dict)
def _format_call_output(result, output_structure, node_names, require_dict):
"""Formats output of otq.run() according to passed parameters.
See parameters' description for more information
Parameters
----------
output_structure: ['df', 'list', 'map']
If 'df': forms pandas.DataFrame from the result.
Returns a dictionary with symbols as keys if there's more than one symbol
in returned data of if require_dict = True.
Values of the returned dictionary, or returned value itself if no dictionary is formed,
is either a list of tuples: (node_name, dataframe) if there's output for more than one node
or a dataframe
If 'list' or 'map': returns data as returned by otq.run(), possibly filtered by node_name (see below)
node_names: str, None
If not None, then selects only output returned by nodes in node_names list
for all output structures
require_dict: bool
If True, forces output for output_structure='df' to always be a dictionary, even if only one symbol is returned
Has no effect for other values of output_structure
Returns
----------
Formatted output: pandas DataFrame, dictionary or list
"""
if output_structure == 'list':
return _filter_returned_list_by_node(result, node_names)
elif output_structure == 'map':
return _filter_returned_map_by_node(result, node_names)
assert output_structure == 'df', f'Output structure should be one of: "df", "map", "list", ' \
f'instead "{output_structure}" was passed'
# "df" output structure implies that raw results came as a list
result_list = _filter_returned_list_by_node(result, node_names)
result_dict = _form_dict_from_list(result_list, node_names)
if len(result_dict) == 1 and not require_dict:
return list(result_dict.values())[0]
else:
return result_dict
def _preprocess_otp_query(query, query_params):
if isinstance(query, otp.query._outputs):
query = query['OUT']
if isinstance(query, otp.query):
if query.params:
if query_params:
raise ValueError("please specify parameters in query or in otp.run only")
query_params = query.params
query = query.path
return query, query_params
def _get_start_end(start, end, timezone, use_pyomd_timeval=True):
def support_nanoseconds(time):
if isinstance(time, (pd.Timestamp, otp.datetime)) and use_pyomd_timeval:
time = pyomd.timeval_t(pyomd.OT_time_nsec(time2nsectime(time, timezone)))
return time
# `isinstance(obj, datetime.date)` is not correct because
# isinstance(<datetime.datetime object>, datetime.date) = True
if type(start) is datetime.date:
start = datetime.datetime(start.year, start.month, start.day)
if type(end) is datetime.date:
end = datetime.datetime(end.year, end.month, end.day)
start = configuration.config.default_start_time if start is utils.adaptive else support_nanoseconds(start)
end = configuration.config.default_end_time if end is utils.adaptive else support_nanoseconds(end)
return start, end
def _process_output_structure(output_structure):
if not output_structure or output_structure == "df": # otq doesn't support df
output_structure = "df"
output_structure_for_otq = "symbol_result_list"
elif output_structure == "list":
output_structure_for_otq = "symbol_result_list"
elif output_structure == "map":
output_structure_for_otq = "symbol_result_map"
else:
raise ValueError("output_structure support only the following values: df, list and map")
return output_structure, output_structure_for_otq
class LogCallback(otp.CallbackBase):
def __init__(self, query_name):
print(f'Running query {query_name}')
super().__init__()
def process_symbol_name(self, symbol_name):
print(f'Processing symbol {symbol_name}')