import datetime as dt
import inspect
import operator
import os
import sys
import warnings
import io
import math
from functools import partial
from typing import Optional, Union, Type, Iterable
import onetick.py as otp
import onetick.query as otq
import pandas as pd
import onetick.py.core._source
import onetick.py.functions
import onetick.py.db._inspection
from onetick.py.core._internal._param_column import _ParamColumn
from onetick.py.core._source._symbol_param_column import _SymbolParamColumn
from onetick.py.core._source.tmp_otq import TmpOtq
from onetick.py.core.column import _Column
from onetick.py.core.eval_query import _QueryEvalWrapper
from onetick.py.core.source import Source, _Source  # _Source for backward compatibility
from . import types as ott
from . import utils, configuration
from .core import _csv_inspector, query_inspector
from .core.column_operations._methods.methods import is_arithmetical
from .core.column_operations.base import _Operation
from .db.db import DB
from .db._inspection import DB as inspect_DB
from .aggregations._docs import docstring, _param_doc
from .aggregations.order_book import (
    OB_SNAPSHOT_DOC_PARAMS, OB_SNAPSHOT_WIDE_DOC_PARAMS, OB_SNAPSHOT_FLAT_DOC_PARAMS
)
_QUERY_PARAM_SPECIAL_CHARACTERS = "=,"
AdaptiveTickType = Union[str, Type[utils.adaptive]]
def update_node_tick_type(node: "Source", tick_type: AdaptiveTickType, db: Optional[str] = None):
    """Update node tick_type according to db name and tick_type.
    Don't change tick type for adaptive tick type.
    Parameters
    ----------
    node: Source
        node to set tick_type on
    tick_type: AdaptiveTickType
        string tick type or :py:class:`onetick.py.adaptive`
    db: Optional[str]
        optional db name
    """
    # do not change tick type for adaptive `tick_type`
    if not isinstance(tick_type, type) and tick_type is not utils.adaptive:
        if db:
            node.tick_type(db + "::" + tick_type)
        else:
            node.tick_type(tick_type)
class Tick(Source):
    """
    Generate single tick object
    Parameters
    ----------
    offset: int, default=0
        tick timestamp offset from query start time in `offset_part`
    offset_part: one of [nanosecond, millisecond, second, minute, hour, day, dayofyear, weekday, week, month, quarter, year], default=millisecond   #noqa
        offset union
    symbol:
        data symbol
    db:
        data database
    start:
        start time for tick
    end:
        end time for tick
    tick_type: AdaptiveTickType
        Special tick_type `TICK_GENERATOR` will be used by default. You can use
        :py:class:`onetick.py.adaptive` for the value if you want to use sink node tick type
        instead of defining your own.
    bucket_time:
        ???
    bucket_interval:
        ???
    kwargs
    See also
    --------
    **TICK_GENERATOR** OneTick event processor
    """
    def __init__(
        self,
        offset=0,
        offset_part='millisecond',
        time: ott.datetime = None,
        timezone_for_time=None,
        symbol=utils.adaptive_to_default,
        db=utils.adaptive_to_default,
        start=utils.adaptive,
        end=utils.adaptive,
        tick_type: Optional[AdaptiveTickType] = None,
        bucket_time: str = "start",
        bucket_interval: int = 0,
        **kwargs,
    ):
        if self._try_default_constructor(**kwargs):
            return
        if len(kwargs) == 0:
            raise ValueError("It is not allowed to have a tick without fields")
        if time is not None and offset != 0:
            raise ValueError("It's not allowed to set parameter 'datetime' and set non-zero offset at the same time")
        bucket_time = self._get_bucket_time(bucket_time)
        if tick_type is None:
            tick_type = "TICK_GENERATOR"
        columns = {}
        for key, value in kwargs.items():
            # the way to skip a field
            if value is None:
                continue
            if inspect.isclass(value):
                raise TypeError(f"Tick constructor expects values but not types, {value}")
            else:
                value_type = ott.get_object_type(value)
            if value_type is str:
                if isinstance(value, _Column) or is_arithmetical(value):
                    if value.dtype is not str:
                        value_type = value.dtype
                elif len(value) > ott.string.DEFAULT_LENGTH:
                    value_type = ott.string[len(value)]
            if value_type is bool:
                value_type = float
            if issubclass(value_type, (ott.datetime, ott.date, dt.datetime, dt.date, pd.Timestamp)):
                value_type = ott.nsectime
            columns[key] = value_type
        super().__init__(
            _symbols=symbol,
            _start=start,
            _end=end,
            _base_ep_func=lambda: self.base_ep(db=db,
                                               tick_type=tick_type,
                                               offset=offset,
                                               offset_part=offset_part,
                                               time=time,
                                               timezone_for_time=timezone_for_time,
                                               columns=columns,
                                               bucket_time=bucket_time,
                                               bucket_interval=bucket_interval,
                                               **kwargs),
            **columns,
        )
    def base_ep(self,
                db=utils.adaptive_to_default,
                tick_type="TICK_GENERATOR",
                offset=0,
                offset_part='millisecond',
                time=None,
                timezone_for_time=None,
                columns=None,
                bucket_time="start",
                bucket_interval=0,
                **kwargs):
        if columns is None:
            columns = {}
        if db is utils.adaptive_to_default:
            # if default database is not set, tick type will be set without it
            # and symbols will have to be specified in otp.run
            db = configuration.config.get('default_db')
        params = ",".join(
            ott.type2str(columns[key]) + " " + str(key) + "=" + ott.value2str(value)
            for key, value in kwargs.items()
            if value is not None
        )
        src = Source(
            otq.TickGenerator(
                bucket_interval=bucket_interval,
                bucket_time=bucket_time,
                fields=params
            ),
            **columns
        )
        update_node_tick_type(src, tick_type, db)
        # TIMESTAMP += offset will add redundant nodes to sort the timestamps.
        # No sorting needed for a single tick.
        if offset:
            src.sink(otq.UpdateField(field="TIMESTAMP",
                                     value=f"dateadd('{offset_part}', {offset}, TIMESTAMP, _TIMEZONE)"))
        elif time:
            src.sink(otq.UpdateField(field="TIMESTAMP",
                                     value=ott.datetime2expr(time, timezone_naive=timezone_for_time)))
        return src
    @staticmethod
    def _get_bucket_time(bucket_time):
        if bucket_time == "BUCKET_START":
            warnings.warn("BUCKET_START value is deprecated. Please, use 'start' instead", DeprecationWarning)
        elif bucket_time == "BUCKET_END":
            warnings.warn("BUCKET_END value is deprecated. Please, use 'end' instead", DeprecationWarning)
        elif bucket_time == "start":
            bucket_time = "BUCKET_START"
        elif bucket_time == "end":
            bucket_time = "BUCKET_END"
        else:
            raise ValueError(f"Only 'start' and 'end' values supported as bucket time, but you've passed {bucket_time}")
        return bucket_time
[docs]def Ticks(data=None,
          symbol=utils.adaptive_to_default,
          db=utils.adaptive_to_default,
          start=utils.adaptive,
          end=utils.adaptive,
          tick_type: Optional[AdaptiveTickType] = None,
          timezone_for_time=None,
          **inplace_data):
    """
    Data source that generates ticks.
    Ticks are placed with the 1 millisecond offset from
    each other starting from the start of the query interval.
    It has ability to change `distance` between ticks using the
    special reserved field name ``offset``, that specify time offset
    from a previous tick.
    Parameters
    ----------
    data: dict, list or pandas.DataFrame, optional
        Ticks values
        * ``dict`` -- <field_name>: <values>
        * ``list`` -- [[<field_names>], [<first_tick_values>], ..., [<n_tick_values>]]
        * :pandas:`DataFrame <pandas.DataFrame>` -- DataFrame with ``Time`` column
        * ``None`` -- ``inplace_data`` will be used
    symbol: str, list of str, :class:`Source`, :class:`query`, :py:func:`eval query <onetick.py.eval>`
        Symbol(s) from which data should be taken.
    db: str
        Database to use for tick generation
    start, end: :py:class:`datetime.datetime`, :py:class:`otp.datetime <onetick.py.datetime>`, \
                    :py:class:`onetick.py.adaptive`
        Timestamp for data generation
    tick_type: str
        tick type for data generation
    timezone_for_time: str
        timezone for data generation
    **inplace_data: dict
        <field_name>: list(<field_values>)
    See also
    --------
    **TICK_GENERATOR** OneTick event processor
    Examples
    --------
    Pass data in ``dict``
    >>> d = otp.Ticks({'A': [1, 2, 3], 'B': [4, 5, 6]})
    >>> otp.run(d)
                         Time  A  B
    0 2003-12-01 00:00:00.000  1  4
    1 2003-12-01 00:00:00.001  2  5
    2 2003-12-01 00:00:00.002  3  6
    Pass ``inplace_data``
    >>> d = otp.Ticks(A=[1, 2, 3], B=[4, 5, 6])
    >>> otp.run(d)
                         Time  A  B
    0 2003-12-01 00:00:00.000  1  4
    1 2003-12-01 00:00:00.001  2  5
    2 2003-12-01 00:00:00.002  3  6
    Pass data in ``list``
    >>> d = otp.Ticks([['A', 'B'],
    ...                [1, 4],
    ...                [2, 5],
    ...                [3, 6]])
    >>> otp.run(d)
                         Time  A  B
    0 2003-12-01 00:00:00.000  1  4
    1 2003-12-01 00:00:00.001  2  5
    2 2003-12-01 00:00:00.002  3  6
    Using the ``offset`` example
    >>> data = otp.Ticks(X=[1, 2, 3], offset=[0, otp.Nano(1), 1])
    >>> otp.run(data)
                               Time  X
    0 2003-12-01 00:00:00.000000000  1
    1 2003-12-01 00:00:00.000000001  2
    2 2003-12-01 00:00:00.001000000  3
    Using pandas.DataFrame
    >>> start_datetime = datetime(2023, 1, 1, 12)
    >>> time_array = [start_datetime + otp.Hour(1) + otp.Nano(1)]
    >>> a_array = [start_datetime - otp.Day(15) - otp.Nano(7)]
    >>> df = pd.DataFrame({'Time': time_array,'A': a_array})
    >>> data = otp.Ticks(df)
    >>> otp.run(data, start=start_datetime, end=start_datetime + otp.Day(1))
                               Time                             A
    0 2023-01-01 13:00:00.000000001 2022-12-17 11:59:59.999999993
    """
    if tick_type is None:
        tick_type = "TICK_GENERATOR"
    if db is utils.adaptive_to_default:
        db = configuration.config.get('default_db')
    if isinstance(data, pd.DataFrame):
        if 'Time' not in data.columns:
            raise ValueError('Field `Time` is required for constructing an `otp.Source` from `pandas.DataFrame`')
        data.rename(columns={"Time": "time"}, inplace=True)
        data = data.to_dict('list')
    if data and len(inplace_data) != 0:
        raise ValueError("Data can be passed only using either the `data` parameter "
                         "or inplace through the key-value args")
    if isinstance(data, list):
        reform = {}
        for inx, key in enumerate(data[0]):
            reform[key] = [sub_list[inx] for sub_list in data[1:]]
        data = reform
    if data is None:
        if inplace_data:
            data = inplace_data
        else:
            raise ValueError("You don't specify any date to create ticks from. "
                             "Please, use otp.Empty for creating empty data source")
    else:
        data = data.copy()
    value_len = -1
    for key, value in data.items():
        if value_len == -1:
            value_len = len(value)
        else:
            if value_len != len(value):
                # TODO: write test to cover that case
                raise ValueError(
                    f"It is not allowed to have different columns of different lengths, "
                    f"some of columns have {value_len} length, but column '{key}', as instance, has {len(value)}"
                )
    use_absolute_time = False
    if "offset" in data:
        if "time" in data:
            raise ValueError("You cannot specify offset and time at the same time")
    else:
        if "time" in data:
            use_absolute_time = True
        else:
            data["offset"] = list(range(value_len))
    if not use_absolute_time:
        offset_values = []
        offset_parts = []
        for ofv in data['offset']:
            if isinstance(ofv, ott.offsets.Tick):
                offset_values.append(ofv.n)
                offset_parts.append(str(ofv.datepart)[1:-1])
            else:
                offset_values.append(ofv)
                offset_parts.append('millisecond')
        data['offset'] = offset_values
        data['offset_part'] = offset_parts
    if value_len == 1:
        columns = {key: value[0] for key, value in data.items()}
        return Tick(db=db, symbol=symbol, tick_type=tick_type, start=start, end=end,
                    timezone_for_time=timezone_for_time, **columns)
    else:
        # select only columns that do not contain None there to support
        # heterogeneous data
        not_none_columns = []
        for key in data.keys():
            data[key] = [float(elem) if isinstance(elem, bool) else elem for elem in data[key]]
        for key, value in data.items():
            add = True
            for v in value:
                # we need it, because can't use _Column instances in if-clauses
                if isinstance(v, _Column):
                    continue
                if v is None:
                    add = False
                    break
            if add:
                not_none_columns.append(key)
        # if a field depends on a symbol parameter, it cannot be csv'd (it's dynamic)
        # likewise for otq parameters
        # if there's a better way to check whether a value is constant,
        # will be glad to hear about it
        is_outside_data_dependent = False
        for key, value in data.items():
            for v in value:
                str_rep = str(v)
                if ("_SYMBOL_NAME" in str_rep) or ("_SYMBOL_PARAM" in str_rep) or ("$" in str_rep):
                    is_outside_data_dependent = True
                    break
        # infinity() and (on windows) nan() cannot be natively read from a csv
        has_special_values = False
        for key, value in data.items():
            for v in value:
                if isinstance(v, ott._inf) or \
                    
