Source code for onetick.py.core.eval_query

import inspect
import types

from onetick import py as otp
from onetick.py.core._source._symbol_param_source import _SymbolParamSource
from onetick.py.core._source._symbol_param_column import _SymbolParamColumn
from onetick.py.core._internal._param_column import _ParamColumn


class _QueryEvalWrapper:
    def __init__(self, query, params=None, output_field=None, request_substitute_symbol=False):
        self.query = query
        self.params = params
        self.output_field = output_field
        self.request_substitute_symbol = request_substitute_symbol
        self._inner_source = None
        if isinstance(query, otp.Source):
            self._inner_source = query
            self.str_params = otp.query._params_to_str(params or {}, with_expr=True)
        elif isinstance(query, otp.query):
            self.path = query.path
            self.str_params = query._params_to_str(params or {}, with_expr=True)
        else:
            raise ValueError("Wrong query parameter, it should be otp.query, otp.Query or function, "
                             "which returns them.")

    def to_eval_string(self, tmp_otq=None):
        """
        If self._inner_source is not None, then temporary query needs to be saved
        or added to tmp_otq storage (if passed)
        """
        if self._inner_source is not None:
            # if substitute symbol is requested, then we need to set an unbound symbol for query in eval
            # so that onetick can substitute it with the unbound symbol from the external query
            symbols = None
            if self.request_substitute_symbol:
                symbols = 'SYMBOL_TO_SUBSTITUTE'
            if tmp_otq is not None:
                tmp_otq.merge(self._inner_source._tmp_otq)
                query_name = self._inner_source._store_in_tmp_otq(tmp_otq, symbols=symbols, operation_suffix="eval")
                self.path = f'THIS::{query_name}'
            else:
                self.path = self._inner_source.to_otq(file_suffix="_eval_query.otq",
                                                      symbols=symbols,
                                                      query_name="main_eval_query")
        eval_str = f'eval("{self.path}", "{self.str_params}")'
        if self.output_field:
            return f'{eval_str}.{self.output_field}'
        return eval_str

    def to_symbol_param(self):
        if self._inner_source:
            return self._inner_source.to_symbol_param()
        else:
            return _SymbolParamSource()

    def __str__(self):
        return self.to_eval_string()

    def copy(self, output_field=None):
        return _QueryEvalWrapper(query=self.query,
                                 params=self.params,
                                 output_field=output_field,
                                 request_substitute_symbol=self.request_substitute_symbol)

    def __getitem__(self, item):
        return self.copy(item)


[docs]def eval(query, symbol=None, start=None, end=None, **kwargs): """ Bind symbol parameter to otp.query or otp.Query and Source for later use Parameters ---------- query: otp.Source, otp.query symbol: symbol start: meta field (otp.Source.meta_fields) or symbol param end: meta field (otp.Source.meta_fields) or symbol param kwargs: will be passed to `query`. Allowed params: strings, numbers, otp.Source meta fields, symbol params Returns ------- result: _QueryEvalWrapper Query wrapper Examples -------- >>> def fsq(symbol): ... symbols = otp.Tick(SYMBOL_NAME=symbol.name + '1') ... return symbols >>> def main(symbol): ... data = otp.DataSource(db='SOME_DB', tick_type='TT') ... data = otp.funcs.merge([data], symbols=otp.eval(fsq, symbol=symbol)) ... return data >>> otp.run(main, symbols='S') # OTdirective: snippet-name: eval with symbols; Time X 0 2003-12-01 00:00:00.000 1 1 2003-12-01 00:00:00.001 2 2 2003-12-01 00:00:00.002 3 """ if isinstance(query, (types.FunctionType, types.LambdaType)) or inspect.ismethod(query): params = {} params_to_convert = {} sig = inspect.signature(query) for param in sig.parameters: if "symbol" == param: if isinstance(symbol, _SymbolParamSource): params['symbol'] = symbol else: params["symbol"] = _SymbolParamSource() else: value = kwargs[param] if isinstance(value, otp.sources._Column) and (value.name not in otp.Source.meta_fields and not isinstance(value, _SymbolParamColumn)): raise ValueError('Eval parameters can not depends on tick.') params_to_convert[param] = value params.update(prepare_params(**params_to_convert)) query = query(**params) params = {} request_substitute_symbol = False if symbol is not None: if not isinstance(symbol, _SymbolParamSource): raise ValueError("Symbol parameter has wrong type, are you sure you are using it from function passed " "to merge or join method?") params["SYMBOL_NAME"] = symbol.name request_substitute_symbol = True if start is not None: params["_START_TIME"] = start if end is not None: params["_END_TIME"] = end params.update(kwargs) return _QueryEvalWrapper(query, params, request_substitute_symbol=request_substitute_symbol)
def prepare_params(**kwargs): converted_params = {} for key, value in kwargs.items(): dtype = otp.types.get_object_type(value) if type(value) is str: if len(value) > otp.types.string.DEFAULT_LENGTH: dtype = otp.types.string[len(value)] param = _ParamColumn(key, dtype) converted_params[key] = param return converted_params