Source code for onetick.py.core.eval_query

import inspect
import types

from onetick import py as otp
import onetick.py.types as ott
from onetick.py.core._source._symbol_param_source import _SymbolParamSource
from onetick.py.core._source._symbol_param_column import _SymbolParamColumn
from onetick.py.core._internal._param_column import _ParamColumn


class _QueryEvalWrapper:
    def __init__(self, query, params=None, output_field=None, request_substitute_symbol=False):
        self.query = query
        self.params = params
        self.output_field = output_field
        self.request_substitute_symbol = request_substitute_symbol
        self._inner_source = None
        if isinstance(query, otp.Source):
            self._inner_source = query
            params = params or {}

            start, end = query._get_widest_time_range()
            if start and end and '_START_TIME' not in params and '_END_TIME' not in params:
                params['_START_TIME'] = start
                params['_END_TIME'] = end

            self.str_params = otp.query._params_to_str(params, with_expr=True)
        elif isinstance(query, otp.query):
            self.path = query.path
            self.str_params = query._params_to_str(params or {}, with_expr=True)
        else:
            raise ValueError("Wrong query parameter, it should be otp.query, otp.Query or function, "
                             "which returns them.")

    def to_eval_string(self,
                       tmp_otq=None,
                       operation_suffix='eval',
                       start=None, end=None, timezone=None,
                       file_suffix='_eval_query.otq',
                       query_name='main_eval_query'):
        """
        If self._inner_source is not None, then temporary query needs to be saved
        or added to tmp_otq storage (if passed)
        """
        if self._inner_source is not None:
            # if substitute symbol is requested, then we need to set an unbound symbol for query in eval
            # so that onetick can substitute it with the unbound symbol from the external query
            symbols = None
            if self.request_substitute_symbol:
                symbols = 'SYMBOL_TO_SUBSTITUTE'
            if tmp_otq is not None:
                tmp_otq.merge(self._inner_source._tmp_otq)
                query_name = self._inner_source._store_in_tmp_otq(tmp_otq, symbols=symbols,
                                                                  start=start, end=end,
                                                                  operation_suffix=operation_suffix)
                self.path = f'THIS::{query_name}'
            else:
                self.path = self._inner_source.to_otq(file_suffix=file_suffix,
                                                      symbols=symbols,
                                                      query_name=query_name,
                                                      start=start, end=end, timezone=timezone)
        eval_str = f'eval("{self.path}", "{self.str_params}")'
        if self.output_field:
            return f'{eval_str}.{self.output_field}'
        return eval_str

    def to_symbol_param(self):
        if self._inner_source:
            return self._inner_source.to_symbol_param()
        else:
            return _SymbolParamSource()

    def __str__(self):
        return self.to_eval_string()

    def copy(self, output_field=None):
        return _QueryEvalWrapper(query=self.query,
                                 params=self.params,
                                 output_field=output_field,
                                 request_substitute_symbol=self.request_substitute_symbol)

    def __getitem__(self, item):
        return self.copy(item)


