import inspect
import types
from onetick import py as otp
from onetick.py.core._source._symbol_param_source import _SymbolParamSource
from onetick.py.core._source._symbol_param_column import _SymbolParamColumn
from onetick.py.core._internal._param_column import _ParamColumn
class _QueryEvalWrapper:
    """Render a query as an ``eval("<path>", "<params>")`` expression string.

    The wrapped ``query`` is either an ``otp.Source`` (kept in
    ``self._inner_source`` and saved to an .otq only when the eval string is
    requested) or an ``otp.query`` (whose ``path`` is used directly).

    Parameters
    ----------
    query:
        ``otp.Source`` or ``otp.query`` instance to wrap.
    params:
        Mapping of query parameters; ``None`` is treated as an empty mapping
        when the parameter string is built.
    output_field:
        Optional field name appended to the eval expression as ``.<field>``.
    request_substitute_symbol:
        When true and the query is an ``otp.Source``, the source is stored
        with the ``'SYMBOL_TO_SUBSTITUTE'`` placeholder as its symbol.

    Raises
    ------
    ValueError
        If ``query`` is neither an ``otp.Source`` nor an ``otp.query``.
    """

    def __init__(self, query, params=None, output_field=None, request_substitute_symbol=False):
        self.query = query
        self.params = params
        self.output_field = output_field
        self.request_substitute_symbol = request_substitute_symbol
        self._inner_source = None
        if isinstance(query, otp.Source):
            # self.path is not known yet for a Source: it is assigned in
            # to_eval_string() once the source has been stored somewhere.
            self._inner_source = query
            self.str_params = otp.query._params_to_str(params or {}, with_expr=True)
        elif isinstance(query, otp.query):
            self.path = query.path
            self.str_params = query._params_to_str(params or {}, with_expr=True)
        else:
            raise ValueError("Wrong query parameter, it should be otp.query, otp.Query or function, "
                             "which returns them.")

    def to_eval_string(self, tmp_otq=None):
        """Return the ``eval(...)`` expression for the wrapped query.

        If ``self._inner_source`` is not None, the source is first saved: into
        ``tmp_otq`` when that storage is passed, otherwise through
        ``to_otq``. ``self.path`` is (re)assigned on every such call.

        Parameters
        ----------
        tmp_otq:
            Optional temporary-query storage; when given, the inner source's
            own ``_tmp_otq`` is merged into it and the query is referenced as
            ``THIS::<query_name>``.

        Returns
        -------
        str
            ``eval("<path>", "<params>")``, followed by ``.<output_field>``
            when an output field is set.
        """
        if self._inner_source is not None:
            # if substitute symbol is requested, then we need to set an unbound symbol for query in eval
            # so that onetick can substitute it with the unbound symbol from the external query
            symbols = None
            if self.request_substitute_symbol:
                symbols = 'SYMBOL_TO_SUBSTITUTE'
            if tmp_otq is not None:
                tmp_otq.merge(self._inner_source._tmp_otq)
                query_name = self._inner_source._store_in_tmp_otq(tmp_otq, symbols=symbols, operation_suffix="eval")
                self.path = f'THIS::{query_name}'
            else:
                self.path = self._inner_source.to_otq(file_suffix="_eval_query.otq",
                                                      symbols=symbols,
                                                      query_name="main_eval_query")
        eval_str = f'eval("{self.path}", "{self.str_params}")'
        if self.output_field:
            return f'{eval_str}.{self.output_field}'
        return eval_str

    def to_symbol_param(self):
        """Return the symbol-param object of the inner source.

        Falls back to a fresh ``_SymbolParamSource`` when no inner source is
        held (i.e. the wrapper was built from an ``otp.query``).
        """
        # Explicit None check, consistent with to_eval_string(): the decision
        # must not depend on how the source object evaluates in a boolean
        # context.
        if self._inner_source is not None:
            return self._inner_source.to_symbol_param()
        return _SymbolParamSource()

    def __str__(self):
        return self.to_eval_string()

    def copy(self, output_field=None):
        """Return a new wrapper over the same query and params.

        The copy uses the ``output_field`` passed here (``None`` by default),
        not the one stored on this instance.
        """
        return _QueryEvalWrapper(query=self.query,
                                 params=self.params,
                                 output_field=output_field,
                                 request_substitute_symbol=self.request_substitute_symbol)

    def __getitem__(self, item):
        # wrapper["FIELD"] is shorthand for a copy that selects that output field
        return self.copy(item)
