Source code for onetick.py.run

import inspect
import datetime
import warnings
from typing import Union, List, Optional, Dict, Any, Callable
from collections import defaultdict

import onetick.query as otq
import pandas as pd
import pyomd

from onetick import py as otp
from onetick.py import utils, configuration
from onetick.py.core.column_operations.base import _Operation
from onetick.py.types import time2nsectime
from onetick.py.core.source import _is_dict_required
from onetick.lib.instance import OneTickLib


[docs]def run(query: Union[Callable, Dict, otp.Source, otp.MultiOutputSource, otp.query, str, otq.EpBase, otq.GraphQuery, otq.ChainQuery, otq.Chainlet], *, symbols: Union[List[Union[str, otq.Symbol]], otp.Source, str, None] = None, start: Union[datetime.datetime, otp.datetime, pyomd.timeval_t, None] = utils.adaptive, end: Union[datetime.datetime, otp.datetime, pyomd.timeval_t, None] = utils.adaptive, start_time_expression: Optional[str] = None, end_time_expression: Optional[str] = None, timezone=utils.default, # type: ignore context=utils.default, # type: ignore username: Optional[str] = None, alternative_username: Optional[str] = None, batch_size: Optional[int] = utils.default, running: Optional[bool] = False, query_properties: Optional[pyomd.QueryProperties] = None, concurrency: Optional[int] = utils.default, apply_times_daily: Optional[int] = None, symbol_date: Union[datetime.datetime, int, None] = None, query_params: Optional[Dict[str, Any]] = None, time_as_nsec: bool = True, treat_byte_arrays_as_strings: bool = True, output_matrix_per_field: bool = False, output_structure: Optional[str] = None, return_utc_times: Optional[bool] = None, connection=None, callback=None, svg_path=None, use_connection_pool: bool = False, node_name: Union[str, List[str], None] = None, require_dict: bool = False): """ Executes a query and returns its result. Parameters ---------- query: otp.Source, otq.Ep, otq.Graph, otq.GraphQuery, otq.ChainQuery, str, otq.Chainlet Query to execute can be source, path of the query on a disk or otq graph or event processor. For running OTQ files, it represents the path (including filename) to the OTQ file to run a single query within the file. If more than one query is present, then the query to be run must be specified (that is, 'path_to_file'/otq_file.otq::query_to_run). symbols: str, list of str, list of otq.Symbol, otp.Source, pd.DataFrame, optional Symbol(s) to run the query for passed as a string, a list of strings, a pd.DataFrame with the ``SYMBOL_NAME`` column, or as a "symbols" query which results include the ``SYMBOL_NAME`` column. The start/end times for the symbols query will taken from the params below. See :ref:`symbols <Symbols>` for more details. start: datetime.datetime, otp.datetime, pyomd.timeval_t, optional The start time of the query. If datetime.datetime was pass timezone of object is ignored by Onetick, therefore we suggest using otp.datetime objects only as an argument. onetick.py uses otp.config.default_start_time as default value, if you don't want to specify start time, e.g. to use saved time of the query, then you should specify None value. See also timezone argument. end: datetime.datetime, otp.datetime, pyomd.timeval_t, optional The end time of the query. If datetime.datetime was pass timezone of object is ignored by Onetick, therefore we suggest using otp.datetime objects only as an argument. See also timezone argument. onetick.py uses otp.config.default_end_time as default value, if you don't want to specify end time, e.g. to use saved time of the query, then you should specify None value. start_time_expression: string, optional Start time onetick expression of the query. If specified, it will take precedence over start. Supported only if query is Source, Graph or Event Processor. end_time_expression: string, optional End time onetick expression of the query. If specified, it will take precedence over end. Supported only if query is Source, Graph or Event Processor. timezone: str, optional The timezone of start and end times, as well as of the output timestamps. It has higher priority then timezone of start and end parameters. If parameter is omitted timestamps of ticks will be formatted with the default timezone. context: str (defaults to otp.config.default_context), optional Allows specification of different instances of OneTick tick_servers to connect to username alternative_username batch_size: int number of symbols to run in one batch. By default, the value from otp.config.default_batch_size is used. running: bool, optional Indicates whether a query is CEP or not. Default is `False`. query_properties: (pyomd.QueryProperties), optional Query properties, such as ONE_TO_MANY_POLICY, ALLOW_GRAPH_REUSE, etc concurrency: int, optional The maximum number of CPU cores to use to process the query. By default, the value from otp.config.default_concurrency is used. apply_times_daily: bool Runs the query for every day in the start_time-end_time range, using the time components of start and end times symbol_date: The symbol date used to look up symbology mapping information in the reference database, expressed as datetime object or integer of YYYYMMDD format query_params: dict Parameters of the query. time_as_nsec: bool Outputs timestamps up to nanoseconds granularity (defaults to False: by default we output timestamps in microseconds granularity) treat_byte_arrays_as_strings: bool Outputs byte arrays as strings (defaults to True) output_matrix_per_field: bool Changes output format to list of matrices per field. output_structure: otp.Source.OutputStructure, optional Structure (type) of the result. Supported values are: - "df" (default) - the result is returned as pandas.DataFrame or dict[symbol: pandas.Dataframe] in case of multi symbols or evals. - "map" - the result is returned as SymbolNumpyResultMap. - "list" - the result is returned as list. return_utc_times: bool If True Return times in UTC timezone and in local timezone otherwise connection: (pyomd.Connection) The connection to be used for discovering nested otq files callback svg_path use_connection_pool node_name: str, List[str], optional Name of the output node to select result from. If query graph has several output nodes, you can specify the name of the node to choose result from. If node_name was specified, query should be presented by path on the disk and output_structure should be "df" require_dict: bool If set to True, result will be forced to be a dictionary even if it's returned for a single symbol Returns ------- result, list, dict, pandas.DataFrame, None result of the query """ _ = OneTickLib() if timezone is utils.default: timezone = configuration.config.tz if context is utils.default: context = configuration.config.context if concurrency is utils.default: concurrency = configuration.config.default_concurrency if batch_size is utils.default: batch_size = configuration.config.default_batch_size if query_properties is None: query_properties = pyomd.QueryProperties() str_qp = query_properties.convert_to_name_value_pairs_string().c_str() if not next(filter(lambda k: k == 'USE_FT', map(lambda pair: pair.split('=')[0], str_qp.split(','))), False): query_properties.set_property_value('USE_FT', otp.config.default_fault_tolerance) if isinstance(start, _Operation) and start_time_expression is None: start_time_expression = str(start) start = utils.adaptive if isinstance(end, _Operation) and end_time_expression is None: end_time_expression = str(end) end = utils.adaptive if inspect.ismethod(query) or inspect.isfunction(query): t_s = None if isinstance(symbols, otp.Source): t_s = symbols if isinstance(symbols, otp.query): t_s = otp.Query(symbols) if isinstance(symbols, str): t_s = otp.Tick(SYMBOL_NAME=symbols) if isinstance(symbols, list): t_s = otp.Ticks(SYMBOL_NAME=symbols) if isinstance(t_s, otp.Source): query = query(t_s.to_symbol_param()) # type: ignore query, query_params = _preprocess_otp_query(query, query_params) # If query is an otp.Source object, then it can deal with otp.datetime and pd.Timestamp types output_structure, output_structure_for_otq = _process_output_structure(output_structure) if symbol_date: # otq.run supports only strings and datetime.date symbol_date = otp.date(symbol_date).to_str() require_dict = require_dict or _is_dict_required(symbols) # converting symbols properly if isinstance(symbols, otp.Source): # check if SYMBOL_NAME is in schema, or if schema contains only one field if ('SYMBOL_NAME' not in symbols.columns(skip_meta_fields=True).keys()) and \ len(symbols.columns(skip_meta_fields=True)) != 1: warnings.warn('Using as a symbol list a source without "SYMBOL_NAME" field ' 'and with more than one field! This won\'t work unless the schema is incomplete') symbols = symbols._convert_symbol_to_string( symbol=symbols, tmp_otq=query._tmp_otq if isinstance(query, otp.Source) else None, start=start, end=end, timezone=timezone ) if isinstance(symbols, str): symbols = [symbols] if isinstance(symbols, pd.DataFrame): symbols = utils.get_symbol_list_from_df(symbols) if isinstance(query, dict): # we assume it's a dictionary of sources for the MultiOutputSource object query = otp.MultiOutputSource(query) if isinstance(query, otp.Source) or isinstance(query, otp.MultiOutputSource): start = None if start is utils.adaptive else start end = None if end is utils.adaptive else end start, end = _get_start_end(start, end, timezone, use_pyomd_timeval=False) # TODO: undstnd why nsec not supptd param_upd = query._prepare_for_execution(symbols=symbols, start=start, end=end, timezone=timezone, start_time_expression=start_time_expression, end_time_expression=end_time_expression, require_dict=require_dict, running_query_flag=running, node_name=node_name, has_output=None) for key, value in param_upd.items(): # here we want to make sure we substituted all params from the passed dict, # so we raise an error if an unknown parameter is passed in the dict if key == 'query': query = value # noqa: E701 elif key == 'symbols': symbols = value # noqa: E701 elif key == 'start': start = value # noqa: E701 elif key == 'end': end = value # noqa: E701 elif key == 'start_time_expression': start_time_expression = value # noqa: E701 elif key == 'end_time_expression': end_time_expression = value # noqa: E701 elif key == 'require_dict': require_dict = value # noqa: E701 elif key == 'node_name': node_name = value # noqa: E701 elif key == 'time_as_nsec': time_as_nsec = value # noqa: E701 else: raise ValueError('Unknown parameter returned!') # noqa: E701 elif isinstance(query, (otq.graph_components.EpBase, otq.Chainlet)): query = otq.Graph(query) start, end = _get_start_end(start, end, timezone) # if file name is not in single quotes, then put it in single quotes if isinstance(query, str): if not query[0] == "'" and not query[-1] == "'": query = f"'{query}'" result = otq.run(query, symbols=symbols, start=start, end=end, context=context, username=username, timezone=timezone, start_time_expression=start_time_expression, end_time_expression=end_time_expression, alternative_username=alternative_username, batch_size=batch_size, running_query_flag=running, query_properties=query_properties, max_concurrency=concurrency, apply_times_daily=apply_times_daily, symbol_date=symbol_date, query_params=query_params, time_as_nsec=time_as_nsec, treat_byte_arrays_as_strings=treat_byte_arrays_as_strings, output_matrix_per_field=output_matrix_per_field, output_structure=output_structure_for_otq, return_utc_times=return_utc_times, connection=connection, callback=callback, svg_path=svg_path, use_connection_pool=use_connection_pool) # node_names should be either a list of node names or None if isinstance(node_name, str): node_names = [node_name] else: node_names = node_name return _format_call_output(result, output_structure=output_structure, require_dict=require_dict, node_names=node_names)
def _filter_returned_map_by_node(result, node_names): """ Here, result has the following format: {symbol: {node_name: data}} We need to filter by correct node_name """ # TODO: implement filtering by node_name in a way # that no information from SymbolNumpyResultMap object is lost return result # if not node_name: # return result # # res = {} # for symbol, nodes_dict in result.items(): # res[symbol] = {} # for node, data in nodes_dict.items(): # if node == node_name: # res[symbol][node] = data # return res def _filter_returned_list_by_node(result, node_names): """ Here, result has the following format: [(symbol, data_1, data_2, node_name)] We need to filter by correct node_names """ if not node_names: return result node_found = False res = [] for symbol, data_1, data_2, node in result: if node in node_names: node_found = True res.append((symbol, data_1, data_2, node)) if not node_found: # TODO: Do we even want to raise it? raise Exception(f'No passed node name(s) were found in the results. Passed node names were: {node_names}') return res def _form_dict_from_list(data_list, node_names=None): """ Here, data_list has the following format: [(symbol, data_1, data_2, node_name)] We need to create the following result: either {symbol: pd.DataFrame(data_1)} if there is only one result per symbol or {symbol: [pd.DataFrame(data_1)]} if there are multiple results for symbol for a single node_name or {symbol: {node_name: pd.DataFrame(data_1)}} if there are single results for multiple node names for a symbol or {symbol: {node_name: [pd.DataFrame(data_1)]}} if there are multiple results for multiple node names for a symbol """ def reduce_list(lst): if len(lst) == 1: return lst[0][1] elif node_names and len(node_names) == 1: return list(map(lambda i: i[1], lst)) else: return lst def form_node_name_dict(lst): """ lst is a lit of (node, dataframe) """ d = defaultdict(list) for node, df in lst: d[node].append(df) for node in d.keys(): # noqa if len(d[node]) == 1: d[node] = d[node][0] if len(d) == 1: d = list(d.values())[0] else: # converting defaultdict to regular dict d = dict(d) return d def get_dataframe(data): return pd.DataFrame({col_name: col_value for col_name, col_value in data}) symbols_dict = defaultdict(list) for symbol, data, _, node in data_list: df = get_dataframe(data) list_item = (node, df) symbols_dict[symbol].append(list_item) for symbol, lst in symbols_dict.items(): symbols_dict[symbol] = form_node_name_dict(lst) return dict(symbols_dict) def _format_call_output(result, output_structure, node_names, require_dict): """Formats output of otq.run() according to passed parameters. See parameters' description for more information Parameters ---------- output_structure: ['df', 'list', 'map'] If 'df': forms pandas.DataFrame from the result. Returns a dictionary with symbols as keys if there's more than one symbol in returned data of if require_dict = True. Values of the returned dictionary, or returned value itself if no dictionary is formed, is either a list of tuples: (node_name, dataframe) if there's output for more than one node or a dataframe If 'list' or 'map': returns data as returned by otq.run(), possibly filtered by node_name (see below) node_names: str, None If not None, then selects only output returned by nodes in node_names list for all output structures require_dict: bool If True, forces output for output_structure='df' to always be a dictionary, even if only one symbol is returned Has no effect for other values of output_structure Returns ---------- Formatted output: pandas DataFrame, dictionary or list """ if output_structure == 'list': return _filter_returned_list_by_node(result, node_names) elif output_structure == 'map': return _filter_returned_map_by_node(result, node_names) assert output_structure == 'df', f'Output structure should be one of: "df", "map", "list", ' \ f'instead "{output_structure}" was passed' # "df" output structure implies that raw results came as a list result_list = _filter_returned_list_by_node(result, node_names) result_dict = _form_dict_from_list(result_list, node_names) if len(result_dict) == 1 and not require_dict: return list(result_dict.values())[0] else: return result_dict def _preprocess_otp_query(query, query_params): if isinstance(query, otp.query._outputs): query = query['OUT'] if isinstance(query, otp.query): if query.params: if query_params: raise ValueError("please specify parameters in query or in otp.run only") query_params = query.params query = query.path return query, query_params def _get_start_end(start, end, timezone, use_pyomd_timeval=True): def support_nanoseconds(time): if isinstance(time, (pd.Timestamp, otp.datetime)) and use_pyomd_timeval: time = pyomd.timeval_t(pyomd.OT_time_nsec(time2nsectime(time, timezone))) return time # `isinstance(obj, datetime.date)` is not correct because # isinstance(<datetime.datetime object>, datetime.date) = True if type(start) is datetime.date: start = datetime.datetime(start.year, start.month, start.day) if type(end) is datetime.date: end = datetime.datetime(end.year, end.month, end.day) start = configuration.config.default_start_time if start is utils.adaptive else support_nanoseconds(start) end = configuration.config.default_end_time if end is utils.adaptive else support_nanoseconds(end) return start, end def _process_output_structure(output_structure): if not output_structure or output_structure == "df": # otq doesn't support df output_structure = "df" output_structure_for_otq = "symbol_result_list" elif output_structure == "list": output_structure_for_otq = "symbol_result_list" elif output_structure == "map": output_structure_for_otq = "symbol_result_map" else: raise ValueError("output_structure support only the following values: df, list and map") return output_structure, output_structure_for_otq