(isinstance(v, ott._nan) or isinstance(v, float) and math.isnan(v)) \
                        
and sys.platform.startswith("win"):
                    has_special_values = True
                    break
        if (len(not_none_columns) == len(data)) and (not is_outside_data_dependent) and (not has_special_values):
            # Data is homogenous; CSV backing can be used
            return _DataCSV(data, value_len, db=db, symbol=symbol, tick_type=tick_type, start=start, end=end,
                            timezone_for_time=timezone_for_time, use_absolute_time=use_absolute_time)
        else:
            # Fallback is a merge of individual ticks
            ticks = []
            for inx in range(value_len):
                columns = {key: value[inx] for key, value in data.items()}
                ticks.append(Tick(db=db, symbol=symbol, tick_type=tick_type, start=start, end=end,
                                  timezone_for_time=timezone_for_time, **columns))
            return onetick.py.functions.merge(ticks, align_schema=not_none_columns) 
class _DataCSV(Source):
    def __init__(
        self,
        data=None,
        length=None,
        db=utils.adaptive_to_default,
        symbol=utils.adaptive_to_default,
        tick_type=None,
        start=utils.adaptive,
        end=utils.adaptive,
        use_absolute_time=False,
        timezone_for_time=None,
        **kwargs,
    ):
        if self._try_default_constructor(**kwargs):
            return
        if data is None or length is None:
            raise ValueError("'data' and 'length' parameters can't be None")
        if db is utils.adaptive_to_default:
            db = configuration.config.get('default_db')
        def datetime_to_expr(v):
            if ott.is_time_type(v):
                return ott.datetime2expr(v, timezone_naive=timezone_for_time)
            if isinstance(v, ott.nsectime):
                # TODO: change to ott.value2str after PY-441
                return f'NSECTIME({v})'
            if isinstance(v, ott.msectime):
                return ott.value2str(v)
            raise ValueError(f"Can't convert value {v} to datetime expression")
        if use_absolute_time:
            # converting values of "time" column to onetick expressions
            converted_times = []
            for d in data["time"]:
                converted_times.append(datetime_to_expr(d))
            data["time"] = converted_times
        def csv_rep(value):
            if issubclass(type(value), str):
                return '"' + value.replace("\\", "\\\\").replace('"', '\\"') + '"'
            else:
                return str(value)
        def get_type_of_column(key):
            def get_type_of_value(value):
                t = ott.get_object_type(value)
                if ott.is_time_type(t):
                    return ott.nsectime
                elif t is str:
                    if len(value) <= ott.string.DEFAULT_LENGTH:
                        return str
                    else:
                        return ott.string[len(value)]
                else:
                    return t
            types = [get_type_of_value(v) for v in data[key]]
            res, _ = utils.get_type_that_includes(types)
            return res
        columns = {key: get_type_of_column(key) for key in data}
        expression_columns = []
        header_columns = {}
        for key in list(columns):
            header_columns[key] = columns[key]
            # converting values of datetime columns to onetick expressions
            if columns[key] is ott.nsectime:
                data[key] = [datetime_to_expr(v) for v in data[key]]
                header_columns[key] = get_type_of_column(key)
                expression_columns.append(key)
        transposed_data = [[csv_rep(value[i]) for key, value in data.items()] for i in range(length)]
        text_header = ",".join(f"{ott.type2str(v)} {k}" for k, v in header_columns.items())
        text_data = "\n".join([",".join(data_row) for data_row in transposed_data])
        if use_absolute_time:
            del columns["time"]
        else:
            del columns["offset"]
            del columns["offset_part"]
        super().__init__(
            _symbols=symbol,
            _start=start,
            _end=end,
            _base_ep_func=lambda: self.base_ep(columns=columns,
                                               db=db,
                                               tick_type=tick_type,
                                               use_absolute_time=use_absolute_time,
                                               text_header=text_header,
                                               text_data=text_data,
                                               expression_columns=expression_columns),
            **columns,
        )
    def base_ep(self, columns, db, tick_type, use_absolute_time, text_header, text_data, expression_columns=None):
        node = Source(
            otq.CsvFileListing(
                discard_timestamp_column=True,
                time_assignment="_START_TIME",
                field_delimiters="','",
                quote_chars='"""',
                handle_escaped_chars=True,
                file_contents=text_data,
                first_line_is_title=False,
                fields=text_header,
            ),
            **columns,
        )
        update_node_tick_type(node, tick_type, db)
        if use_absolute_time:
            # don't trust UpdateField
            node.sink(otq.AddField(field='____TMP____', value="EVAL_EXPRESSION(time, 'datetime')"))
            node.sink(otq.UpdateField(field="TIMESTAMP", value="____TMP____"))
            node.sink(otq.Passthrough(fields="time,____TMP____", drop_fields="True"))
            node.sink(otq.OrderBy(order_by="TIMESTAMP ASC"))
        else:
            node.sink(otq.OrderBy(order_by="offset ASC"))
            node.sink(otq.UpdateField(field="TIMESTAMP", value="dateadd(offset_part, offset, TIMESTAMP, _TIMEZONE)"))
            node.sink(otq.Passthrough(fields="offset,offset_part", drop_fields="True"))
            node.sink(otq.OrderBy(order_by="TIMESTAMP ASC"))
        for column in expression_columns or []:
            # don't trust UpdateField
            node.sink(otq.RenameFields(f'{column}=____TMP____'))
            node.sink(otq.AddField(field=column, value="EVAL_EXPRESSION(____TMP____, 'datetime')"))
            node.sink(otq.Passthrough(fields='____TMP____', drop_fields=True))
        node.sink(otq.Table(keep_input_fields=True,
                            fields=', '.join(f'nsectime {column}' for column in expression_columns)))
        return node
def TTicks(data):
    """
    .. deprecated:: 1.3.101
    Transposed Ticks format.
    Parameters
    ----------
    data: list
        list of list, where the first sublist is the header, and other are values
    """
    warnings.warn("The nice and helpful function `TTicks` is going to be deprecated. "
                  "You could use the `Ticks` to pass data in the same format there",
                  DeprecationWarning)
    dt = {}
    for inx, key in enumerate(data[0]):
        dt[key] = [sub_list[inx] for sub_list in data[1:]]
    return Ticks(dt)
[docs]class Empty(Source):
    """
    Empty data source
    Parameters
    ----------
    db: str
        Name of the database from which to take schema.
    symbol: str, list of str, :class:`Source`, :class:`query`, :py:func:`eval query <onetick.py.eval>`
        Symbol(s) from which data should be taken.
    tick_type: str,
        Name of the tick_type from which to take schema.
    start, end: :py:class:`datetime.datetime`, :py:class:`otp.datetime <onetick.py.datetime>`, \
                    :py:class:`onetick.py.adaptive`
        Time interval from which the data should be taken.
    schema: schema to use in case db and/or tick_type are not set
    Examples
    --------
    We can define schema:
    >>> data = otp.Empty(A=str, B=int)
    >>> data.to_df()
    Empty DataFrame
    Columns: []
    Index: []
    >>> data.columns()
    {'A': <class 'str'>, 'B': <class 'int'>, 'TIMESTAMP': <class 'onetick.py.types.nsectime'>,
    '_START_TIME': <class 'onetick.py.types.nsectime'>, '_END_TIME': <class 'onetick.py.types.nsectime'>,
    '_SYMBOL_NAME': <class 'str'>, '_DBNAME': <class 'str'>, '_TICK_TYPE': <class 'str'>, '_TIMEZONE': <class 'str'>}
    Or we can get schema from the database:
    >>> data = otp.Empty(db='SOME_DB', tick_type='TT')
    >>> data.columns()
    {'X': <class 'int'>, 'TIMESTAMP': <class 'onetick.py.types.nsectime'>,
    '_START_TIME': <class 'onetick.py.types.nsectime'>, '_END_TIME': <class 'onetick.py.types.nsectime'>,
    '_SYMBOL_NAME': <class 'str'>, '_DBNAME': <class 'str'>, '_TICK_TYPE': <class 'str'>, '_TIMEZONE': <class 'str'>}
    """
    def __init__(
        self,
        db=utils.adaptive_to_default,
        symbol=utils.adaptive_to_default,
        tick_type=None,
        start=utils.adaptive,
        end=utils.adaptive,
        **schema,
    ):
        if self._try_default_constructor(**schema):
            return
        columns = {}
        if tick_type and db != configuration.config.get('default_db') and db is not utils.adaptive_to_default:
            try:
                db_obj = onetick.py.db._inspection.DB(db)
                params = {'tick_type': tick_type}
                if end is not utils.adaptive:
                    params['end'] = end
                columns = db_obj.schema(**params)
            except Exception:
                pass  # do not raise an exception if no data found, because it is empty _source and does not matter
        else:
            columns = schema
        super().__init__(
            _symbols=symbol, _start=start, _end=end, _base_ep_func=lambda: self.base_ep(db), **columns
        )
    def base_ep(self, db):
        if db is utils.adaptive_to_default:
            db = configuration.config.get('default_db')
        src = Source(otq.TickGenerator(fields="long ___NOTHING___=0"))
        if db is None:
            src.tick_type('TICK_GENERATOR')
        else:
            src.tick_type(db + "::TICK_GENERATOR")
        return src 