def eval(query, symbol=None, start=None, end=None, **kwargs):
    """Bind symbol and other parameters to a query for later use as an ``eval`` expression.

    Parameters
    ----------
    query: otp.Source, otp.query, or a function/method returning one of them
        When a function or method is passed, it is called right away: its
        ``symbol`` parameter (if declared) receives a symbol-param source and
        every other declared parameter is looked up in ``kwargs`` and passed
        as a parameter column.
    symbol: symbol
        Symbol-param source; when given, its ``name`` is passed to the query
        as ``SYMBOL_NAME`` and symbol substitution is requested.
    start: meta field (otp.Source.meta_fields) or symbol param
        Passed to the query as ``_START_TIME`` when not None.
    end: meta field (otp.Source.meta_fields) or symbol param
        Passed to the query as ``_END_TIME`` when not None.
    kwargs:
        Will be passed to `query`. Allowed params: strings, numbers,
        otp.Source meta fields, symbol params.

    Returns
    -------
    result: _QueryEvalWrapper
        Query wrapper

    Raises
    ------
    ValueError
        If a keyword value is a tick column that is neither a meta field nor
        a symbol-param column, or if ``symbol`` is given but is not a
        symbol-param source.
    KeyError
        If ``query`` is a function that declares a parameter other than
        ``symbol`` which is missing from ``kwargs``.

    Examples
    --------
    >>> def fsq(symbol):
    ...     symbols = otp.Tick(SYMBOL_NAME=symbol.name + '1')
    ...     return symbols
    >>> def main(symbol):
    ...     data = otp.DataSource(db='SOME_DB', tick_type='TT')
    ...     data = otp.funcs.merge([data], symbols=otp.eval(fsq, symbol=symbol))
    ...     return data
    >>> otp.run(main, symbols='S')  # OTdirective: snippet-name: eval with symbols;
                         Time  X
    0 2003-12-01 00:00:00.000  1
    1 2003-12-01 00:00:00.001  2
    2 2003-12-01 00:00:00.002  3
    """
    # types.LambdaType is the same object as types.FunctionType, so a single
    # check covers both plain functions and lambdas.
    if isinstance(query, types.FunctionType) or inspect.ismethod(query):
        params = {}
        params_to_convert = {}
        sig = inspect.signature(query)
        for param in sig.parameters:
            if "symbol" == param:
                if isinstance(symbol, _SymbolParamSource):
                    params['symbol'] = symbol
                else:
                    # No usable symbol was supplied: give the function a fresh
                    # symbol-param source to build the query with.
                    params["symbol"] = _SymbolParamSource()
            else:
                value = kwargs[param]
                if isinstance(value, otp.sources._Column) and (value.name not in otp.Source.meta_fields and
                                                               not isinstance(value, _SymbolParamColumn)):
                    raise ValueError('Eval parameters can not depends on tick.')
                params_to_convert[param] = value
        params.update(prepare_params(**params_to_convert))
        query = query(**params)

    # From here on `params` holds what is passed to the query inside eval(...),
    # not the arguments used above to call the user function.
    params = {}
    request_substitute_symbol = False
    if symbol is not None:
        if not isinstance(symbol, _SymbolParamSource):
            raise ValueError("Symbol parameter has wrong type, are you sure you are using it from function passed "
                             "to merge or join method?")
        params["SYMBOL_NAME"] = symbol.name
        request_substitute_symbol = True
    if start is not None:
        params["_START_TIME"] = start
    if end is not None:
        params["_END_TIME"] = end
    # kwargs are applied last, so a key given there overrides the ones set above
    params.update(kwargs)
    return _QueryEvalWrapper(query, params, request_substitute_symbol=request_substitute_symbol)
def prepare_params(**kwargs):
    """Turn keyword arguments into parameter columns keyed by the same names.

    Each value's type is resolved with ``otp.types.get_object_type``. A plain
    ``str`` longer than the default string length gets a string type sized to
    its actual length instead.

    Returns
    -------
    dict
        Maps every keyword name to a ``_ParamColumn`` built from that name
        and the resolved type; empty when no keywords are given.
    """
    columns = {}
    for name, raw in kwargs.items():
        column_type = otp.types.get_object_type(raw)
        # Exact type check on purpose: only plain `str` values are re-sized,
        # instances of str subclasses keep the type resolved above.
        is_long_plain_str = type(raw) is str and len(raw) > otp.types.string.DEFAULT_LENGTH
        if is_long_plain_str:
            column_type = otp.types.string[len(raw)]
        columns[name] = _ParamColumn(name, column_type)
    return columns