def run(query: Union[Callable, Dict, otp.Source, otp.MultiOutputSource, otp.query, str, otq.EpBase,
                     otq.GraphQuery, otq.ChainQuery, otq.Chainlet],
        *,
        symbols: Union[List[Union[str, otq.Symbol]], otp.Source, str, None] = None,
        start: Union[datetime.datetime, otp.datetime, pyomd.timeval_t, None] = utils.adaptive,
        end: Union[datetime.datetime, otp.datetime, pyomd.timeval_t, None] = utils.adaptive,
        start_time_expression: Optional[str] = None,
        end_time_expression: Optional[str] = None,
        timezone=utils.default,  # type: ignore
        context=utils.default,  # type: ignore
        username: Optional[str] = None,
        alternative_username: Optional[str] = None,
        password: Optional[str] = None,
        batch_size: Optional[int] = utils.default,
        running: Optional[bool] = False,
        query_properties: Optional[pyomd.QueryProperties] = None,
        concurrency: Optional[int] = utils.default,
        apply_times_daily: Optional[int] = None,
        symbol_date: Union[datetime.datetime, int, None] = None,
        query_params: Optional[Dict[str, Any]] = None,
        time_as_nsec: bool = True,
        treat_byte_arrays_as_strings: bool = True,
        output_matrix_per_field: bool = False,
        output_structure: Optional[str] = None,
        return_utc_times: Optional[bool] = None,
        connection=None,
        callback=None,
        svg_path=None,
        use_connection_pool: bool = False,
        node_name: Union[str, List[str], None] = None,
        require_dict: bool = False,
        max_expected_ticks_per_symbol: Optional[int] = None):
    """
    Executes a query and returns its result.

    Parameters
    ----------
    query: :py:class:`onetick.py.Source`, otq.Ep, otq.Graph, otq.GraphQuery, otq.ChainQuery, str, otq.Chainlet
        Query to execute: a source, a path to a query on disk, an otq graph or an event processor.
        For running OTQ files, it represents the path (including filename) to the OTQ file
        to run a single query within the file. If more than one query is present, then the query
        to be run must be specified (that is, 'path_to_file/otq_file.otq::query_to_run').
        A callable is called with the symbols converted to a symbol-parameter source,
        and a dict is wrapped into :py:class:`onetick.py.MultiOutputSource`.
    symbols: str, list of str, list of otq.Symbol, :py:class:`onetick.py.Source`, pd.DataFrame, optional
        Symbol(s) to run the query for, passed as a string, a list of strings,
        a pd.DataFrame with the ``SYMBOL_NAME`` column, or as a "symbols" query
        whose results include the ``SYMBOL_NAME`` column.
        The start/end times for the symbols query will be taken from the params below.
        See :ref:`symbols <Symbols>` for more details.
    start: datetime.datetime, :py:class:`onetick.py.datetime`, pyomd.timeval_t, optional
        The start time of the query. If a datetime.datetime is passed, its timezone is ignored
        by OneTick, therefore we suggest using otp.datetime objects only.
        onetick.py uses otp.config.default_start_time as default value;
        if you don't want to specify start time, e.g. to use saved time of the query,
        then you should specify None value.
        If a onetick expression is passed and ``start_time_expression`` is not set,
        it is used as ``start_time_expression``.
        See also timezone argument.
    end: datetime.datetime, :py:class:`onetick.py.datetime`, pyomd.timeval_t, optional
        The end time of the query. If a datetime.datetime is passed, its timezone is ignored
        by OneTick, therefore we suggest using otp.datetime objects only.
        onetick.py uses otp.config.default_end_time as default value;
        if you don't want to specify end time, e.g. to use saved time of the query,
        then you should specify None value.
        If a onetick expression is passed and ``end_time_expression`` is not set,
        it is used as ``end_time_expression``.
        See also timezone argument.
    start_time_expression: str, optional
        Start time onetick expression of the query. If specified, it will take precedence over start.
        Supported only if query is Source, Graph or Event Processor.
    end_time_expression: str, optional
        End time onetick expression of the query. If specified, it will take precedence over end.
        Supported only if query is Source, Graph or Event Processor.
    timezone: str, optional
        The timezone of start and end times, as well as of the output timestamps.
        It has higher priority than the timezone of start and end parameters.
        If parameter is omitted, ``otp.config.tz`` is used.
    context: str, optional
        Allows specification of different instances of OneTick tick_servers to connect to.
        Defaults to ``otp.config.context``.
    username: str, optional
        The username to make the connection.
        By default the user which executed the process is used.
    alternative_username: str, optional
        The username used for authentication.
        Needs to be set only when the tick server is configured to use password-based authentication.
        By default, ``otp.config.default_auth_username`` is used.
    password: str, optional
        The password used for authentication.
        Needs to be set only when the tick server is configured to use password-based authentication.
        Note: not supported and ignored (with a warning) on older OneTick versions.
        By default, ``otp.config.default_password`` is used.
    batch_size: int, optional
        Number of symbols to run in one batch.
        By default, the value from otp.config.default_batch_size is used.
    running: bool, optional
        Indicates whether a query is CEP or not. Default is `False`.
    query_properties: pyomd.QueryProperties, optional
        Query properties, such as ONE_TO_MANY_POLICY, ALLOW_GRAPH_REUSE, etc.
        If USE_FT is not set, it is set to ``otp.config.default_fault_tolerance``
        (the passed object is modified in place).
    concurrency: int, optional
        The maximum number of CPU cores to use to process the query.
        By default, the value from otp.config.default_concurrency is used.
    apply_times_daily: bool
        Runs the query for every day in the ``start``-``end`` time range,
        using the time components of ``start`` and ``end`` times.
    symbol_date:
        The symbol date used to look up symbology mapping information in the reference database,
        expressed as datetime object or integer of YYYYMMDD format.
    query_params: dict
        Parameters of the query.
    time_as_nsec: bool
        Outputs timestamps up to nanoseconds granularity (defaults to True).
        If False, timestamps are output with microseconds granularity.
    treat_byte_arrays_as_strings: bool
        Outputs byte arrays as strings (defaults to True).
    output_matrix_per_field: bool
        Changes output format to list of matrices per field.
    output_structure: otp.Source.OutputStructure, optional
        Structure (type) of the result. Supported values are:
          - "df" (default) - the result is returned as pandas.DataFrame or dict[symbol: pandas.Dataframe]
            in case of multi symbols or evals.
          - "map" - the result is returned as SymbolNumpyResultMap.
          - "list" - the result is returned as list.
    return_utc_times: bool
        If True, return times in UTC timezone, and in local timezone otherwise.
    connection: pyomd.Connection
        The connection to be used for discovering nested otq files.
    callback: :py:class:`onetick.py.CallbackBase`
        Class with callback methods.
        If set, the output of the query should be controlled with callbacks
        and this function returns nothing.
    svg_path:
        Passed through to ``otq.run``.
    use_connection_pool: bool
        Passed through to ``otq.run``.
    node_name: str, List[str], optional
        Name of the output node to select result from. If query graph has several output nodes,
        you can specify the name of the node to choose result from.
        If node_name was specified, query should be presented by path on the disk
        and output_structure should be "df".
    require_dict: bool
        If set to True, result will be forced to be a dictionary even if it's returned for a single symbol.
    max_expected_ticks_per_symbol: int
        Expected maximum number of ticks per symbol (used for performance optimizations).
        By default, ``otp.config.max_expected_ticks_per_symbol`` is used.

    Note
    ----
    It is possible to log currently executed symbol.
    For that `otp.config.log_symbol` should be set to `True` (it can be set via `OTP_LOG_SYMBOL` env var).
    Note, in this case otp.run does not produce the output so it should be used only for debugging purposes.

    Returns
    -------
    result, list, dict, :pandas:`pandas.DataFrame`, None
        result of the query

    Examples
    --------

    Running :py:class:`onetick.py.Source` and setting start and end times

    >>> data = otp.Tick(A=1)
    >>> otp.run(data, start=otp.dt(2003, 12, 2), end=otp.dt(2003, 12, 4))
            Time  A
    0 2003-12-02  1

    Running otq.Ep and passing query parameters

    >>> ep = otq.TickGenerator(bucket_interval=0, fields='long A = $X').tick_type('TT')
    >>> otp.run(ep, symbols='LOCAL::', query_params={'X': 1})
            Time  A
    0 2003-12-04  1

    Running in callback mode

    >>> class Callback(otp.CallbackBase):
    ...     def __init__(self):
    ...         self.result = None
    ...     def process_tick(self, tick, time):
    ...         self.result = tick
    >>> data = otp.Tick(A=1)
    >>> callback = Callback()
    >>> otp.run(data, callback=callback)
    >>> callback.result
    {'A': 1}
    """
    # NOTE(review): the instance is not used directly; presumably it keeps the
    # OneTick library initialized for the duration of the call - confirm.
    _ = OneTickLib()

    # Resolve "use the configured default" sentinels.
    if timezone is utils.default:
        timezone = configuration.config.tz
    if context is utils.default:
        context = configuration.config.context
    if concurrency is utils.default:
        concurrency = configuration.config.default_concurrency
    if batch_size is utils.default:
        batch_size = configuration.config.default_batch_size

    if query_properties is None:
        query_properties = pyomd.QueryProperties()
    # Apply the configured fault-tolerance default only when USE_FT was not set explicitly.
    # The properties are serialized as comma-separated "NAME=VALUE" pairs.
    str_qp = query_properties.convert_to_name_value_pairs_string().c_str()
    if not any(pair.split('=')[0] == 'USE_FT' for pair in str_qp.split(',')):
        query_properties.set_property_value('USE_FT', otp.config.default_fault_tolerance)

    # start/end given as onetick expressions are passed on as time expressions instead.
    if isinstance(start, _Operation) and start_time_expression is None:
        start_time_expression = str(start)
        start = utils.adaptive
    if isinstance(end, _Operation) and end_time_expression is None:
        end_time_expression = str(end)
        end = utils.adaptive

    # A callable query is built by calling it with the symbols turned into a symbol-parameter source.
    if inspect.ismethod(query) or inspect.isfunction(query):
        t_s = None
        if isinstance(symbols, otp.Source):
            t_s = symbols
        if isinstance(symbols, otp.query):
            t_s = otp.Query(symbols)
        if isinstance(symbols, str):
            t_s = otp.Tick(SYMBOL_NAME=symbols)
        if isinstance(symbols, list):
            t_s = otp.Ticks(SYMBOL_NAME=symbols)
        if isinstance(t_s, otp.Source):
            query = query(t_s.to_symbol_param())  # type: ignore

    query, query_params = _preprocess_otp_query(query, query_params)

    if callback is None and otp.config.log_symbol:
        callback = LogCallback(query)

    output_mode = otq.QueryOutputMode.numpy
    if callback is not None:
        output_mode = otq.QueryOutputMode.callback

    output_structure, output_structure_for_otq = _process_output_structure(output_structure)

    if symbol_date:
        # otq.run supports only strings and datetime.date
        symbol_date = otp.date(symbol_date).to_str()

    require_dict = require_dict or _is_dict_required(symbols)

    # converting symbols properly
    if isinstance(symbols, otp.Source):
        # check if SYMBOL_NAME is in schema, or if schema contains only one field
        if ('SYMBOL_NAME' not in symbols.columns(skip_meta_fields=True).keys()
                and len(symbols.columns(skip_meta_fields=True)) != 1):
            warnings.warn('Using as a symbol list a source without "SYMBOL_NAME" field '
                          'and with more than one field! This won\'t work unless the schema is incomplete')
        symbols = symbols._convert_symbol_to_string(
            symbol=symbols,
            tmp_otq=query._tmp_otq if isinstance(query, otp.Source) else None,
            start=start,
            end=end,
            timezone=timezone,
        )
    if isinstance(symbols, str):
        symbols = [symbols]
    if isinstance(symbols, pd.DataFrame):
        symbols = utils.get_symbol_list_from_df(symbols)

    if isinstance(query, dict):
        # we assume it's a dictionary of sources for the MultiOutputSource object
        query = otp.MultiOutputSource(query)

    if isinstance(query, (otp.Source, otp.MultiOutputSource)):
        # If query is an otp.Source object, then it can deal with otp.datetime and pd.Timestamp types.
        # Adaptive start/end become None here, so the configured defaults are not substituted
        # before the source prepares itself for execution.
        start = None if start is utils.adaptive else start
        end = None if end is utils.adaptive else end
        # TODO: understand why nanoseconds are not supported here (use_pyomd_timeval=False)
        start, end = _get_start_end(start, end, timezone, use_pyomd_timeval=False)
        param_upd = query._prepare_for_execution(symbols=symbols, start=start, end=end,
                                                 timezone=timezone,
                                                 start_time_expression=start_time_expression,
                                                 end_time_expression=end_time_expression,
                                                 require_dict=require_dict,
                                                 running_query_flag=running,
                                                 node_name=node_name, has_output=None)
        # Here we want to make sure we substituted all params from the returned dict,
        # so we raise an error if an unknown parameter is returned.
        for key, value in param_upd.items():
            if key == 'query':
                query = value
            elif key == 'symbols':
                symbols = value
            elif key == 'start':
                start = value
            elif key == 'end':
                end = value
            elif key == 'start_time_expression':
                start_time_expression = value
            elif key == 'end_time_expression':
                end_time_expression = value
            elif key == 'require_dict':
                require_dict = value
            elif key == 'node_name':
                node_name = value
            elif key == 'time_as_nsec':
                time_as_nsec = value
            else:
                raise ValueError('Unknown parameter returned!')
    elif isinstance(query, (otq.graph_components.EpBase, otq.Chainlet)):
        query = otq.Graph(query)

    start, end = _get_start_end(start, end, timezone)

    # If the file name is not in single quotes, then put it in single quotes.
    # Callback mode doesn't like single quotes, so the name is left as is there.
    # startswith/endswith (instead of indexing) keep an empty string from raising IndexError.
    if (isinstance(query, str)
            and not query.startswith("'")
            and not query.endswith("'")
            and output_mode != otq.QueryOutputMode.callback):
        query = f"'{query}'"

    # authentication
    alternative_username = alternative_username or otp.config.default_auth_username
    password = password or otp.config.default_password

    # Optional otq.run arguments that older OneTick builds do not accept.
    kwargs = {}
    if password is not None:
        version = '20221111120000'
        if otp.__build__ < version:
            warnings.warn(f"otp.run parameter 'password' is not supported on OneTick versions "
                          f"older than '{version}' and will be ignored")
        else:
            kwargs['password'] = password

    max_expected_ticks_per_symbol = max_expected_ticks_per_symbol or otp.config.max_expected_ticks_per_symbol
    version = '20220714120000'
    if otp.__build__ < version:
        # Only warn when there is actually a value that gets dropped.
        if max_expected_ticks_per_symbol is not None:
            warnings.warn("otp.run parameter 'max_expected_ticks_per_symbol'"
                          f" is not supported on OneTick versions older than '{version}' and will be ignored")
    else:
        kwargs['max_expected_ticks_per_symbol'] = max_expected_ticks_per_symbol

    result = otq.run(query,
                     symbols=symbols,
                     start=start,
                     end=end,
                     context=context,
                     username=username,
                     timezone=timezone,
                     start_time_expression=start_time_expression,
                     end_time_expression=end_time_expression,
                     alternative_username=alternative_username,
                     batch_size=batch_size,
                     running_query_flag=running,
                     query_properties=query_properties,
                     max_concurrency=concurrency,
                     apply_times_daily=apply_times_daily,
                     symbol_date=symbol_date,
                     query_params=query_params,
                     time_as_nsec=time_as_nsec,
                     treat_byte_arrays_as_strings=treat_byte_arrays_as_strings,
                     output_mode=output_mode,
                     output_matrix_per_field=output_matrix_per_field,
                     output_structure=output_structure_for_otq,
                     return_utc_times=return_utc_times,
                     connection=connection,
                     callback=callback,
                     svg_path=svg_path,
                     use_connection_pool=use_connection_pool,
                     **kwargs)

    if output_mode == otq.QueryOutputMode.callback:
        return result

    # node_names should be either a list of node names or None
    node_names = [node_name] if isinstance(node_name, str) else node_name

    return _format_call_output(result, output_structure=output_structure, require_dict=require_dict,
                               node_names=node_names)