[docs]class CSV(Source):
    """
    Construct source based on CSV file.
    There are several steps determining column types.
    1. Initially, all column treated as ``str``.
    2. If column name in CSV title have format ``type COLUMNNAME``,
       it will change type from ``str`` to specified type.
    3. All column type are determined automatically from its data.
    4. You could override determined types in ``dtype`` argument explicitly.
    5. ``converters`` argument is applied after ``dtype`` and could also change column type.
    Parameters
    ----------
    filepath_or_buffer: str, os.PathLike, FileBuffer
        Path to CSV file or :class:`file buffer <FileBuffer>`
    timestamp_name: str, default "Time"
        Name of TIMESTAMP column used for ticks. Used only if it is exists in CSV columns, otherwise ignored.
    first_line_is_title: bool
        Use first line of CSV file as a source for column names and types.
        If CSV file is started with # symbol, this parameter **must** be ``True``.
        - If ``True``, column names are inferred from the first line of the file,
          it is not allowed to have empty name for any column.
        - If ``False``, first line is processed as data, column names will be COLUMN_1, ..., COLUMN_N.
          You could specify column names in ``names`` argument.
    names: list, optional
        List of column names to use, or None.
        Length must be equal to columns number in file.
        Duplicates in this list are not allowed.
    dtype: dict, optional
        Data type for columns, as dict of pais {column_name: type}.
        Will convert column type from ``str`` to specified type, before applying converters.
    converters: dict, optional
        Dict of functions for converting values in certain columns. Keys are column names.
        Function must be valid callable with ``onetick.py`` syntax, example::
            converters={
                "time_number": lambda c: c.apply(otp.nsectime),
                "stock": lambda c: c.str.lower(),
            }
        Converters applied *after* ``dtype`` conversion.
    order_ticks: bool, optional
        If ``True`` and ``timestamp_name`` column are used, then source will order tick by time.
        Note, that if ``False`` and ticks are not ordered in sequence, then OneTick will raise Exception in runtime.
    drop_index: bool, optional
        if ``True`` and 'Index' column is in the csv file then this column will be removed.
    change_date_to: datetime, date, optional
        change date from a timestamp column to a specific date. Default is None, means not changing timestamp column.
    See also
    --------
    **CSV_FILE_LISTING** OneTick event processor
    Examples
    --------
    Simple CSV file reading
    >>> data = otp.CSV(os.path.join(csv_path, "data.csv"))
    >>> otp.run(data)
                         Time          time_number      px side
    0 2003-12-01 00:00:00.000  1656690986953602304   30.89  Buy
    1 2003-12-01 00:00:00.001  1656667706281508352  682.88  Buy
    Read CSV file and get timestamp for ticks from specific field.
    You need to specify query start/end interval including all ticks.
    >>> data = otp.CSV(os.path.join(csv_path, "data.csv"),
    ...                timestamp_name="time_number",
    ...                converters={"time_number": lambda c: c.apply(otp.nsectime)},
    ...                start=otp.dt(2010, 8, 1),
    ...                end=otp.dt(2022, 9, 2))
    >>> otp.run(data)
                               Time      px side
    0 2022-07-01 11:56:26.953602304   30.89  Buy
    1 2022-07-01 05:28:26.281508352  682.88  Buy
    """
    def __init__(self,
                 filepath_or_buffer=None,
                 timestamp_name: Union[str, None] = "Time",
                 first_line_is_title: bool = True,
                 names: Union[list, None] = None,
                 dtype: dict = {},
                 converters: dict = {},
                 order_ticks=False,
                 drop_index=True,
                 change_date_to=None,
                 **kwargs):
        if self._try_default_constructor(**kwargs):
            return
        obj_to_inspect = filepath_or_buffer
        if isinstance(filepath_or_buffer, utils.FileBuffer):
            obj_to_inspect = io.StringIO(filepath_or_buffer.get())
        if isinstance(obj_to_inspect, str) and not os.path.exists(obj_to_inspect):
            # if not found, probably, CSV file is located in OneTick CSV_FILE_PATH, check it for inspect_by_pandas()
            csv_paths = otp.utils.get_config_param(os.environ["ONE_TICK_CONFIG"], "CSV_FILE_PATH", default="")
            if csv_paths:
                for csv_path in csv_paths.split(","):
                    csv_path = os.path.join(csv_path, obj_to_inspect)
                    if os.path.exists(csv_path):
                        obj_to_inspect = csv_path
                        break
        columns, default_types, forced_title = _csv_inspector.inspect_by_pandas(
            obj_to_inspect,
            first_line_is_title,
            names)
        if "TIMESTAMP" in columns:
            raise ValueError(
                "It is not allowed to have 'TIMESTAMP' columns, because it is reserved name in OneTick"
            )
        if "Time" in columns and timestamp_name != "Time":
            raise ValueError(
                "It is not allowed to have 'Time' column not used as timestamp field."
            )
        ep_fields = ",".join(
            f'{ott.type2str(dtype)} {column}' if issubclass(dtype, otp.string) else column
            for column, dtype in columns.items()
        )
        to_drop = []
        if "TICK_STATUS" in columns:
            del columns["TICK_STATUS"]
            to_drop.append("TICK_STATUS")
        if "Index" in columns and drop_index:
            del columns["Index"]
            to_drop.append("Index")
        # determine start and end dates
        start = kwargs.get("start", utils.adaptive)
        end = kwargs.get("end", utils.adaptive)
        for t in dtype:
            if t not in columns:
                raise ValueError(f"dtype '{t}' not found in columns list")
            columns[t] = dtype[t]
        has_time = False
        if timestamp_name in columns:
            has_time = True
            # remove to resolve exception in Source.__init__
            if timestamp_name == "Time":
                del columns["Time"]
        # redefine start/end time for change_date_to
        if change_date_to:
            start = dt.datetime(change_date_to.year, change_date_to.month, change_date_to.day)
            end = ott.next_day(start)
        if isinstance(filepath_or_buffer, utils.FileBuffer):
            symbols = 'DUMMY'
        else:
            # str, because there might passed an os.PathLike object
            symbols = str(filepath_or_buffer)
        super().__init__(
            _symbols=symbols,
            _start=start,
            _end=end,
            _base_ep_func=partial(self.base_ep,
                                  filepath_or_buffer,
                                  columns,
                                  forced_title,
                                  default_types,
                                  has_time,
                                  to_drop,
                                  timestamp_name,
                                  change_date_to,
                                  order_ticks,
                                  start,
                                  end,
                                  first_line_is_title,
                                  ep_fields,
                                  converters,),
            **columns,
        )
        # fake run converters to set proper schema
        if converters:
            for column, converter in converters.items():
                self.schema[column] = converter(self[column]).dtype
        if has_time and timestamp_name in self.schema:
            if self.schema[timestamp_name] not in [ott.nsectime, ott.msectime]:
                raise ValueError(f"CSV converter for {timestamp_name} is converting to {self.schema[timestamp_name]}"
                                 "type, but expected resulted type is ott.msectime or ott.nsectime")
        # remove timestamp_name column, if we use it as TIMESTAMP source
        if has_time and timestamp_name != "Time":
            del self[timestamp_name]
    def base_ep(self,
                filepath_or_buffer,
                columns,
                forced_title,
                default_types,
                has_time,
                to_drop,
                timestamp_name,
                change_date_to,
                order_ticks,
                start,
                end,
                first_line_is_title,
                ep_fields,
                converters={},):
        # initialize Source and set schema to columns.
        file_contents = ''
        if isinstance(filepath_or_buffer, utils.FileBuffer):
            file_contents = filepath_or_buffer.get()
        csv = Source(
            otq.CsvFileListing(
                field_delimiters="','",
                time_assignment="_START_TIME",
                # we don't use EP's first_line_is_title, because EP raise errror on empty column name,
                # and we explicitly define name for such columns in FIELDS arg.
                # but if first line started with # (forced_title=True), then this param ignored :(
                first_line_is_title=False,
                fields=ep_fields,
                file_contents=file_contents
            ),
            **columns,
        )
        if first_line_is_title and not forced_title:
            # remove first line with titles for columns.
            csv.sink(otq.DeclareStateVariables(variables="long __TICK_INDEX=0"))
            csv.sink(otq.PerTickScript("STATE::__TICK_INDEX = STATE::__TICK_INDEX + 1;"))
            csv.sink(otq.WhereClause(discard_on_match=False, where="STATE::__TICK_INDEX > 1"))
        # set tick type to ANY
        csv.tick_type("LOCAL::ANY")
        # check whether need to update types, because if column type is not specified in header
        # then by default column has string type in OneTick
        update_columns = {}
        for name, dtype in columns.items():
            if not issubclass(dtype, str) and name not in default_types:
                update_columns[name] = dtype
        for name, dtype in update_columns.items():
            if dtype is int:
                csv.sink(otq.UpdateField(field=name, value="atol(" + name + ")"))
            elif dtype is float:
                csv.sink(otq.UpdateField(field=name, value="atof(" + name + ")"))
            elif dtype is ott.msectime:
                csv.sink(otq.UpdateField(field=name, value='"1970/01/01 00:00:00.000"', where=name + '=""'))
                csv.sink(otq.UpdateField(field=name, value=f'parse_time("%Y/%m/%d %H:%M:%S.%q",{name},_TIMEZONE)'))
            elif dtype is ott.nsectime:
                csv.sink(otq.UpdateField(field=name, value='"1970/1/1 00:00:00.000"', where=name + '=""'))
                csv.sink(otq.UpdateField(field=name, value=f'parse_nsectime("%Y/%m/%d %H:%M:%S.%J",{name},_TIMEZONE)'))
            else:
                raise TypeError(f"Unsupported type '{dtype}'")
        # run converters
        if converters:
            for column, converter in converters.items():
                if csv[column].dtype is not otp.nsectime and converter(csv[column]).dtype is otp.nsectime:
                    # workaround for resolve bug on column type changing:
                    # https://onemarketdata.atlassian.net/browse/PY-416
                    csv[f'_T_{name}'] = converter(csv[column])
                    del csv[column]
                    csv[column] = csv[f'_T_{name}']
                    del csv[f'_T_{name}']
                else:
                    csv[column] = converter(csv[column])
        if has_time:
            # if timestamp_name column is defined in the csv, then apply tick time adjustment
            if timestamp_name in converters:
                # we assume that if timestamp_name field in converters,
                # then it is already converted to otp.dt
                csv.sink(
                    otq.UpdateField(
                        field="TIMESTAMP",
                        value=timestamp_name,
                        allow_unordered_output_times=True,
                    )
                )
            else:
                if change_date_to:
                    change_date_to = change_date_to.strftime("%Y/%m/%d")
                    csv.sink(otq.UpdateField(field="Time", value=f'"{change_date_to}" + substr({timestamp_name}, 10)'))
                # by default we parse timestamp_name into TIMESTAMP field
                # from typical/default Time format from OneTick dump
                csv.sink(
                    otq.UpdateField(
                        field="TIMESTAMP",
                        value=f'parse_nsectime("%Y/%m/%d %H:%M:%S.%J", {timestamp_name}, _TIMEZONE)',
                        allow_unordered_output_times=True,
                    )
                )
            # drop source timestamp_name field in favor of new TIMESTAMP field
            to_drop.append(timestamp_name)
        else:
            # default time for ticks are increaing from 0
            csv.sink(otq.DeclareStateVariables(variables="long __TIMESTAMP_INC__ = 0"))
            csv.sink(otq.UpdateField(field="TIMESTAMP", value="TIMESTAMP + STATE::__TIMESTAMP_INC__"))
            csv.sink(otq.UpdateField(field="STATE::__TIMESTAMP_INC__", value="STATE::__TIMESTAMP_INC__ +1"))
        if order_ticks:
            csv.sort('TIMESTAMP', inplace=True)
        if to_drop:
            csv.sink(otq.Passthrough(fields=",".join(to_drop), drop_fields="True"))
        return csv 
