import inspect
import types

from onetick import py as otp
import onetick.py.types as ott
from onetick.py.core._source._symbol_param_source import _SymbolParamSource
from onetick.py.core._source._symbol_param_column import _SymbolParamColumn
from onetick.py.core._internal._param_column import _ParamColumn


class _QueryEvalWrapper:
    """
    Holds a query (``otp.Source`` or ``otp.query``) together with its parameters
    and renders it as an ``eval("<path>", "<params>")`` expression string.
    """

    def __init__(self, query, params=None, output_field=None, request_substitute_symbol=False):
        """
        Parameters
        ----------
        query: ``otp.Source`` or ``otp.query``
            query to wrap. Any other type raises ``ValueError``.
        params: dict, optional
            parameters of the query. The passed object is stored as-is in ``self.params``
            and is never modified by this constructor.
        output_field: str, optional
            if set, ``.<output_field>`` is appended to the generated eval expression.
        request_substitute_symbol: bool
            if True, the inner source is saved with the ``SYMBOL_TO_SUBSTITUTE`` symbol
            (see :meth:`to_eval_string`).

        Raises
        ------
        ValueError
            if ``query`` is neither ``otp.Source`` nor ``otp.query``.
        """
        self.query = query
        self.params = params
        self.output_field = output_field
        self.request_substitute_symbol = request_substitute_symbol
        self._inner_source = None

        if isinstance(query, otp.Source):
            self._inner_source = query
            # Work on a private copy: the time range defaults added below
            # must not leak into the dictionary owned by the caller.
            effective_params = dict(params) if params else {}
            start, end = query._get_widest_time_range()
            # The time range of the source is used only when the caller specified neither bound.
            if start and end and '_START_TIME' not in effective_params and '_END_TIME' not in effective_params:
                effective_params['_START_TIME'] = start
                effective_params['_END_TIME'] = end
            self.str_params = otp.query._params_to_str(effective_params, with_expr=True)
            # NOTE: self.path is not set in this branch, it is assigned later in to_eval_string()
        elif isinstance(query, otp.query):
            self.path = query.path
            self.str_params = query._params_to_str(params or {}, with_expr=True)
        else:
            raise ValueError("Wrong query parameter, it should be otp.query, otp.Query or function, "
                             "which returns them.")

    def to_eval_string(self, tmp_otq=None, operation_suffix='eval', start=None, end=None, timezone=None,
                       file_suffix='_eval_query.otq', query_name='main_eval_query'):
        """
        Returns the ``eval(...)`` expression string for the wrapped query.

        If self._inner_source is not None, then temporary query needs to be saved
        or added to tmp_otq storage (if passed). In this case ``self.path`` is (re)assigned:

        * if ``tmp_otq`` is passed, the inner source is stored in it
          and the path becomes ``THIS::<stored query name>``;
        * otherwise the inner source is saved with ``to_otq`` and its result is used as the path.

        ``operation_suffix`` is used only with ``tmp_otq``;
        ``file_suffix``, ``query_name`` and ``timezone`` are used only without it.
        ``start`` and ``end`` are forwarded in both cases.
        """
        if self._inner_source is not None:
            # if substitute symbol is requested, then we need to set an unbound symbol for query in eval
            # so that onetick can substitute it with the unbound symbol from the external query
            symbols = None
            if self.request_substitute_symbol:
                symbols = 'SYMBOL_TO_SUBSTITUTE'
            if tmp_otq is not None:
                tmp_otq.merge(self._inner_source._tmp_otq)
                query_name = self._inner_source._store_in_tmp_otq(tmp_otq,
                                                                  symbols=symbols,
                                                                  start=start,
                                                                  end=end,
                                                                  operation_suffix=operation_suffix)
                self.path = f'THIS::{query_name}'
            else:
                self.path = self._inner_source.to_otq(file_suffix=file_suffix,
                                                      symbols=symbols,
                                                      query_name=query_name,
                                                      start=start,
                                                      end=end,
                                                      timezone=timezone)
        eval_str = f'eval("{self.path}", "{self.str_params}")'
        if self.output_field:
            return f'{eval_str}.{self.output_field}'
        return eval_str

    def to_symbol_param(self):
        """
        Returns the symbol param of the inner source,
        or an empty ``_SymbolParamSource`` if there is no inner source.
        """
        if self._inner_source:
            return self._inner_source.to_symbol_param()
        else:
            return _SymbolParamSource()

    def __str__(self):
        # NOTE: for a wrapped otp.Source this goes through to_eval_string() with default arguments,
        # which calls to_otq() on the inner source.
        return self.to_eval_string()

    def copy(self, output_field=None):
        """
        Returns a new wrapper over the same query and parameters.
        The output field of the copy is the passed ``output_field``, the current one is not inherited.
        """
        return _QueryEvalWrapper(query=self.query,
                                 params=self.params,
                                 output_field=output_field,
                                 request_substitute_symbol=self.request_substitute_symbol)

    def __getitem__(self, item):
        """Returns a copy of the wrapper with ``item`` as the output field."""
        return self.copy(item)