def _filter_returned_map_by_node(result, node_names):
    """
    Filter an ``output_structure='map'`` result by output node name.

    Here, result has the following format: {symbol: {node_name: data}}.
    Filtering is not implemented yet: ``result`` is returned unchanged and
    ``node_names`` is ignored.
    """
    # TODO: implement filtering by node_name in a way
    # that no information from SymbolNumpyResultMap object is lost
    return result


def _filter_returned_list_by_node(result, node_names):
    """
    Keep only the entries of a list result that were produced by the given output nodes.

    Here, result has the following format: [(symbol, data_1, data_2, node_name)].

    Parameters
    ----------
    result: list of tuples
        Raw list result of ``otq.run``.
    node_names: list of str, None
        Node names to keep. If empty or None, ``result`` is returned unchanged.

    Returns
    -------
    list of tuples in the same format as ``result``.

    Raises
    ------
    ValueError
        If some entry has non-empty ``data_1`` but no entry came from a node in ``node_names``.
    """
    if not node_names:
        return result

    node_found = False
    empty_result = True
    res = []
    for symbol, data_1, data_2, node in result:
        if data_1:
            empty_result = False
        if node in node_names:
            node_found = True
            res.append((symbol, data_1, data_2, node))

    if not empty_result and not node_found:
        # TODO: Do we even want to raise it?
        raise ValueError('No passed node name(s) were found in the results. '
                         f'Passed node names were: {node_names}')
    return res