class Trades(Source):
    """
    Trade source object.
    add 'PRICE' and 'SIZE' fields to schema
    """
    def __init__(self, db=utils.adaptive_to_default, symbol=utils.adaptive,
                 date=None,
                 start=utils.adaptive, end=utils.adaptive, **kwargs):
        if db is utils.adaptive_to_default:
            db = configuration.config.default_db
        if date:
            start, end = date.start, date.end
        super().__init__(
            _symbols=symbol, _start=start, _end=end, _base_ep_func=lambda: self.base_ep(db), **kwargs
        )
        self.schema['PRICE'] = float
        self.schema['SIZE'] = int
    def base_ep(self, db):
        db = str(db)
        src = Source(otq.Passthrough(fields="SYMBOL_NAME,TICK_TYPE", drop_fields=True))
        src.tick_type(db + "::TRD")
        return src
class Quotes(Source):
    def __init__(self, db=utils.adaptive_to_default, symbol=utils.adaptive,
                 start=utils.adaptive, end=utils.adaptive, **kwargs):
        if db is utils.adaptive_to_default:
            db = configuration.config.default_db
        super().__init__(
            _symbols=symbol, _start=start, _end=end, _base_ep_func=lambda: self.base_ep(db), **kwargs
        )
        self.schema['ASK_PRICE'] = float
        self.schema['BID_PRICE'] = float
        self.schema['ASK_SIZE'] = int
        self.schema['BID_SIZE'] = int
    def base_ep(self, db):
        db = str(db)
        src = Source(otq.Passthrough(fields="SYMBOL_NAME,TICK_TYPE", drop_fields=True))
        src.tick_type(db + "::QTE")
        return src
class NBBO(Source):
    def __init__(self, db="TAQ_NBBO", symbol=utils.adaptive, start=utils.adaptive, end=utils.adaptive, **kwargs):
        super().__init__(
            _symbols=symbol, _start=start, _end=end, _base_ep_func=lambda: self.base_ep(db), **kwargs
        )
        self.schema['ASK_PRICE'] = float
        self.schema['BID_PRICE'] = float
        self.schema['ASK_SIZE'] = int
        self.schema['BID_SIZE'] = int
    def base_ep(self, db):
        db = str(db)
        src = Source(otq.Passthrough(fields="SYMBOL_NAME,TICK_TYPE", drop_fields=True))
        src.tick_type(db + "::NBBO")
        return src
[docs]class Query(Source):
    def __init__(
        self,
        query_object=None,
        out_pin=utils.adaptive,
        symbol=utils.adaptive,
        start=utils.adaptive,
        end=utils.adaptive,
        params=None,
        **kwargs,
    ):
        """
        Create data source object from otq file or query object
        Parameters
        ----------
        query_object: path or :class:`query`
            query to use as a data source
        out_pin: str
             query output pin name
        symbol: str, list of str, :class:`Source`, :class:`query`, :py:func:`eval query <onetick.py.eval>`
            Symbol(s) from which data should be taken.
        start, end : :py:class:`datetime.datetime`, :py:class:`otp.datetime <onetick.py.datetime>` or utils.adaptive
            Time interval from which the data should be taken.
        params: dict
            params to pass to query.
            Only applicable to string ``query_object``
        """
        if self._try_default_constructor(**kwargs):
            return
        if params is None:
            params = {}
        # Ignore because of the "Only @runtime_checkable protocols can be used with instance and class checks"
        if isinstance(query_object, (str, os.PathLike)):  # type: ignore
            query_object = query(str(query_object), **params)
        elif isinstance(query_object, query):
            if len(params) > 0:
                raise ValueError("Cannot pass both params and a query() (not str) query_object parameter")
        else:
            raise ValueError("query_object parameter has to be either a str (path to the query) or a query object")
        if symbol == utils.adaptive:
            if not query_object.graph_info.has_unbound_sources:
                symbol = None
        super().__init__(
            _symbols=symbol, _start=start, _end=end, _base_ep_func=lambda: self.base_ep(query_object, out_pin), **kwargs
        )
    def base_ep(self, query, out_pin):
        nested = otq.NestedOtq(query.path, query.str_params)
        graph = query.graph_info
        if out_pin is utils.adaptive:
            if len(graph.nested_outputs) == 1:
                return Source(nested[graph.nested_outputs[0].NESTED_OUTPUT])
            elif len(graph.nested_outputs) > 1:
                raise Exception(
                    f'Query "{query.query_name}" has multiple outputs, but you have not '
                    "specified which one should be used. You could specify it"
                    ' using "out_pin" parameter of the Query constructor.'
                )
            else:
                # no output
                return Source(nested, _has_output=False)
        else:
            existed_out_pins = set(map(operator.attrgetter("NESTED_OUTPUT"), graph.nested_outputs))
            if out_pin not in existed_out_pins:
                raise Exception(
                    f'Query "{query.query_name}" does not have the "{out_pin}" output, there are only following '
                    f"output pins exist: {','.join(existed_out_pins)}"
                )
            return Source(nested[out_pin]) 
[docs]class query:
    """
    Constructs a query object with a certain path.
    Keyword arguments specify query parameters.
    You also can pass an instance of ``otp.query.config`` class as the second positional argument to
    specify a query.
    Parameters
    ----------
    path : str
        path to an .otq file.
        If path is relative, then it's assumed that file is located in one of the directories
        specified in OneTick ``OTQ_FILE_PATH`` configuration variable.
        If there are more than one query in the file, then its name should be specified
        in the format ``<path>::<query-name>``.
        Also prefix ``remote://<database-name>::`` can be used to specify if query is located
        on the remote server.
    config:
        optional ``otp.query.config`` object.
    params:
        parameters for the query.
    Raises
    ------
    ValueError, TypeError
    Examples
    --------
    >>> otp.query('/otqs/some.otq::some_query', PARAM1='val1', PARAM2=3.14)  # doctest: +SKIP
    >>> otp.query('remote://DATABASE::/otqs/some.otq::some_query', PARAM1='val1', PARAM2=3.14)  # doctest: +SKIP
    """
[docs]    class config:
        """
        The config allows to specify different query options.
        """
        special_values = {"input"}
        def __init__(self, output_columns=None):
            """
            Parameters
            ----------
            output_columns : str, list, dict, optional
                The parameter defines what the outputs columns are.
                Default value is ``None`` that means no output fields after applying query
                for every output pin.
                The ``input`` value means that output columns are the same as inputs for
                every output pin
                A list of tuples allows to define output columns with their types;
                for example [('x', int), ('y', float), ...]. Applicable for every output
                pin.
                A dict allows to specify output columns for every output pin.
            Raises
            ------
            TypeError, ValueError
            """
            if output_columns is not None:
                if isinstance(output_columns, list):
                    self.validate_columns(output_columns)
                elif isinstance(output_columns, dict):
                    for pin, columns in output_columns.items():
                        if not isinstance(pin, str):
                            raise TypeError(f"Name of pin '{type(pin)}' is of non-str type '%s'")
                        else:
                            self.validate_columns(columns)
                elif not isinstance(output_columns, str):
                    raise TypeError(f'"output_columns" does not support value of the "{type(output_columns)}" type')
                if isinstance(output_columns, str):
                    if output_columns not in self.special_values:
                        raise ValueError(f'Config does not support "{output_columns}" value')
            self.output_columns = output_columns
[docs]        def validate_list_item(self, item):
            if isinstance(item, str):
                if item not in self.special_values:
                    raise ValueError(f"Value {item} is not supported.")
            else:
                if not isinstance(item, (tuple, list)) or (len(item) != 2) or not isinstance(item[0], str):
                    raise TypeError("Value %s is not a name-type tuple.") 
[docs]        def validate_columns(self, columns):
            if isinstance(columns, str):
                if columns not in self.special_values:
                    raise ValueError(f"A pin has invalid output columns definition: '{columns}'")
            elif isinstance(columns, list):
                if columns.count("input") > 1:
                    raise ValueError(f"More than one 'input' value in {columns}")
                for item in columns:
                    self.validate_list_item(item)
            else:
                raise TypeError(f"A pin's columns definition is of unsupported type '{type(columns)}'") 
[docs]        def get_output_columns_for_pin(self, out_pin_name):
            if isinstance(self.output_columns, dict):
                if out_pin_name not in self.output_columns:
                    raise ValueError(f"Pin {out_pin_name} wasn't declared in the config")
                else:
                    return self.output_columns[out_pin_name]
            else:
                return self.output_columns 
[docs]        def apply(self, out_pin_name, src):
            """
            Applying specified logic on a ceration object. Used internally in the functions.apply_query
            """
            columns_descriptor = self.get_output_columns_for_pin(out_pin_name)
            if columns_descriptor is None:
                # drop columns by default, because we don't know
                # how an external query changes data schema
                src.drop_columns()
            elif columns_descriptor == "input":
                pass
            else:
                if "input" not in columns_descriptor:
                    src.drop_columns()
                for item in columns_descriptor:
                    if item != "input":
                        src[item]  
    def __init__(self, path, *config, **params):
        path = str(path)
        if path.startswith('remote://'):
            self.path = path
            remote, path = path.split('::', maxsplit=1)
        else:
            self.path = f"remote://{configuration.config.get('default_db', 'LOCAL')}::" + path
        self.query_path, self.query_name = utils.query_to_path_and_name(path)
        # if query_path does not exist, then we try
        # to resolve it with OTQ_PATH assuming that
        # a relative path is passed
        if not os.path.exists(self.query_path):
            otq_path = utils.get_config_param(os.environ["ONE_TICK_CONFIG"], "OTQ_FILE_PATH", "")
            self.query_path = utils.abspath_to_query_by_otq_path(otq_path, self.query_path)
        if self.query_name is None:
            # it seems that query name was not passed, then try to find it
            queries = query_inspector.get_queries(self.query_path)
            if len(queries) > 1:
                raise Exception(f"{self.query_path} has more than one query, "
                                f"but you have not specified which one to use.")
            self.query_name = queries[0]
        # prepare parameters
        self._str_params = None
        self.params = params
        self.update_params()
        # prepare configs
        if len(config) > 1:
            raise ValueError(f"It is allowed to specify only one config object, but passed {len(config)}")
        elif len(config) == 1:
            if not isinstance(config[0], self.config):
                raise TypeError(
                    f'It is expected to see config of the "query.config" type, but got "{type(config[0])}"'
                )
            self.config = config[0]
        else:
            self.config = self.config()
        self.graph_info = query_inspector.get_query_info(self.query_path, self.query_name)
    def __call__(self, *ticks, **pins):
        for key, value in pins.items():
            if not isinstance(value, Source):
                raise ValueError(f'Input "{key}" pin does not support "{type(value)}" type')
        if len(pins) == 0 and len(ticks) == 1:
            if len(self.graph_info.nested_inputs) != 1:
                raise Exception(
                    f'It is expected the query "{self.query_path}" to have one input, but it'
                    f" has {len(self.graph_info.nested_inputs)}"
                )
            pins[self.graph_info.nested_inputs[0].NESTED_INPUT] = ticks[0]
        elif len(pins) > 0 and len(ticks) == 0:
            pass
        elif len(pins) == 0 and len(ticks) == 0:
            # it is the valid case, when query has no input pins
            pass
        else:
            raise ValueError("It is allowed to pass only one non-specified input")
        outputs = self._outputs()
        outputs.query = self
        outputs.in_sources = pins
        return outputs
    class _outputs(object):
        def __getitem__(self, key):
            output_pins = []
            if type(key) is tuple:
                output_pins = list(key)
            elif isinstance(key, str):
                output_pins = [key]
            elif key is None:
                # No output
                pass
            else:
                raise ValueError(f'Output pins can not be of "{type(key)}" type')
            return onetick.py.functions.apply_query(
                self.query, in_sources=self.in_sources, output_pins=output_pins, **self.query.params
            )