def eval(query, symbol=None, start=None, end=None, **kwargs):
    """
    Creates an object with ``query`` with saved parameters that can be used later.
    It can be used to:

    * return a list of symbols for which the main query will be executed (multistage queries).
      Note that in this case ``query`` must return ticks with column **SYMBOL_NAME**.
    * return some value dynamically to be used in other places in the main query.
      Note that in this case ``query`` must return only single tick.

    Note that only constant expressions are allowed in query parameters,
    they must not depend on ticks.

    Parameters
    ----------
    query: :py:class:`onetick.py.Source`, :py:class:`onetick.py.query` or function
        source or query to evaluate.
        If function, then it must return :py:class:`onetick.py.Source` or :py:class:`onetick.py.query`.
        Parameter with name **symbol** and parameters specified in ``kwargs``
        will be propagated to this function.
        Parameter from ``kwargs`` *must* be specified in function signature,
        but parameter **symbol** may be omitted if it is not used.
        Function parameters that have a default value and are not passed in ``kwargs``
        keep their default value.
    symbol: :py:class:`~onetick.py.core._source._symbol_param_column._SymbolParamSource`
        symbol parameter that will be used by ``query`` as a symbol.
        By default the symbol for evaluated query is inherited from the main query.
        If the function is used as a ``query``, parameter **symbol** can be defined
        in function signature and used in source operations.
    start: meta field (:py:class:`~onetick.py.core.source.MetaFields`) \
            or symbol param (:py:class:`~onetick.py.core._source._symbol_param_column._SymbolParamColumn`)
        start time with which ``query`` will be executed.
        By default the start time for evaluated query is inherited from the main query.
    end: meta field (:py:class:`~onetick.py.core.source.MetaFields`) \
            or symbol param (:py:class:`~onetick.py.core._source._symbol_param_column._SymbolParamColumn`)
        end time with which ``query`` will be executed.
        By default the end time for evaluated query is inherited from the main query.
    kwargs: str, int, meta fields (:py:class:`~onetick.py.core.source.MetaFields`) \
            or symbol params (:py:class:`~onetick.py.core._source._symbol_param_column._SymbolParamColumn`)
        parameters that will be passed to ``query``.
        If the function is used as a ``query``, parameters specified in ``kwargs`` *must* be defined
        in function signature and can be used in source operations.

    Raises
    ------
    ValueError
        if a parameter passed to a function ``query`` is a column that depends on ticks,
        or if ``symbol`` is set and is not a symbol param source.
    KeyError
        if a function ``query`` has a parameter without default value
        that is neither **symbol** nor specified in ``kwargs``.

    See also
    --------
    :ref:`Symbol Parameters Objects`
    :ref:`Symbol parameters`

    Examples
    --------

    Use ``otp.eval`` to be passed as symbols when running the query:

    >>> def fsq():
    ...     symbols = otp.Ticks(SYMBOL_NAME=['AAPL', 'AAP'])
    ...     return symbols
    >>> main = otp.DataSource(db='NYSE_TAQ', tick_type='TRD', date=otp.dt(2022, 3, 1))
    >>> main['SYMBOL_NAME'] = main.Symbol.name
    >>> main = otp.funcs.merge([main], symbols=otp.eval(fsq))
    >>> otp.run(main)  # OTdirective: snippet-name: eval with symbols;
                         Time  PRICE  SIZE SYMBOL_NAME
    0 2022-03-01 00:00:00.000   1.30   100        AAPL
    1 2022-03-01 00:00:00.000  45.37     0         AAP
    2 2022-03-01 00:00:00.001   1.40    10        AAPL
    3 2022-03-01 00:00:00.001  45.41     0         AAP
    4 2022-03-01 00:00:00.002   1.40    50        AAPL

    Use ``otp.eval`` as filter:

    >>> def get_filter(a, b):
    ...     return otp.Tick(WHERE=f'X >= {a} and X < {b}', OTHER_FIELD='X')
    >>> data = otp.Ticks(X=[1, 2, 3])
    >>> # note that in this case column WHERE must be specified,
    >>> # because evaluated query returns tick with more than one field
    >>> data, _ = data[otp.eval(get_filter, a=0, b=2)['WHERE']]
    >>> otp.run(data)
            Time  X
    0 2003-12-01  1

    Use ``otp.eval`` with meta fields:

    >>> def filter_by_tt(tick_type):
    ...     res = otp.Ticks({
    ...         'TICK_TYPE': ['TRD', 'QTE'],
    ...         'WHERE': ['PRICE>=1.4', 'ASK_PRICE>=1.4']
    ...     })
    ...     res, _ = res[res['TICK_TYPE'] == tick_type]
    ...     return res.drop(['TICK_TYPE'])
    >>> t = otp.DataSource('NYSE_TAQ::TRD')
    >>> # note that in this case column WHERE do not need to be specified,
    >>> # because evaluated query returns tick with only one field
    >>> t, _ = t[otp.eval(filter_by_tt, tick_type=t['_TICK_TYPE'])]
    >>> otp.run(t, start=otp.dt(2022, 3, 1), end=otp.dt(2022, 3, 2))
                         Time  PRICE  SIZE
    0 2022-03-01 00:00:00.001    1.4    10
    1 2022-03-01 00:00:00.002    1.4    50
    """
    if isinstance(query, (types.FunctionType, types.LambdaType)) or inspect.ismethod(query):
        # ``query`` is a factory: build its arguments from ``symbol`` and ``kwargs``
        # and call it to get the real source or query object.
        params = {}
        params_to_convert = {}
        sig = inspect.signature(query)
        for name, parameter in sig.parameters.items():
            if name == "symbol":
                # the function always gets a symbol param source,
                # an empty one if the caller did not provide a proper object
                if isinstance(symbol, _SymbolParamSource):
                    params['symbol'] = symbol
                else:
                    params["symbol"] = _SymbolParamSource()
                continue
            if name not in kwargs and parameter.default is not inspect.Parameter.empty:
                # not passed by the caller: let the default from the function signature apply
                continue
            # a required parameter missing from ``kwargs`` raises KeyError here
            value = kwargs[name]
            if isinstance(value, otp.sources._Column) and (
                value.name not in otp.Source.meta_fields and not isinstance(value, _SymbolParamColumn)
            ):
                # only meta fields and symbol params are accepted as column-like values
                raise ValueError('Eval parameters can not depends on tick.')
            params_to_convert[name] = value
        params.update(prepare_params(**params_to_convert))
        query = query(**params)

    params = {}
    request_substitute_symbol = False
    if symbol is not None:
        if not isinstance(symbol, _SymbolParamSource):
            raise ValueError("Symbol parameter has wrong type, are you sure you are using it from function passed "
                             "to merge or join method?")
        params["SYMBOL_NAME"] = symbol.name
        request_substitute_symbol = True
    if start is not None:
        params["_START_TIME"] = start
    if end is not None:
        params["_END_TIME"] = end
    # applied last, so entries from ``kwargs`` override the keys set above on name clash
    params.update(kwargs)
    return _QueryEvalWrapper(query, params, request_substitute_symbol=request_substitute_symbol)