def _form_dict_from_list(data_list, node_names=None):
    """
    Convert a list result into a dictionary of pandas DataFrames keyed by symbol.

    Here, data_list has the following format: [(symbol, data_1, data_2, node_name)],
    where data_1 is iterated as (column_name, column_values) pairs. The result is:

    - {symbol: pd.DataFrame(data_1)} if there is only one result per symbol, or
    - {symbol: [pd.DataFrame(data_1)]} if there are multiple results for a symbol for a single node_name, or
    - {symbol: {node_name: pd.DataFrame(data_1)}} if there are single results for multiple node names
      for a symbol, or
    - {symbol: {node_name: [pd.DataFrame(data_1)]}} if there are multiple results for multiple node names
      for a symbol.

    ``node_names`` is accepted for backward compatibility and is not used.
    """
    def form_node_name_dict(lst):
        """Group a list of (node, dataframe) pairs for one symbol, unwrapping single items."""
        grouped = defaultdict(list)
        for node, df in lst:
            grouped[node].append(df)
        by_node = {node: dfs[0] if len(dfs) == 1 else dfs for node, dfs in grouped.items()}
        if len(by_node) == 1:
            # a single node: return its dataframe (or list of dataframes) directly
            return next(iter(by_node.values()))
        return by_node

    def get_dataframe(data):
        return pd.DataFrame({col_name: col_value for col_name, col_value in data})

    symbols_dict = defaultdict(list)
    for symbol, data, _, node in data_list:
        symbols_dict[symbol].append((node, get_dataframe(data)))

    return {symbol: form_node_name_dict(lst) for symbol, lst in symbols_dict.items()}