[docs]    def to_eval_string(self):
        """Converts query object to `eval` string"""
        res = '"' + self.path + '"'
        if self.params:
            res += f', "{self._params_to_str(self.params, with_expr=True)}"'
        return "eval(" + res + ")" 
[docs]    def update_params(self, **new_params):
        if new_params:
            self.params.update(new_params) 
    @property
    def str_params(self):
        """Query parameters converted to string"""
        if self._str_params is None:
            self._str_params = self._params_to_str(self.params)
        return self._str_params
    @staticmethod
    def _params_to_str(params, *, with_expr=False):
        """ converts param to str
        Parameters
        ----------
        params: dict
            Parameters as dict(name=value)
        with_expr:
            If true return all expression in expr() function
        Returns
        -------
        result: str
            string representation of parameters ready for query evaluation
        """
        def to_str(v):
            if isinstance(v, list):
                return "\\,".join(map(to_str, v))
            else:
                if with_expr:
                    is_dt = ott.is_time_type(v)
                    if is_dt:
                        v = ott.value2str(v)
                    result = query._escape_quotes_in_eval(v)
                    if isinstance(v, _Operation) and getattr(v, "name", None) != "_SYMBOL_NAME" or is_dt:
                        result = f"expr({result})"
                else:
                    result = query._escape_characters_in_query_param(str(v))
                return result
        return ",".join(key + "=" + to_str(value) for key, value in params.items())
    @staticmethod
    def _escape_quotes_in_eval(v):
        return str(v).translate(str.maketrans({"'": r"\'", '"': r'\"'}))
    @staticmethod
    def _escape_characters_in_query_param(result):
        # 0 - no need to add backslash, 1 - need to add
        char_map = [0] * len(result)
        # put 1 between two quotes symbols
        open_char = None
        last_inx = 0
        for inx, c in enumerate(result):
            if open_char == c:
                open_char = None
                continue
            if not open_char and c == "'" or c == '"':
                open_char = c
                last_inx = inx + 1
                continue
            if open_char:
                char_map[inx] = 1
        # clean open tail if necessary
        if open_char:
            char_map[last_inx:] = [0] * (len(result) - last_inx)
        # apply mapping
        res = []
        last_esc = False  # do not add esc if the previous one is already esc
        n_brackets_in_expr_block = 0  # do not escape in expr(...)
        for inx, c in enumerate(result):
            if c == "(":
                if n_brackets_in_expr_block:
                    n_brackets_in_expr_block += 1
                elif result[inx - 4:inx] == "expr":
                    n_brackets_in_expr_block = 1
            if c == ")" and n_brackets_in_expr_block:
                n_brackets_in_expr_block -= 1
            if c in _QUERY_PARAM_SPECIAL_CHARACTERS and char_map[inx] == 0:
                if not last_esc and not n_brackets_in_expr_block:
                    c = "\\" + c
            last_esc = c == "\\"
            res.append(c)
        return "".join(res) 
class Orders(Source):
    def __init__(self, db="S_ORDERS_FIX", symbol=utils.adaptive, start=utils.adaptive, end=utils.adaptive, **kwargs):
        super().__init__(
            _symbols=symbol, _start=start, _end=end, _base_ep_func=lambda: self.base_ep(db), **kwargs
        )
        self.schema['ID'] = str
        self.schema['BUY_FLAG'] = int
        self.schema['SIDE'] = str
        self.schema['STATE'] = str
        self.schema['ORDTYPE'] = str
        self.schema['PRICE'] = float
        self.schema['PRICE_FILLED'] = float
        self.schema['QTY'] = int
        self.schema['QTY_FILLED'] = int
    def base_ep(self, db):
        db = str(db)
        src = Source(otq.Passthrough(fields="SYMBOL_NAME,TICK_TYPE", drop_fields=True))
        src.tick_type(db + "::ORDER")
        return src