[docs]def eval(query, symbol=None, start=None, end=None, **kwargs): """ Creates an object with ``query`` with saved parameters that can be used later. It can be used to: * return a list of symbols for which the main query will be executed (multistage queries). Note that in this case ``query`` must return ticks with column **SYMBOL_NAME**. * return some value dynamically to be used in other places in the main query. Note that in this case ``query`` must return only single tick. Note that only constant expressions are allowed in query parameters, they must not depend on ticks. Parameters ---------- query: :py:class:`onetick.py.Source`, :py:class:`onetick.py.query` or function source or query to evaluate. If function, then it must return :py:class:`onetick.py.Source` or :py:class:`onetick.py.query`. Parameter with name **symbol** and parameters specified in ``kwargs`` will be propagated to this function. Parameter from ``kwargs`` *must* be specified in function signature, but parameter **symbol** may be omitted if it is not used. symbol: :py:class:`~onetick.py.core._source._symbol_param_column._SymbolParamSource` symbol parameter that will be used by ``query`` as a symbol. By default the symbol for evaluated query is inherited from the main query. If the function is used as a ``query``, parameter **symbol** can be defined in function signature and used in source operations. start: meta field (:py:class:`~onetick.py.core.source.MetaFields`) \ or symbol param (:py:class:`~onetick.py.core._source._symbol_param_column._SymbolParamColumn`) start time with which ``query`` will be executed. By default the start time for evaluated query is inherited from the main query. end: meta field (:py:class:`~onetick.py.core.source.MetaFields`) \ or symbol param (:py:class:`~onetick.py.core._source._symbol_param_column._SymbolParamColumn`) end time with which ``query`` will be executed. By default the end time for evaluated query is inherited from the main query. kwargs: str, int, meta fields (:py:class:`~onetick.py.core.source.MetaFields`) \ or symbol params (:py:class:`~onetick.py.core._source._symbol_param_column._SymbolParamColumn`) parameters that will be passed to ``query``. If the function is used as a ``query``, parameters specified in ``kwargs`` *must* be defined in function signature and can be used in source operations. See also -------- :ref:`Symbol Parameters Objects` :ref:`Symbol parameters` Examples -------- Use ``otp.eval`` to be passed as symbols when running the query: >>> def fsq(): ... symbols = otp.Ticks(SYMBOL_NAME=['AAPL', 'AAP']) ... return symbols >>> main = otp.DataSource(db='NYSE_TAQ', tick_type='TRD', date=otp.dt(2022, 3, 1)) >>> main['SYMBOL_NAME'] = main.Symbol.name >>> main = otp.funcs.merge([main], symbols=otp.eval(fsq)) >>> otp.run(main) # OTdirective: snippet-name: eval with symbols; Time PRICE SIZE SYMBOL_NAME 0 2022-03-01 00:00:00.000 1.30 100 AAPL 1 2022-03-01 00:00:00.000 45.37 0 AAP 2 2022-03-01 00:00:00.001 1.40 10 AAPL 3 2022-03-01 00:00:00.001 45.41 0 AAP 4 2022-03-01 00:00:00.002 1.40 50 AAPL Use ``otp.eval`` as filter: >>> def get_filter(a, b): ... return otp.Tick(WHERE=f'X >= {a} and X < {b}', OTHER_FIELD='X') >>> data = otp.Ticks(X=[1, 2, 3]) >>> # note that in this case column WHERE must be specified, >>> # because evaluated query returns tick with more than one field >>> data, _ = data[otp.eval(get_filter, a=0, b=2)['WHERE']] >>> otp.run(data) Time X 0 2003-12-01 1 Use ``otp.eval`` with meta fields: >>> def filter_by_tt(tick_type): ... res = otp.Ticks({ ... 'TICK_TYPE': ['TRD', 'QTE'], ... 'WHERE': ['PRICE>=1.4', 'ASK_PRICE>=1.4'] ... }) ... res, _ = res[res['TICK_TYPE'] == tick_type] ... return res.drop(['TICK_TYPE']) >>> t = otp.DataSource('NYSE_TAQ::TRD') >>> # note that in this case column WHERE do not need to be specified, >>> # because evaluated query returns tick with only one field >>> t, _ = t[otp.eval(filter_by_tt, tick_type=t['_TICK_TYPE'])] >>> otp.run(t, start=otp.dt(2022, 3, 1), end=otp.dt(2022, 3, 2)) Time PRICE SIZE 0 2022-03-01 00:00:00.001 1.4 10 1 2022-03-01 00:00:00.002 1.4 50 """ if isinstance(query, (types.FunctionType, types.LambdaType)) or inspect.ismethod(query): params = {} params_to_convert = {} sig = inspect.signature(query) for param in sig.parameters: if "symbol" == param: if isinstance(symbol, _SymbolParamSource): params['symbol'] = symbol else: params["symbol"] = _SymbolParamSource() else: value = kwargs[param] if isinstance(value, otp.sources._Column) and (value.name not in otp.Source.meta_fields and not isinstance(value, _SymbolParamColumn)): raise ValueError('Eval parameters can not depends on tick.') params_to_convert[param] = value params.update(prepare_params(**params_to_convert)) query = query(**params) params = {} request_substitute_symbol = False if symbol is not None: if not isinstance(symbol, _SymbolParamSource): raise ValueError("Symbol parameter has wrong type, are you sure you are using it from function passed " "to merge or join method?") params["SYMBOL_NAME"] = symbol.name request_substitute_symbol = True if start is not None: params["_START_TIME"] = start if end is not None: params["_END_TIME"] = end params.update(kwargs) return _QueryEvalWrapper(query, params, request_substitute_symbol=request_substitute_symbol)
def prepare_params(**kwargs): converted_params = {} for key, value in kwargs.items(): dtype = otp.types.get_object_type(value) if type(value) is str: if len(value) > otp.types.string.DEFAULT_LENGTH: dtype = otp.types.string[len(value)] param = _ParamColumn(key, dtype) converted_params[key] = param return converted_params