def _format_call_output(result, output_structure, node_names, require_dict):
    """
    Formats output of otq.run() according to passed parameters.

    Parameters
    ----------
    output_structure: ['df', 'list', 'map']
        If 'df': forms pandas.DataFrame from the result.
        Returns a dictionary with symbols as keys if there's more than one symbol
        in returned data or if require_dict = True.
        Values of the returned dictionary (or the returned value itself if no dictionary is formed)
        are a DataFrame, a list of DataFrames (several results from a single node),
        or a dictionary {node_name: DataFrame or list of DataFrames} if several nodes returned data.
        If 'list' or 'map': returns data as returned by otq.run(), possibly filtered by node_name
        (see below).
    node_names: list of str, None
        If not None, then selects only output returned by nodes in node_names list
        for all output structures (not yet implemented for 'map').
    require_dict: bool
        If True, forces output for output_structure='df' to always be a dictionary,
        even if only one symbol is returned.
        Has no effect for other values of output_structure.

    Returns
    -------
    Formatted output: pandas DataFrame, dictionary or list

    Raises
    ------
    ValueError
        If output_structure is not one of 'df', 'list', 'map'.
    """
    if output_structure == 'list':
        return _filter_returned_list_by_node(result, node_names)
    if output_structure == 'map':
        return _filter_returned_map_by_node(result, node_names)
    # an explicit check rather than `assert`, which is stripped under `python -O`
    if output_structure != 'df':
        raise ValueError(f'Output structure should be one of: "df", "map", "list", '
                         f'instead "{output_structure}" was passed')

    # "df" output structure implies that raw results came as a list
    result_list = _filter_returned_list_by_node(result, node_names)
    result_dict = _form_dict_from_list(result_list, node_names)
    if len(result_dict) == 1 and not require_dict:
        return next(iter(result_dict.values()))
    return result_dict