_db_doc = _param_doc(
    name='db',
    desc="""
    Name(s) of the database or the database object(s).
    """,
    str_annotation='str, list of str, :class:`otp.DB <onetick.py.DB>`',
    default=None,
    str_default='None',
)
_symbol_doc = _param_doc(
    name='symbol',
    desc="""
    Symbol(s) from which data should be taken.
    """,
    str_annotation='str, list of str, :class:`Source`, :class:`query`, :py:func:`eval query <onetick.py.eval>`',
    default=utils.adaptive,
    str_default=' :py:class:`onetick.py.adaptive`',
)
_tick_type_doc = _param_doc(
    name='tick_type',
    desc="""
    Tick type of the data.
    If not specified, all ticks from `db` will be taken.
    If ticks can't be found or there are many databases specified in `db` then default is "TRD".
    """,
    str_annotation='str, list of str',
    default=utils.adaptive,
    str_default=' :py:class:`onetick.py.adaptive`',
)
_start_doc = _param_doc(
    name='start',
    desc="""
    Start of the interval from which the data should be taken.
    Default is :py:class:`onetick.py.adaptive`, making the final query deduce the time
    limits from the rest of the graph.
    """,
    str_annotation=(
        ':py:class:`datetime.datetime`, :py:class:`otp.datetime <onetick.py.datetime>`,'
        ' :py:class:`onetick.py.adaptive`'
    ),
    default=utils.adaptive,
    str_default=' :py:class:`onetick.py.adaptive`',
)
_end_doc = _param_doc(
    name='end',
    desc="""
    End of the interval from which the data should be taken.
    Default is :py:class:`onetick.py.adaptive`, making the final query deduce the time
    limits from the rest of the graph.
    """,
    str_annotation=(
        ':py:class:`datetime.datetime`, :py:class:`otp.datetime <onetick.py.datetime>`,'
        ' :py:class:`onetick.py.adaptive`'
    ),
    default=utils.adaptive,
    str_default=' :py:class:`onetick.py.adaptive`',
)
_date_doc = _param_doc(
    name='date',
    desc="""
    Allows to specify a whole day instead of passing explicitly ``start`` and ``end`` parameters.
    If it is set along with the ``start`` and ``end`` parameters then last two are ignored.
    """,
    str_annotation=":class:`datetime.datetime`, :class:`otp.datetime <onetick.py.datetime>`, optional",
    default=None,
)
_schema_policy_doc = _param_doc(
    name='schema_policy',
    desc="""
    Schema deduction policy:
    - 'manual'
      means the schema will be exactly desired_schema.
    - 'tolerant'
      If the schema cannot be deduced, use desired_schema.
      If the schema can be deduced and it lacks fields from the desired_schema,
      or it has a field with a type incompatible with a desired_schema field,
      raise a ValueError.
      Otherwise, use desired_schema with deduced fields added.
      Also, with this policy database is scanned 5 days back to find the schema.
      It is useful when database is misconfigured or in case of holidays.
    - 'fail'
      If the schema cannot be deduced, raise an Exception.
      If the schema can be deduced and is incompatible with desired_schema,
      raise a ValueError.
    """,
    str_annotation="'tolerant', 'fail', 'manual'",
    default=None,
)
_guess_schema_doc = _param_doc(
    name='guess_schema',
    desc="""
    .. deprecated:: 1.3.16
    Use ``schema_policy`` parameter instead.
    """,
    annotation=bool,
    default=None,
)
_identify_input_ts_doc = _param_doc(
    name='identify_input_ts',
    desc="""
    If set to False, the fields SYMBOL_NAME and TICK_TYPE are not appended to the output ticks.
    """,
    annotation=bool,
    default=False,
)
_back_to_first_tick_doc = _param_doc(
    name='back_to_first_tick',
    desc="""
    Determines how far back to go looking for the latest tick before ``start`` time.
    If one is found, it is inserted into the output time series with the timestamp set to ``start`` time.
    Note: it will be rounded to int, so otp.Millis(999) will be 0 seconds.
    """,
    str_annotation=('int, :ref:`offset <datetime_offsets>`, '
                    ':class:`otp.expr <onetick.py.expr>`, '
                    ':py:class:`~onetick.py.Operation`'),
    default=0,
)
_keep_first_tick_timestamp_doc = _param_doc(
    name='keep_first_tick_timestamp',
    desc="""
    If set, new field with this name will be added to source.
    This field contains original timestamp of the tick that was taken from before the start time of the query.
    For all other ticks value in this field will be equal to the value of Time field.
    This parameter is ignored if ``back_to_first_tick`` is not set.
    """,
    annotation=str,
    default=None,
)
_desired_schema_doc = _param_doc(
    name='desired_schema',
    desc="""
    List of <column name> -> <column type> pairs that the source is expected to have.
    If the type is irrelevant, provide None as the type in question.
    """,
    str_annotation='type[str]',
    kind=inspect.Parameter.VAR_KEYWORD,
)
DATA_SOURCE_DOC_PARAMS = [
    _db_doc, _symbol_doc, _tick_type_doc,
    _start_doc, _end_doc, _date_doc,
    _schema_policy_doc, _guess_schema_doc,
    _identify_input_ts_doc,
    _back_to_first_tick_doc, _keep_first_tick_timestamp_doc,
    _desired_schema_doc,
]
[docs]class DataSource(Source):
    POLICY_MANUAL = "manual"
    POLICY_TOLERANT = "tolerant"
    POLICY_FAIL = "fail"
    _VALID_POLICIES = frozenset([POLICY_MANUAL, POLICY_TOLERANT, POLICY_FAIL])
    _PROPERTIES = Source._PROPERTIES + ["_p_db"]
    def __get_schema(self, db, start, schema_policy):
        schema = {}
        if start is utils.adaptive:
            start = None  # means that use the last date with data
        if isinstance(db, list):
            ''' This case of a merge, since we need to get combined schema
            across different tick types and dbs '''
            for t_db in db:
                _db = t_db.split(':')[0]
                _tt = t_db.split(':')[-1]
                db_obj = onetick.py.db._inspection.DB(_db)
                if schema_policy == self.POLICY_TOLERANT and start:
                    # repeating the same logic as in db_obj.last_date
                    start = db_obj.last_not_empty_date(start, days_back=5)
                schema.update(db_obj.schema(date=start, tick_type=_tt))
        if db is None or isinstance(db, _SymbolParamColumn):
            ''' In this case we can't get schema, because db is calculated dynamicatlly.
            Set to empty to indicate that in this case we expect the manualy set schema. '''
            schema = {}
        return schema
    def __prepare_schema(self, db, start, end, schema_policy, guess_schema, desired_schema):
        # TODO: improve searching an actual date from the start to end
        if guess_schema is not None:
            warnings.warn(
                """guess_schema flag is deprecated; use schema_policy argument instead:
                             'manual' for no automatic schema guessing;
                             'fail' for automatic schema guessing with an exception being risen when there is no data;
                             'tolerant' for automatic schema guessing with no schema provided when no data is present)""",  # noqa
                DeprecationWarning,
            )
            if schema_policy is not None:
                raise ValueError("guess_schema and schema_policy cannot be set at the same time")
            if guess_schema:
                schema_policy = self.POLICY_FAIL
            else:
                schema_policy = self.POLICY_MANUAL
        if schema_policy is None:
            schema_policy = self.POLICY_TOLERANT
        if schema_policy not in self._VALID_POLICIES:
            raise ValueError("Invalid schema_policy; allowed values are: manual, fail, tolerant")
        elif (schema_policy == self.POLICY_FAIL) or (schema_policy == self.POLICY_TOLERANT):
            actual_schema = self.__get_schema(db, start, schema_policy)
            if len(actual_schema) == 0:
                if schema_policy == self.POLICY_FAIL:
                    raise Exception('No ticks found in database(-s) ' + ', '.join(db))
                else:
                    actual_schema = desired_schema  # lets try to use at least something
            for k, v in desired_schema.items():
                field_type = actual_schema.get(k, None)
                if field_type is None:
                    raise ValueError(f"Database(-s) {', '.join(db)} schema has no {k} field")
                elif not issubclass(field_type, v):
                    raise ValueError(
                        f"Database(-s) {', '.join(db)} schema field {k} has type {field_type}, but {v} was requested"
                    )
            desired_schema.update(actual_schema)
    def __prepare_dates(self, date):
        if isinstance(date, ott.datetime) or isinstance(date, ott.date):
            start = date.start
            end = date.end
        if isinstance(date, dt.datetime) or isinstance(date, dt.date):
            start = dt.datetime(date.year, date.month, date.day)
            end = start + dt.timedelta(days=1, milliseconds=-1)
        return start, end
    def __prepare_db_tick_type(self, db, tick_type, symbol, start, end):
        if isinstance(db, list):
            ''' If everything is correct then this branch should leave
            the `db` var as a list of databases with tick types and the
            `tick_type` var is None.
            Valid cases:
                - Fully defined case. The `db` parameter has a list of databases where
                  every database has a tick type, when the `tick_type`
                  parameter has default value or None (for backward compatibility)
                - Partially defined case. The `db` parameter has a list of databases but
                  not every database has a tick type, and meantime the `tick_type`
                  is passed to not None value. In that case databases without tick type
                  are exetended with a tick type from the `tick_type` parameter
                - No defined case. The `db` parameter has a list of databases and
                  every database there has no tick type, and the `tick_type` is
                  set to not None value. In that case every database is extended with
                  the tick type from the `tick_type`.
            '''
            def db_converter(_db):
                if isinstance(_db, DB):
                    return _db.name
                else:
                    return _db
            db = [db_converter(_db) for _db in db]
            res = all(('::' in _db and _db[-1] != ':' for _db in db))
            if res:
                if tick_type is utils.adaptive or tick_type is None:
                    tick_type = None  # tick types is specified for all databases
                else:
                    raise Exception('The `tick_type` is set as a parameter '
                                    'and also as a part of the `db` parameter'
                                    'for every database')
            else:
                dbs_without_tt = [_db.split(':')[0] for _db in db
                                  if '::' not in _db or _db[-1] == ':']
                if tick_type is utils.adaptive:
                    tick_type = 'TRD'  # default one for backward compatibility and testing usecase
                if tick_type is None:
                    raise Exception('The tick type is not set for databases: ' +
                                    ', '.join(dbs_without_tt))
                else:
                    # extend databases with missing tick types from the tick tick parameter
                    dbs_with_tt = [_db for _db in db
                                   if '::' in _db and _db[-1] != ':']
                    db = dbs_with_tt + [_db + '::' + tick_type for _db in dbs_without_tt]
                    tick_type = None
        if isinstance(db, (DB, inspect_DB)):
            db = db.name  # ... and we go to the next branch
        if isinstance(db, str):
            ''' The resulting `db` var contains a list with string value, that has the `db`
            concatenated with the `tick_type`. '''
            if '::' in db:
                if tick_type is utils.adaptive or tick_type is None:
                    tick_type = db.split(':')[-1]
                    db = db.split('::')[0]
                else:
                    raise Exception('The `tick_type` is set as a parameter '
                                    'and also as a part of the `db` parameter')
            else:
                if tick_type is utils.adaptive or tick_type is None:
                    db_obj = onetick.py.db._inspection.DB(db)
                    # try to find at least one common tick type
                    # through all days
                    tick_types = None
                    if start is utils.adaptive:
                        start = db_obj.last_date
                        end = db_obj.last_date
                    if start and end:  # could be None if there is no data
                        t_start = start
                        while t_start <= end:
                            t_tts = set(db_obj.tick_types(t_start))
                            t_start += dt.timedelta(days=1)
                            if len(t_tts) == 0:
                                continue
                            if tick_types is None:
                                tick_types = t_tts
                            else:
                                tick_types &= t_tts
                            if len(tick_types) == 0:
                                raise Exception(f'It seems that there is no common '
                                                f'tick types for dates from {start} '
                                                f'to {end}. Please specify a tick '
                                                'type')
                    if tick_types is None:
                        if tick_type is utils.adaptive:
                            tick_types = ['TRD']  # the default one
                        else:
                            raise Exception(f'Could not find any data in from {start} '
                                            f' to {end}. Could you check that tick type, '
                                            ' database and date range are correct.')
                    if len(tick_types) != 1:
                        raise Exception('The tick type is not specified, found '
                                        'multiple tick types in the database : ' +
                                        ', '.join(tick_types))
                    tick_type = tick_types.pop()
            if not isinstance(tick_type, str) and isinstance(tick_type, Iterable):
                db = [f'{db}::{tt}' for tt in tick_type]
            else:
                db = [db + '::' + tick_type]
            tick_type = None
        if isinstance(db, _SymbolParamColumn):
            ''' Do nothing, because we don't know whether db will come with the tick
            type or not. The only one thing that definetely we know that tick_type
            can not be utils.adpative '''
            if tick_type is utils.adaptive:
                # TODO: need to test this case
                raise Exception('The `db` is set to the symbol param, in that case '
                                'the `tick_type` should be set explicitly to some value '
                                'or to None')
        if db is None:
            ''' This case means that database comes with the symbol name, then tick type
            should be defined '''
            if tick_type is utils.adaptive or tick_type is None:
                raise Exception('The `db` is not specified that means database is '
                                'expected to be defined with the symbol name. '
                                'In that case the `tick_type` should be defined.')
            if not isinstance(tick_type, str) and isinstance(tick_type, Iterable):
                tick_type = '+'.join(tick_type)
        return db, tick_type
    @docstring(parameters=DATA_SOURCE_DOC_PARAMS, add_self=True)
    def __init__(
        self,
        db=None,
        symbol=utils.adaptive,
        tick_type=utils.adaptive,
        start=utils.adaptive,
        end=utils.adaptive,
        date=None,
        schema_policy=None,
        guess_schema=None,
        identify_input_ts=False,
        back_to_first_tick=0,
        keep_first_tick_timestamp=None,
        *,
        symbols=None,
        **desired_schema,
    ):
        """
        Construct a source providing data from a given ``db``.
        Examples
        ---------
        Symbol can be a collection
        >>> # OTdirective: snippet-name:fetch data.simple;
        >>> data = otp.DataSource(db='SOME_DB', tick_type='TT', symbols=['S1', 'S2'])
        >>> otp.run(data)
                             Time  X
        0 2003-12-01 00:00:00.000  1
        1 2003-12-01 00:00:00.000 -3
        2 2003-12-01 00:00:00.001  2
        3 2003-12-01 00:00:00.001 -2
        4 2003-12-01 00:00:00.002  3
        5 2003-12-01 00:00:00.002 -1
        Source also can be passed as symbols, in such case magic named column SYMBOL_NAME will be transform to symbol
        and all other columns will be symbol parameters
        >>> # OTdirective: snippet-name:fetch data.symbols as a source;
        >>> symbols = otp.Ticks(SYMBOL_NAME=['S1', 'S2'])
        >>> data = otp.DataSource(db='SOME_DB', symbols=symbols, tick_type='TT')
        >>> otp.run(data)
                             Time  X
        0 2003-12-01 00:00:00.000  1
        1 2003-12-01 00:00:00.000 -3
        2 2003-12-01 00:00:00.001  2
        3 2003-12-01 00:00:00.001 -2
        4 2003-12-01 00:00:00.002  3
        5 2003-12-01 00:00:00.002 -1
        Default schema policy is `tolerant`.
        >>> data = otp.DataSource(db='NYSE_TAQ', tick_type='TRD', symbols='AAPL', PRICE=float, date=otp.dt(2022, 3, 1))
        >>> data.schema
        {'PRICE': <class 'float'>, 'SIZE': <class 'int'>}
        >>> data = otp.DataSource(db='NYSE_TAQ', tick_type='TRD', symbols='AAPL', PRICE=int, date=otp.dt(2022, 3, 1))
        Traceback (most recent call last):
          ...
        ValueError: Database(-s) NYSE_TAQ::TRD schema field PRICE has type <class 'float'>,
        but <class 'int'> was requested
        Schema policy `manual` uses exactly desired_schema:
        >>> data = otp.DataSource(db='NYSE_TAQ', tick_type='TRD', symbols='AAPL', PRICE=float, \
                                      date=otp.dt(2022, 3, 1), schema_policy='manual')
        >>> data.schema
        {'PRICE': <class 'float'>}
        Schema policy 'fail' raise an exception if the schema cannot be deduced
        >>> data = otp.DataSource(db='NYSE_TAQ', tick_type='TRD', symbols='AAPL', date=otp.dt(2021, 3, 1), \
                                     schema_policy='fail')
        Traceback (most recent call last):
          ...
        Exception: No ticks found in database(-s) NYSE_TAQ::TRD
        """
        if self._try_default_constructor(**desired_schema):
            return
        # for cases when we want to explicitly convert into string,
        # it might be symbol param or join_with_query parameter
        if isinstance(tick_type, _ParamColumn):
            tick_type = str(tick_type)[1:-1]
        if date:
            # TODO: write a warning in that case
            start, end = self.__prepare_dates(date)
        db, tick_type = self.__prepare_db_tick_type(db,
                                                    tick_type,
                                                    symbol,
                                                    start,
                                                    end)
        self._p_db = db
        self.__prepare_schema(db,  # tick type is embedded into the db
                              start,
                              end,
                              schema_policy,
                              guess_schema,
                              desired_schema)
        if symbols is not None:
            if symbol is utils.adaptive or symbol is None:
                symbol = symbols
            else:
                # TODO: test it
                raise Exception('You have set the `symbol` and `symbols` parameters'
                                'together, it is not allowed. Please, clarify parameters')
        if isinstance(symbol, Symbols) and symbol._p_db is None:
            symbol = Symbols.duplicate(symbol, db=db)
        if identify_input_ts:
            if "SYMBOL_NAME" in desired_schema:
                raise Exception()  # TODO: think about how user could workaround it
            desired_schema["SYMBOL_NAME"] = str
            if "TICK_TYPE" in desired_schema:
                raise Exception()
            desired_schema["TICK_TYPE"] = str
        # unobvious way to convert otp.Minute/Hour/... to number of seconds
        if type(back_to_first_tick).__name__ == '_DatePartCls':
            back_to_first_tick = int((ott.dt(0) + back_to_first_tick).timestamp())
        if isinstance(back_to_first_tick, _Operation):
            back_to_first_tick = otp.expr(back_to_first_tick)
        if back_to_first_tick != 0 and keep_first_tick_timestamp:
            desired_schema[keep_first_tick_timestamp] = ott.nsectime
        if (
            isinstance(symbol, Source)
            or hasattr(symbol, "__iter__")
            and not isinstance(symbol, dict)
            and not isinstance(symbol, str)
            or isinstance(symbol, query)
            or isinstance(symbol, _QueryEvalWrapper)
        ):
            super().__init__(
                _start=start,
                _end=end,
                _base_ep_func=lambda: self._base_ep_for_cross_symbol(
                    db, symbol, tick_type,
                    identify_input_ts=identify_input_ts,
                    back_to_first_tick=back_to_first_tick,
                    keep_first_tick_timestamp=keep_first_tick_timestamp
                ),
                **desired_schema
            )
        else:
            super().__init__(
                _symbols=symbol,
                _start=start,
                _end=end,
                _base_ep_func=lambda: self.base_ep(
                    db,
                    tick_type,
                    identify_input_ts=identify_input_ts,
                    back_to_first_tick=back_to_first_tick,
                    keep_first_tick_timestamp=keep_first_tick_timestamp,
                ),
                **desired_schema
            )
    @property
    def db(self):
        return self._p_db
    @staticmethod
    def _create_source(passthrough_ep, back_to_first_tick=0, keep_first_tick_timestamp=None):
        """Create graph that save original timestamp of first tick if needed"""
        if back_to_first_tick != 0 and keep_first_tick_timestamp:
            src = Source(otq.Passthrough())
            src.sink(otq.AddField(field=keep_first_tick_timestamp, value='TIMESTAMP'))
            src.sink(passthrough_ep)
            return src
        return Source(passthrough_ep)
    def base_ep(self, db, tick_type, identify_input_ts, back_to_first_tick=0, keep_first_tick_timestamp=None):
        if db is not None:
            if isinstance(db, list):
                str_db = "+".join(db)
            else:
                str_db = str(db)
            if tick_type:
                if isinstance(db, _SymbolParamColumn):
                    str_db = f"expr({str_db} + '::{tick_type}')"    # TODO: test
                else:
                    if "::" not in str_db:
                        str_db += "::" + tick_type
            else:
                if isinstance(db, _SymbolParamColumn):
                    str_db = f"expr({str_db})"  # TODO: test
        else:
            str_db = tick_type
        if isinstance(db, list) or isinstance(db, _SymbolParamColumn):
            src = self._create_source(otq.Passthrough(go_back_to_first_tick=back_to_first_tick),
                                      back_to_first_tick=back_to_first_tick,
                                      keep_first_tick_timestamp=keep_first_tick_timestamp)
            src.sink(otq.Merge(identify_input_ts=identify_input_ts))
        else:
            params = dict(go_back_to_first_tick=back_to_first_tick)
            if identify_input_ts:
                params["fields"] = "SYMBOL_NAME,TICK_TYPE"
                params["drop_fields"] = True
            src = self._create_source(otq.Passthrough(**params),
                                      back_to_first_tick=back_to_first_tick,
                                      keep_first_tick_timestamp=keep_first_tick_timestamp)
        src.tick_type(str_db)
        return src
    def _base_ep_for_cross_symbol(
        self, db, symbol, tick_type, identify_input_ts, back_to_first_tick=0, keep_first_tick_timestamp=None
    ):
        tmp_otq = TmpOtq()
        if isinstance(symbol, _QueryEvalWrapper):
            symbol = symbol.to_eval_string(tmp_otq=tmp_otq)
        elif isinstance(symbol, query):
            symbol = symbol.to_eval_string()
        elif isinstance(symbol, Source):
            symbol = self._convert_symbol_to_string(symbol, tmp_otq)
        if db is not None:
            if isinstance(db, list):
                tick_type = "+".join(db)
            else:
                tick_type = f"{db}::{tick_type}"
        src = self._create_source(otq.Passthrough(go_back_to_first_tick=back_to_first_tick),
                                  back_to_first_tick=back_to_first_tick,
                                  keep_first_tick_timestamp=keep_first_tick_timestamp)
        src.sink(otq.Merge(identify_input_ts=identify_input_ts).symbols(symbol).tick_type(tick_type))
        src._tmp_otq.merge(tmp_otq)
        return src 
Custom = DataSource  # for backward compatiblity, previously we had only Custom
[docs]class Symbols(Source):
    """
    Construct a source that returns ticks with information about symbols in a database.
    The SYMBOL_NAME field is populated with symbol names. The TICK_TYPE field contains
    corresponding tick type (enabled by the ``show_tick_type`` parameter).
    Parameters
    ----------
    db: str
        Name of the database where to search symbols
    tick_type: str
        Tick type to use. Default is `ANY`
    start, end: :py:class:`datetime.datetime`, :py:class:`otp.datetime <onetick.py.datetime>`, \
                    :py:class:`onetick.py.adaptive`
        Time interval from which the data should be taken.
    date: :py:class:`datetime.date`
        Alernative way of setting instead of start/end times
    keep_db: bool
        Flag that indicates whether symbols should have a db prefix.
    pattern: str
        SQL syntax patter for symbols. Default is '%'
    for_tick_type: str
        Fetch only symbols belong to this tick type, if specified.
    show_tick_type: bool
        Add the TICK_TYPE column with the information about tick type
    symbology: str
        The destination symbology for a symbol name translation.
        Translation is performed, if destination symbology is not empty
        and is different from that of the queried database.
    show_original_symbols: bool
        Switches original symbol name propagation as a tick field ORIGINAL_SYMBOL_NAME
        if symbol name translation is performed (if `symbology` is set).
        Note that if this parameter is set to True,
        database symbols with missing translations are also propagated.
    Note
    ----
    Additional fields that can be added to Symbols will be converted to symbol parameters
    See also
    --------
    | :ref:`Symbols guide <Symbols>`
    | **FIND_DB_SYMBOLS** OneTick event processor
    Examples
    --------
    This class can be used to get a list of all symbols in the database:
    >>> otp.Symbols('NYSE_TAQ', date=otp.dt(2022, 3, 1)).to_df()
            Time  SYMBOL_NAME
    0 2022-03-01          AAP
    1 2022-03-01         AAPL
    Also this class can be used to specify symbols for the main query:
    >>> symbols = otp.Symbols('NYSE_TAQ', date=otp.dt(2022, 3, 1))
    >>> data = otp.DataSource('NYSE_TAQ', tick_type='TRD', date=otp.dt(2022, 3, 1))
    >>> result = otp.run(data, symbols=symbols)
    >>> result['AAPL']
                         Time  PRICE  SIZE
    0 2022-03-01 00:00:00.000    1.3   100
    1 2022-03-01 00:00:00.001    1.4    10
    2 2022-03-01 00:00:00.002    1.4    50
    >>> result['AAP']
                         Time  PRICE
    0 2022-03-01 00:00:00.000  45.37
    1 2022-03-01 00:00:00.001  45.41
    Additional fields of the ``otp.Symbols`` can be used in the main query as symbol parameters:
    >>> symbols = otp.Symbols('SOME_DB', show_tick_type=True, keep_db=True)
    >>> symbols['PARAM'] = symbols['SYMBOL_NAME'] + '__' + symbols['TICK_TYPE']
    >>> data = otp.DataSource('SOME_DB')
    >>> data['S_PARAM'] = data.Symbol.PARAM
    >>> data = otp.merge([data], symbols=symbols)
    >>> otp.run(data)
                         Time   X          S_PARAM
    0 2003-12-01 00:00:00.000   1  SOME_DB::S1__TT
    1 2003-12-01 00:00:00.000  -3  SOME_DB::S2__TT
    2 2003-12-01 00:00:00.001   2  SOME_DB::S1__TT
    3 2003-12-01 00:00:00.001  -2  SOME_DB::S2__TT
    4 2003-12-01 00:00:00.002   3  SOME_DB::S1__TT
    5 2003-12-01 00:00:00.002  -1  SOME_DB::S2__TT
    """
    _PROPERTIES = Source._PROPERTIES + ["_p_db",
                                        "_p_pattern",
                                        "_p_start",
                                        "_p_end",
                                        "_p_for_tick_type",
                                        "_p_keep_db"]
    def __init__(
        self,
        db=None,
        tick_type="ANY",
        start=utils.adaptive,
        end=utils.adaptive,
        date=None,
        find_params=None,
        keep_db=False,
        pattern='%',
        for_tick_type=None,
        show_tick_type=False,
        symbology='',
        show_original_symbols=False,
        **kwargs
    ):
        if self._try_default_constructor(**kwargs):
            return
        self._p_db = db
        self._p_pattern = pattern
        self._p_start = start
        self._p_end = end
        self._p_keep_db = keep_db
        self._p_for_tick_type = for_tick_type
        if date:
            if isinstance(date, ott.datetime) or isinstance(date, ott.date):
                start = date.start
                end = date.end
        _symbol = utils.adaptive
        if db:
            if isinstance(db, list):
                _symbol = [f"{str(_db).split(':')[0]}::" for _db in db] # noqa
            else:
                _symbol = f"{str(db).split(':')[0]}::"  # noqa
        _find_params = find_params if find_params is not None else {}
        _find_params.setdefault('pattern', pattern)
        if for_tick_type:
            _find_params['tick_type_field'] = for_tick_type
        _find_params.setdefault('show_tick_type', show_tick_type)
        _find_params.setdefault('symbology', symbology)
        _find_params.setdefault('show_original_symbols', show_original_symbols)
        super().__init__(
            _symbols=_symbol,
            _start=start,
            _end=end,
            _base_ep_func=lambda: self.base_ep(ep_tick_type=tick_type, keep_db=keep_db, **_find_params),
        )
        self.schema['SYMBOL_NAME'] = str
        if _find_params['show_tick_type']:
            self.schema['TICK_TYPE'] = str
        if _find_params['symbology'] and _find_params['show_original_symbols']:
            self.schema['ORIGINAL_SYMBOL_NAME'] = str
    def base_ep(self, ep_tick_type, keep_db, **params):
        src = Source(otq.FindDbSymbols(**params))
        src.tick_type(ep_tick_type)
        src.schema['SYMBOL_NAME'] = str
        if not keep_db:
            src["SYMBOL_NAME"] = src["SYMBOL_NAME"].str.regex_replace('.*::', '')
        return src
    @staticmethod
    def duplicate(obj, db=None):
        return Symbols(db=obj._p_db if db is None else db,
                       pattern=obj._p_pattern,
                       start=obj._p_start,
                       end=obj._p_end,
                       keep_db=obj._p_keep_db,
                       for_tick_type=obj._p_for_tick_type) 
def default_date_converter(date):
    return pd.to_datetime(date, format='%Y%m%d%H%M%S.%f')
def to_timestamp_nanos(date, date_converter, tz):
    date = date_converter(date)
    if isinstance(date, ott.dt):
        date = date.ts
    else:
        date = pd.to_datetime(date)
    return date.tz_localize(tz)