def _preprocess_otp_query(query, query_params):
    """
    Unwrap an ``otp.query`` (or its outputs) into a query path and its parameters.

    Raises
    ------
    ValueError
        If parameters are set both on the ``otp.query`` object and in ``query_params``.
    """
    if isinstance(query, otp.query._outputs):
        query = query['OUT']
    if isinstance(query, otp.query):
        if query.params:
            if query_params:
                raise ValueError("please specify parameters in query or in otp.run only")
            query_params = query.params
        query = query.path
    return query, query_params


def _get_start_end(start, end, timezone, use_pyomd_timeval=True):
    """
    Normalize query start/end times before passing them on.

    - a plain ``datetime.date`` is converted to a ``datetime.datetime`` at midnight;
    - ``utils.adaptive`` is replaced with ``otp.config`` default start/end time;
    - if ``use_pyomd_timeval`` is True, ``pd.Timestamp`` and ``otp.datetime`` values are converted
      to ``pyomd.timeval_t`` via ``time2nsectime(time, timezone)`` to keep nanoseconds.
    Other values are returned as is.
    """
    def support_nanoseconds(time):
        if isinstance(time, (pd.Timestamp, otp.datetime)) and use_pyomd_timeval:
            time = pyomd.timeval_t(pyomd.OT_time_nsec(time2nsectime(time, timezone)))
        return time

    # `isinstance(obj, datetime.date)` is not correct because
    # isinstance(<datetime.datetime object>, datetime.date) = True
    if type(start) is datetime.date:
        start = datetime.datetime(start.year, start.month, start.day)
    if type(end) is datetime.date:
        end = datetime.datetime(end.year, end.month, end.day)

    start = configuration.config.default_start_time if start is utils.adaptive else support_nanoseconds(start)
    end = configuration.config.default_end_time if end is utils.adaptive else support_nanoseconds(end)
    return start, end


def _process_output_structure(output_structure):
    """
    Map the user-facing output structure to the value passed to ``otq.run``.

    Returns
    -------
    tuple (output_structure, output_structure_for_otq); an empty value is normalized to "df".

    Raises
    ------
    ValueError
        If output_structure is not one of "df", "list", "map" (or empty).
    """
    if not output_structure or output_structure == "df":
        # otq doesn't support df, so the raw result is requested as a list
        output_structure = "df"
        output_structure_for_otq = "symbol_result_list"
    elif output_structure == "list":
        output_structure_for_otq = "symbol_result_list"
    elif output_structure == "map":
        output_structure_for_otq = "symbol_result_map"
    else:
        raise ValueError("output_structure support only the following values: df, list and map")
    return output_structure, output_structure_for_otq


class LogCallback(otp.CallbackBase):
    """Callback that prints the query being run and every symbol being processed."""

    def __init__(self, query_name):
        print(f'Running query {query_name}')
        super().__init__()

    def process_symbol_name(self, symbol_name):
        print(f'Processing symbol {symbol_name}')