def LocalCSVTicks(path,
                  start=utils.adaptive,
                  end=utils.adaptive,
                  date_converter=default_date_converter,
                  additional_date_columns=None,
                  converters=None,
                  tz=None,
                  ):
    """
    Loads ticks from csv file, and creating otp.Ticks object from them
    Parameters
    ----------
    path: str
        Absolute path to csv file
    start: datetime object
        Start of the query interval
    end: datetime object
        End of the query interval
    date_converter:
        A converter from string to datetime format, by default used only to TIMESTAMP column
    additional_date_columns:
        Other columns to convert to datetime format
    converters:
        Non default converters to columns from strings
    tz:
        timezone
    Returns
    -------
    otp.Ticks
    """
    if tz is None:
        tz = configuration.config.tz
    c = {'TIMESTAMP': partial(to_timestamp_nanos, date_converter=date_converter, tz=tz)}
    if converters is not None:
        c.update(converters)
    if additional_date_columns is not None:
        c.update({column: partial(to_timestamp_nanos,
                                  date_converter=date_converter,
                                  tz=tz,
                                  ) for column in additional_date_columns})
    df = pd.read_csv(path, converters=c)
    df['TS_'] = df['TIMESTAMP']
    df['SYMBOL_NAME'] = df['#SYMBOL_NAME']
    d = df.to_dict(orient='list')
    del d['TIMESTAMP']
    del d['#SYMBOL_NAME']
    ticks = Ticks(d, start=start, end=end)
    ticks['TIMESTAMP'] = ticks['TS_']
    ticks = ticks.drop('TS_')
    return ticks
class SymbologyMapping(Source):
    _PROPERTIES = Source._PROPERTIES + ["_p_dest_symbology"]
    def __init__(self,
                 dest_symbology: str = None,
                 tick_type: str = None,
                 start=utils.adaptive,
                 end=utils.adaptive,
                 symbols=utils.adaptive,
                 **desired_schema):
        if self._try_default_constructor(**desired_schema):
            return
        if not dest_symbology or not tick_type:
            raise TypeError("Missing required argument: 'dest_symbology' or 'tick_type'")
        self._p_dest_symbology = dest_symbology
        super().__init__(
            _symbols=symbols,
            _start=start,
            _end=end,
            _base_ep_func=lambda: self.base_ep(dest_symbology, tick_type),
            **desired_schema
        )
        self.schema['MAPPED_SYMBOL_NAME'] = str
        self.schema['END_DATETIME'] = ott.nsectime
    @property
    def dest_symbology(self):
        return self._p_dest_symbology
    def base_ep(self, dest_symbology, tick_type):
        src = Source(otq.SymbologyMapping(dest_symbology=dest_symbology))
        src.tick_type(tick_type)
        return src
class SplitQueryOutputBySymbol(Source):
    def __init__(self,
                 query=None,
                 symbol_field=None,
                 single_invocation=False,
                 db=utils.adaptive_to_default,
                 tick_type=utils.adaptive,
                 start=utils.adaptive,
                 end=utils.adaptive,
                 symbols=utils.adaptive,
                 **desired_schema):
        if self._try_default_constructor(**desired_schema):
            return
        if isinstance(query, Source):  # TODO: support already existing queries
            query = query.copy()
            otq_query = query._save_as_tmp_otq()
            q_start, q_end, _ = query._get_date_range()
            if start is utils.adaptive and end is utils.adaptive:
                start, end = q_start, q_end
        else:
            raise Exception('Non supported type of the `query` is specified')
        if db is utils.adaptive_to_default:
            db = configuration.config.get('default_db')
        if tick_type is utils.adaptive:
            tick_type = 'SPLIT_BY_SYMBOL'
        super().__init__(
            _symbols=symbols,
            _start=start,
            _end=end,
            _base_ep_func=partial(self.build, db, tick_type, symbol_field, otq_query, single_invocation),
            **desired_schema
        )
    def build(self, db, tick_type, symbol_field_name, otq_query, single_invocation):
        src = Source(otq.SplitQueryOutputBySymbol(otq_query=otq_query,
                                                  symbol_field_name=str(symbol_field_name),
                                                  ensure_single_invocation=single_invocation))
        if db:
            tick_type = str(db) + f'::{tick_type}'
        src.tick_type(tick_type)
        return src
[docs]def by_symbol(src: Source,
              symbol_field,
              single_invocation=None,
              db=utils.adaptive_to_default,
              tick_type=utils.adaptive,
              start=utils.adaptive,
              end=utils.adaptive,
              ) -> Source:
    """
    Create a separate data series for each unique value of ``symbol_field`` in the output of ``src``.
    ``src`` must specify enough parameters to be run (e.g., symbols, query range). A typical use case is to split a
    single data series (e.g., from a CSV file) into separate data series by symbol. This method is a source.
    Parameters
    ----------
    src: Source
        a query which output is to be split by ``symbol_field``
    symbol_field: str
        the name of the field carrying symbol name in the ``src`` query
    single_invocation: bool, optional
        ``True`` means that the ``src`` query is run once and the result stored in memory speeding up the execution.
        ``False`` means that the ``src`` query is run for every symbol of the query saving memory
        but slowing down query execution.
        ``None`` means that this flag will be automatically chosen as ``True`` for CSV sources and ``False`` for other
        sources.
    db: str, optional
        Database for running the query. Doesn't affect the ``src`` query. The default value
        is ``otp.config['default_db']``.
    tick_type: str, optional
        Tick type for the query. Doesn't affect the ``src`` query.
    start: otp.dt, optional
        By default it is taken from the ``src`` start time
    end: otp.dt, optional
        By default it is taken from the ``src`` end time
    See also
    --------
    **SPLIT_QUERY_OUTPUT_BY_SYMBOL** OneTick event processor
    Examples
    --------
    >>> executions = otp.CSV( # doctest: +SKIP
    ...     otp.utils.file(os.path.join(cur_dir, 'data', 'example_events.csv')),
    ...     converters={"time_number": lambda c: c.apply(otp.nsectime)},
    ...     timestamp_name="time_number",
    ...     start=otp.dt(2022, 7, 1),
    ...     end=otp.dt(2022, 7, 2),
    ...     order_ticks=True
    ... )[['stock', 'px']]
    >>> csv = otp.by_symbol(executions, 'stock') # doctest: +SKIP
    >>> trd = otp.DataSource( # doctest: +SKIP
    ...     db='NYSE_TAQ',
    ...     tick_type='TRD',
    ...     start=otp.dt(2022, 7, 1),
    ...     end=otp.dt(2022, 7, 2)
    ... )[['PRICE', 'SIZE']]
    >>> data = otp.funcs.join_by_time([csv, trd]) # doctest: +SKIP
    >>> result = otp.run(data, symbols=executions.distinct(keys='stock')[['stock']], concurrency=8) # doctest: +SKIP
    >>> result['THG'] # doctest: +SKIP
                               Time stock      px   PRICE  SIZE
    0 2022-07-01 11:37:56.432947200   THG  148.02  146.48     1
    >>> result['TFX'] # doctest: +SKIP
                               Time stock      px   PRICE  SIZE
    0 2022-07-01 11:39:45.882808576   TFX  255.61  251.97     1
    >>> result['BURL'] # doctest: +SKIP
                               Time stock      px   PRICE  SIZE
    0 2022-07-01 11:42:35.125718016  BURL  137.53  135.41     2
    """
    if single_invocation is None:
        if isinstance(src, CSV):
            single_invocation = True
        else:
            single_invocation = False
    result = SplitQueryOutputBySymbol(src,
                                      symbol_field=symbol_field,
                                      single_invocation=single_invocation,
                                      db=db,
                                      tick_type=tick_type,
                                      start=start,
                                      end=end)
    result.schema.set(**src.schema)
    return result 
[docs]@docstring(parameters=OB_SNAPSHOT_DOC_PARAMS + DATA_SOURCE_DOC_PARAMS)
def ObSnapshot(*args, **kwargs):
    """
    Construct a source providing order book snapshot for a given ``db``.
    This is just a shorcut for otp.DataSource + otp.agg.ob_snapshot.
    See also
    --------
    | :class:`onetick.py.DataSource`
    | :meth:`onetick.py.Source.ob_snapshot`
    | :func:`onetick.py.agg.ob_snapshot`
    | **OB_SNAPSHOT** OneTick event processor
    Examples
    ---------
    >>> data = otp.ObSnapshot(db='SOME_DB', tick_type='PRL', symbols='AA', max_levels=1) # doctest: +SKIP
    >>> data.to_df() # doctest: +SKIP
            Time  PRICE             UPDATE_TIME  SIZE  LEVEL  BUY_SELL_FLAG
    0 2003-12-04    2.0 2003-12-01 00:00:00.003     6      1              1
    1 2003-12-04    5.0 2003-12-01 00:00:00.004     7      1              0
    """
    aggregation_params = {
        param.name: kwargs.pop(param.name, param.default)
        for _, param in OB_SNAPSHOT_DOC_PARAMS
    }
    src = otp.DataSource(*args, **kwargs)
    return otp.agg.ob_snapshot(**aggregation_params).apply(src) 
[docs]@docstring(parameters=OB_SNAPSHOT_WIDE_DOC_PARAMS + DATA_SOURCE_DOC_PARAMS)
def ObSnapshotWide(*args, **kwargs):
    """
    Construct a source providing order book wide snapshot for a given ``db``.
    This is just a shorcut for otp.DataSource + otp.agg.ob_snapshot_wide.
    See also
    --------
    | :class:`onetick.py.DataSource`
    | :meth:`onetick.py.Source.ob_snapshot_wide`
    | :func:`onetick.py.agg.ob_snapshot_wide`
    | **OB_SNAPSHOT_WIDE** OneTick event processor
    Examples
    ---------
    >>> data = otp.ObSnapshotWide(db='SOME_DB', tick_type='PRL', symbols='AA', max_levels=1) # doctest: +SKIP
    >>> data.to_df() # doctest: +SKIP
            Time  BID_PRICE         BID_UPDATE_TIME  BID_SIZE  ASK_PRICE         ASK_UPDATE_TIME  ASK_SIZE  LEVEL
    0 2003-12-03        5.0 2003-12-01 00:00:00.004         7        2.0 2003-12-01 00:00:00.003         6      1
    """
    aggregation_params = {
        param.name: kwargs.pop(param.name, param.default)
        for _, param in OB_SNAPSHOT_WIDE_DOC_PARAMS
    }
    src = otp.DataSource(*args, **kwargs)
    return otp.agg.ob_snapshot_wide(**aggregation_params).apply(src) 
[docs]@docstring(parameters=OB_SNAPSHOT_FLAT_DOC_PARAMS + DATA_SOURCE_DOC_PARAMS)
def ObSnapshotFlat(*args, **kwargs):
    """
    Construct a source providing order book flat snapshot for a given ``db``.
    This is just a shorcut for otp.DataSource + otp.agg.ob_snapshot_flat.
    See also
    --------
    | :class:`onetick.py.DataSource`
    | :meth:`onetick.py.Source.ob_snapshot_flat`
    | :func:`onetick.py.agg.ob_snapshot_flat`
    | **OB_SNAPSHOT_FLAT** OneTick event processor
    Examples
    ---------
    >>> data = otp.ObSnapshotFlat(db='SOME_DB', tick_type='PRL', symbols='AA', max_levels=1) # doctest: +SKIP
    >>> data.to_df() # doctest: +SKIP
            Time  BID_PRICE1        BID_UPDATE_TIME1  BID_SIZE1  ASK_PRICE1        ASK_UPDATE_TIME1  ASK_SIZE1
    0 2003-12-03         5.0 2003-12-01 00:00:00.004          7         2.0 2003-12-01 00:00:00.003          6
    """
    aggregation_params = {
        param.name: kwargs.pop(param.name, param.default)
        for _, param in OB_SNAPSHOT_FLAT_DOC_PARAMS
    }
    src = otp.DataSource(*args, **kwargs)
    return otp.agg.ob_snapshot_flat(**aggregation_params).apply(src)