# Source code for onetick.py.sources

import datetime as dt
import inspect
import operator
import os
import sys
import warnings
import io
import math

from functools import partial
from typing import Optional, Union, Type, Iterable

import onetick.py as otp
import onetick.query as otq
import pandas as pd

import onetick.py.core._source
import onetick.py.functions
import onetick.py.db._inspection
from onetick.py.core._internal._param_column import _ParamColumn
from onetick.py.core._source._symbol_param_column import _SymbolParamColumn
from onetick.py.core._source.tmp_otq import TmpOtq
from onetick.py.core.column import _Column
from onetick.py.core.eval_query import _QueryEvalWrapper
from onetick.py.core.source import Source, _Source  # _Source for backward compatibility

from . import types as ott
from . import utils, configuration
from .core import _csv_inspector, query_inspector
from .core.column_operations._methods.methods import is_arithmetical
from .core.column_operations.base import _Operation
from .db.db import DB
from .db._inspection import DB as inspect_DB

from .aggregations._docs import docstring, _param_doc
from .aggregations.order_book import (
    OB_SNAPSHOT_DOC_PARAMS, OB_SNAPSHOT_WIDE_DOC_PARAMS, OB_SNAPSHOT_FLAT_DOC_PARAMS
)


# Characters "=" and ",".
# NOTE(review): presumably these are the characters that need special handling inside
# query parameter strings; the code that uses this constant is not visible here -- confirm.
_QUERY_PARAM_SPECIAL_CHARACTERS = "=,"

# A tick type given either as a plain string or as the ``utils.adaptive`` class itself.
AdaptiveTickType = Union[str, Type[utils.adaptive]]


def update_node_tick_type(node: "Source", tick_type: AdaptiveTickType, db: Optional[str] = None):
    """Set the tick type of ``node`` from ``tick_type`` and an optional database name.

    Nothing is changed when ``tick_type`` is adaptive (any class object, or
    :py:class:`onetick.py.adaptive` itself).

    Parameters
    ----------
    node: Source
        node whose ``tick_type`` method is called
    tick_type: AdaptiveTickType
        string tick type or :py:class:`onetick.py.adaptive`
    db: Optional[str]
        optional db name; when it is truthy the tick type is set as ``<db>::<tick_type>``
    """
    # guard: adaptive tick types leave the node untouched
    if isinstance(tick_type, type) or tick_type is utils.adaptive:
        return

    full_tick_type = db + "::" + tick_type if db else tick_type
    node.tick_type(full_tick_type)


class Tick(Source):
    """
    Generate single tick object

    Parameters
    ----------
    offset: int, default=0
        tick timestamp offset from query start time in `offset_part`
    offset_part: one of [nanosecond, millisecond, second, minute, hour, day, dayofyear, weekday, week, month, quarter, year], default=millisecond   #noqa
        unit in which ``offset`` is measured
    time: :py:class:`otp.datetime <onetick.py.datetime>`, optional
        explicit value for the tick ``TIMESTAMP``. It is applied only when ``offset`` is zero;
        passing it together with a non-zero ``offset`` raises ``ValueError``.
    timezone_for_time: str, optional
        passed as ``timezone_naive`` when ``time`` is converted to a OneTick expression
    symbol:
        data symbol
    db:
        data database
    start:
        start time for tick
    end:
        end time for tick
    tick_type: AdaptiveTickType
        Special tick_type `TICK_GENERATOR` will be used by default. You can use
        :py:class:`onetick.py.adaptive` for the value if you want to use sink node tick type
        instead of defining your own.
    bucket_time: str, default="start"
        ``"start"`` or ``"end"``; translated to ``BUCKET_START`` / ``BUCKET_END`` and passed as
        the ``bucket_time`` parameter of TICK_GENERATOR. The raw ``BUCKET_START`` / ``BUCKET_END``
        values are still accepted but emit a ``DeprecationWarning``.
    bucket_interval: int, default=0
        passed unchanged as the ``bucket_interval`` parameter of TICK_GENERATOR
    kwargs
        fields of the tick as ``<field_name>=<value>``. At least one field is required,
        a field whose value is ``None`` is skipped, and passing a type instead of a value
        raises ``TypeError``.

    See also
    --------
    **TICK_GENERATOR** OneTick event processor
    """

    def __init__(
        self,
        offset=0,
        offset_part='millisecond',
        time: ott.datetime = None,
        timezone_for_time=None,
        symbol=utils.adaptive_to_default,
        db=utils.adaptive_to_default,
        start=utils.adaptive,
        end=utils.adaptive,
        tick_type: Optional[AdaptiveTickType] = None,
        bucket_time: str = "start",
        bucket_interval: int = 0,
        **kwargs,
    ):

        if self._try_default_constructor(**kwargs):
            return

        if len(kwargs) == 0:
            raise ValueError("It is not allowed to have a tick without fields")
        if time is not None and offset != 0:
            # the message names the actual parameter (`time`), so the user can find it in the signature
            raise ValueError("It's not allowed to set parameter 'time' and set non-zero offset at the same time")
        bucket_time = self._get_bucket_time(bucket_time)
        if tick_type is None:
            tick_type = "TICK_GENERATOR"

        # derive the schema (field name -> type) from the passed values
        columns = {}
        for key, value in kwargs.items():
            # the way to skip a field
            if value is None:
                continue

            if inspect.isclass(value):
                raise TypeError(f"Tick constructor expects values but not types, {value}")
            value_type = ott.get_object_type(value)

            if value_type is str:
                if isinstance(value, _Column) or is_arithmetical(value):
                    # columns and operations carry their own dtype
                    if value.dtype is not str:
                        value_type = value.dtype
                elif len(value) > ott.string.DEFAULT_LENGTH:
                    # long string literals need an explicitly sized string type
                    value_type = ott.string[len(value)]

            if value_type is bool:
                value_type = float

            if issubclass(value_type, (ott.datetime, ott.date, dt.datetime, dt.date, pd.Timestamp)):
                value_type = ott.nsectime

            columns[key] = value_type

        super().__init__(
            _symbols=symbol,
            _start=start,
            _end=end,
            _base_ep_func=lambda: self.base_ep(db=db,
                                               tick_type=tick_type,
                                               offset=offset,
                                               offset_part=offset_part,
                                               time=time,
                                               timezone_for_time=timezone_for_time,
                                               columns=columns,
                                               bucket_time=bucket_time,
                                               bucket_interval=bucket_interval,
                                               **kwargs),
            **columns,
        )

    def base_ep(self,
                db=utils.adaptive_to_default,
                tick_type="TICK_GENERATOR",
                offset=0,
                offset_part='millisecond',
                time=None,
                timezone_for_time=None,
                columns=None,
                bucket_time="start",
                bucket_interval=0,
                **kwargs):
        """
        Build the TICK_GENERATOR based source for this tick.

        ``columns`` maps field names to their types and ``kwargs`` maps field names to their values;
        fields whose value is ``None`` are left out of the generated field list.
        Returns the constructed :class:`Source`.
        """
        if columns is None:
            columns = {}

        if db is utils.adaptive_to_default:
            # if default database is not set, tick type will be set without it
            # and symbols will have to be specified in otp.run
            db = configuration.config.get('default_db')

        # "<type> <name>=<value>" description for every field that is not skipped
        params = ",".join(
            ott.type2str(columns[key]) + " " + str(key) + "=" + ott.value2str(value)
            for key, value in kwargs.items()
            if value is not None
        )

        src = Source(
            otq.TickGenerator(
                bucket_interval=bucket_interval,
                bucket_time=bucket_time,
                fields=params
            ),
            **columns
        )

        update_node_tick_type(src, tick_type, db)

        # TIMESTAMP += offset will add redundant nodes to sort the timestamps.
        # No sorting needed for a single tick.
        if offset:
            src.sink(otq.UpdateField(field="TIMESTAMP",
                                     value=f"dateadd('{offset_part}', {offset}, TIMESTAMP, _TIMEZONE)"))
        elif time:
            src.sink(otq.UpdateField(field="TIMESTAMP",
                                     value=ott.datetime2expr(time, timezone_naive=timezone_for_time)))
        return src

    @staticmethod
    def _get_bucket_time(bucket_time):
        """
        Translate the user-facing ``bucket_time`` value into the ``BUCKET_START`` / ``BUCKET_END`` constant.

        The constants themselves are accepted as is with a ``DeprecationWarning``;
        any other value raises ``ValueError``.
        """
        if bucket_time == "BUCKET_START":
            warnings.warn("BUCKET_START value is deprecated. Please, use 'start' instead", DeprecationWarning)
        elif bucket_time == "BUCKET_END":
            warnings.warn("BUCKET_END value is deprecated. Please, use 'end' instead", DeprecationWarning)
        elif bucket_time == "start":
            bucket_time = "BUCKET_START"
        elif bucket_time == "end":
            bucket_time = "BUCKET_END"
        else:
            raise ValueError(f"Only 'start' and 'end' values supported as bucket time, but you've passed {bucket_time}")
        return bucket_time


def Ticks(data=None,
          symbol=utils.adaptive_to_default,
          db=utils.adaptive_to_default,
          start=utils.adaptive,
          end=utils.adaptive,
          tick_type: Optional[AdaptiveTickType] = None,
          timezone_for_time=None,
          **inplace_data):
    """
    Data source that generates ticks.

    Ticks are placed with the 1 millisecond offset from
    each other starting from the start of the query interval.

    It has ability to change `distance` between ticks using the
    special reserved field name ``offset``, that specifies the time offset
    of a tick from the start of the query interval.

    Parameters
    ----------
    data: dict, list or pandas.DataFrame, optional
        Ticks values

        * ``dict`` -- <field_name>: <values>

        * ``list`` -- [[<field_names>], [<first_tick_values>], ..., [<n_tick_values>]]

        * :pandas:`DataFrame <pandas.DataFrame>` -- DataFrame with ``Time`` column

        * ``None`` -- ``inplace_data`` will be used

    symbol: str, list of str, :class:`Source`, :class:`query`, :py:func:`eval query <onetick.py.eval>`
        Symbol(s) from which data should be taken.
    db: str
        Database to use for tick generation
    start, end: :py:class:`datetime.datetime`, :py:class:`otp.datetime <onetick.py.datetime>`, \
                    :py:class:`onetick.py.adaptive`
        Timestamp for data generation
    tick_type: str
        tick type for data generation
    timezone_for_time: str
        timezone for data generation
    **inplace_data: dict
        <field_name>: list(<field_values>)

    Raises
    ------
    ValueError
        if both ``data`` and ``inplace_data`` are passed, if neither is passed,
        if columns have different lengths, if ``offset`` and ``time`` are both specified,
        or if a DataFrame has no ``Time`` column

    See also
    --------
    **TICK_GENERATOR** OneTick event processor

    Examples
    --------

    Pass data in ``dict``

    >>> d = otp.Ticks({'A': [1, 2, 3], 'B': [4, 5, 6]})
    >>> otp.run(d)
                         Time  A  B
    0 2003-12-01 00:00:00.000  1  4
    1 2003-12-01 00:00:00.001  2  5
    2 2003-12-01 00:00:00.002  3  6

    Pass ``inplace_data``

    >>> d = otp.Ticks(A=[1, 2, 3], B=[4, 5, 6])
    >>> otp.run(d)
                         Time  A  B
    0 2003-12-01 00:00:00.000  1  4
    1 2003-12-01 00:00:00.001  2  5
    2 2003-12-01 00:00:00.002  3  6

    Pass data in ``list``

    >>> d = otp.Ticks([['A', 'B'],
    ...                [1, 4],
    ...                [2, 5],
    ...                [3, 6]])
    >>> otp.run(d)
                         Time  A  B
    0 2003-12-01 00:00:00.000  1  4
    1 2003-12-01 00:00:00.001  2  5
    2 2003-12-01 00:00:00.002  3  6

    Using the ``offset`` example

    >>> data = otp.Ticks(X=[1, 2, 3], offset=[0, otp.Nano(1), 1])
    >>> otp.run(data)
                               Time  X
    0 2003-12-01 00:00:00.000000000  1
    1 2003-12-01 00:00:00.000000001  2
    2 2003-12-01 00:00:00.001000000  3

    Using pandas.DataFrame

    >>> start_datetime = datetime(2023, 1, 1, 12)
    >>> time_array = [start_datetime + otp.Hour(1) + otp.Nano(1)]
    >>> a_array = [start_datetime - otp.Day(15) - otp.Nano(7)]
    >>> df = pd.DataFrame({'Time': time_array,'A': a_array})
    >>> data = otp.Ticks(df)
    >>> otp.run(data, start=start_datetime, end=start_datetime + otp.Day(1))
                               Time                             A
    0 2023-01-01 13:00:00.000000001 2022-12-17 11:59:59.999999993
    """
    if tick_type is None:
        tick_type = "TICK_GENERATOR"
    if db is utils.adaptive_to_default:
        db = configuration.config.get('default_db')

    # a DataFrame is converted to the ``dict`` form, its ``Time`` column becomes the reserved ``time`` field
    if isinstance(data, pd.DataFrame):
        if 'Time' not in data.columns:
            raise ValueError('Field `Time` is required for constructing an `otp.Source` from `pandas.DataFrame`')
        data = data.rename(columns={"Time": "time"})
        data = data.to_dict('list')

    if data and len(inplace_data) != 0:
        raise ValueError("Data can be passed only using either the `data` parameter "
                         "or inplace through the key-value args")

    # the ``list`` form (header row + value rows) is transposed to the ``dict`` form
    if isinstance(data, list):
        reform = {}
        for inx, key in enumerate(data[0]):
            reform[key] = [sub_list[inx] for sub_list in data[1:]]
        data = reform

    if data is None:
        if inplace_data:
            data = inplace_data
        else:
            raise ValueError("You don't specify any data to create ticks from. "
                             "Please, use otp.Empty for creating empty data source")
    else:
        # shallow copy: the reserved fields are added below without touching the caller's dict
        data = data.copy()

    # all columns must have the same number of values
    value_len = -1
    for key, value in data.items():
        if value_len == -1:
            value_len = len(value)
        elif value_len != len(value):
            # TODO: write test to cover that case
            raise ValueError(
                f"It is not allowed to have different columns of different lengths, "
                f"some of columns have {value_len} length, but column '{key}', as instance, has {len(value)}"
            )

    use_absolute_time = False

    if "offset" in data:
        if "time" in data:
            raise ValueError("You cannot specify offset and time at the same time")
    elif "time" in data:
        use_absolute_time = True
    else:
        # default placement: 0, 1, 2, ... milliseconds
        data["offset"] = list(range(value_len))

    if not use_absolute_time:
        # split every offset into a number and a date part name; plain numbers are milliseconds
        offset_values = []
        offset_parts = []
        for ofv in data['offset']:
            if isinstance(ofv, ott.offsets.Tick):
                offset_values.append(ofv.n)
                offset_parts.append(str(ofv.datepart)[1:-1])
            else:
                offset_values.append(ofv)
                offset_parts.append('millisecond')
        data['offset'] = offset_values
        data['offset_part'] = offset_parts

    if value_len == 1:
        columns = {key: value[0] for key, value in data.items()}
        return Tick(db=db, symbol=symbol, tick_type=tick_type, start=start, end=end,
                    timezone_for_time=timezone_for_time, **columns)

    for key in data.keys():
        data[key] = [float(elem) if isinstance(elem, bool) else elem for elem in data[key]]

    # select only columns that do not contain None there to support
    # heterogeneous data
    # (_Column instances are checked by type first, because they can't be used in if-clauses)
    not_none_columns = [
        key
        for key, value in data.items()
        if all(isinstance(v, _Column) or v is not None for v in value)
    ]

    # if a field depends on a symbol parameter, it cannot be csv'd (it's dynamic)
    # likewise for otq parameters
    # if there's a better way to check whether a value is constant,
    # will be glad to hear about it
    def _depends_on_outside_data(v):
        str_rep = str(v)
        return ("_SYMBOL_NAME" in str_rep) or ("_SYMBOL_PARAM" in str_rep) or ("$" in str_rep)

    is_outside_data_dependent = any(
        _depends_on_outside_data(v) for value in data.values() for v in value
    )

    # infinity() and (on windows) nan() cannot be natively read from a csv
    on_windows = sys.platform.startswith("win")

    def _is_special_value(v):
        if isinstance(v, ott._inf):
            return True
        is_nan = isinstance(v, ott._nan) or (isinstance(v, float) and math.isnan(v))
        return is_nan and on_windows

    has_special_values = any(_is_special_value(v) for value in data.values() for v in value)

    if (len(not_none_columns) == len(data)) and (not is_outside_data_dependent) and (not has_special_values):
        # Data is homogenous; CSV backing can be used
        return _DataCSV(data, value_len, db=db, symbol=symbol, tick_type=tick_type, start=start, end=end,
                        timezone_for_time=timezone_for_time, use_absolute_time=use_absolute_time)

    # Fallback is a merge of individual ticks
    ticks = []
    for inx in range(value_len):
        columns = {key: value[inx] for key, value in data.items()}
        ticks.append(Tick(db=db, symbol=symbol, tick_type=tick_type, start=start, end=end,
                          timezone_for_time=timezone_for_time, **columns))

    return onetick.py.functions.merge(ticks, align_schema=not_none_columns)
class _DataCSV(Source):
    """
    Source backed by in-memory CSV text (CSV_FILE_LISTING with ``file_contents``).

    ``data`` is a dict ``<field_name>: <list of values>`` that must contain either the ``time`` field
    (when ``use_absolute_time`` is set) or the ``offset`` and ``offset_part`` fields;
    ``length`` is the number of ticks.
    """

    def __init__(
        self,
        data=None,
        length=None,
        db=utils.adaptive_to_default,
        symbol=utils.adaptive_to_default,
        tick_type=None,
        start=utils.adaptive,
        end=utils.adaptive,
        use_absolute_time=False,
        timezone_for_time=None,
        **kwargs,
    ):
        if self._try_default_constructor(**kwargs):
            return

        if data is None or length is None:
            raise ValueError("'data' and 'length' parameters can't be None")

        if db is utils.adaptive_to_default:
            db = configuration.config.get('default_db')

        def datetime_to_expr(v):
            # text of a OneTick expression that evaluates to the datetime value `v`
            if ott.is_time_type(v):
                return ott.datetime2expr(v, timezone_naive=timezone_for_time)
            if isinstance(v, ott.nsectime):
                # TODO: change to ott.value2str after PY-441
                return f'NSECTIME({v})'
            if isinstance(v, ott.msectime):
                return ott.value2str(v)
            raise ValueError(f"Can't convert value {v} to datetime expression")

        if use_absolute_time:
            # converting values of "time" column to onetick expressions
            data["time"] = [datetime_to_expr(d) for d in data["time"]]

        def csv_rep(value):
            # strings are quoted, with backslashes and double quotes escaped
            if isinstance(value, str):
                return '"' + value.replace("\\", "\\\\").replace('"', '\\"') + '"'
            return str(value)

        def get_type_of_value(value):
            t = ott.get_object_type(value)

            if ott.is_time_type(t):
                return ott.nsectime
            if t is str:
                if len(value) <= ott.string.DEFAULT_LENGTH:
                    return str
                return ott.string[len(value)]
            return t

        def get_type_of_column(key):
            # the type that includes the types of all values of the column
            types = [get_type_of_value(v) for v in data[key]]
            res, _ = utils.get_type_that_includes(types)
            return res

        columns = {key: get_type_of_column(key) for key in data}

        expression_columns = []
        header_columns = {}
        for key in list(columns):
            header_columns[key] = columns[key]
            # converting values of datetime columns to onetick expressions
            if columns[key] is ott.nsectime:
                data[key] = [datetime_to_expr(v) for v in data[key]]
                # in the CSV text the column holds expression strings, so its header type is recomputed
                header_columns[key] = get_type_of_column(key)
                expression_columns.append(key)

        transposed_data = [[csv_rep(value[i]) for key, value in data.items()] for i in range(length)]

        text_header = ",".join(f"{ott.type2str(v)} {k}" for k, v in header_columns.items())
        text_data = "\n".join([",".join(data_row) for data_row in transposed_data])

        # the reserved fields are used only to set the timestamp, they are not part of the schema
        if use_absolute_time:
            del columns["time"]
        else:
            del columns["offset"]
            del columns["offset_part"]

        super().__init__(
            _symbols=symbol,
            _start=start,
            _end=end,
            _base_ep_func=lambda: self.base_ep(columns=columns,
                                               db=db,
                                               tick_type=tick_type,
                                               use_absolute_time=use_absolute_time,
                                               text_header=text_header,
                                               text_data=text_data,
                                               expression_columns=expression_columns),
            **columns,
        )

    def base_ep(self, columns, db, tick_type, use_absolute_time, text_header, text_data, expression_columns=None):
        """
        Build the CSV_FILE_LISTING based source from the prepared CSV text.

        ``expression_columns`` lists the columns that hold datetime expressions as text
        and have to be evaluated into ``nsectime`` values; ``None`` means no such columns.
        """
        # normalize once: the default None must not reach the Table field list below
        expression_columns = list(expression_columns or [])

        node = Source(
            otq.CsvFileListing(
                discard_timestamp_column=True,
                time_assignment="_START_TIME",
                field_delimiters="','",
                quote_chars='"""',
                handle_escaped_chars=True,
                file_contents=text_data,
                first_line_is_title=False,
                fields=text_header,
            ),
            **columns,
        )

        update_node_tick_type(node, tick_type, db)

        if use_absolute_time:
            # don't trust UpdateField
            node.sink(otq.AddField(field='____TMP____', value="EVAL_EXPRESSION(time, 'datetime')"))
            node.sink(otq.UpdateField(field="TIMESTAMP", value="____TMP____"))
            node.sink(otq.Passthrough(fields="time,____TMP____", drop_fields="True"))
            node.sink(otq.OrderBy(order_by="TIMESTAMP ASC"))
        else:
            node.sink(otq.OrderBy(order_by="offset ASC"))
            node.sink(otq.UpdateField(field="TIMESTAMP", value="dateadd(offset_part, offset, TIMESTAMP, _TIMEZONE)"))
            node.sink(otq.Passthrough(fields="offset,offset_part", drop_fields="True"))
            node.sink(otq.OrderBy(order_by="TIMESTAMP ASC"))

        for column in expression_columns:
            # don't trust UpdateField
            node.sink(otq.RenameFields(f'{column}=____TMP____'))
            node.sink(otq.AddField(field=column, value="EVAL_EXPRESSION(____TMP____, 'datetime')"))
            node.sink(otq.Passthrough(fields='____TMP____', drop_fields=True))
        # NOTE(review): the Table node is assumed to follow the loop (it lists all expression columns at once);
        # the original layout of this statement could not be recovered with certainty -- confirm.
        node.sink(otq.Table(keep_input_fields=True,
                            fields=', '.join(f'nsectime {column}' for column in expression_columns)))

        return node


def TTicks(data):
    """
    .. deprecated:: 1.3.101

    Transposed Ticks format.

    Parameters
    ----------
    data: list
        list of list, where the first sublist is the header, and other are values
    """

    warnings.warn("The nice and helpful function `TTicks` is going to be deprecated. "
                  "You could use the `Ticks` to pass data in the same format there",
                  DeprecationWarning)

    # transpose rows into columns; the local name must not shadow the module-level `dt` alias
    transposed = {}
    for inx, key in enumerate(data[0]):
        transposed[key] = [sub_list[inx] for sub_list in data[1:]]

    return Ticks(transposed)
class Empty(Source):
    """
    Empty data source

    Parameters
    ----------
    db: str
        Name of the database from which to take schema.
    symbol: str, list of str, :class:`Source`, :class:`query`, :py:func:`eval query <onetick.py.eval>`
        Symbol(s) from which data should be taken.
    tick_type: str,
        Name of the tick_type from which to take schema.
    start, end: :py:class:`datetime.datetime`, :py:class:`otp.datetime <onetick.py.datetime>`, \
                    :py:class:`onetick.py.adaptive`
        Time interval from which the data should be taken.
    schema:
        schema to use in case db and/or tick_type are not set

    Examples
    --------
    We can define schema:

    >>> data = otp.Empty(A=str, B=int)
    >>> otp.run(data)
    Empty DataFrame
    Columns: [A, B, Time]
    Index: []
    >>> data.columns()
    {'A': <class 'str'>, 'B': <class 'int'>, 'TIMESTAMP': <class 'onetick.py.types.nsectime'>,
     '_START_TIME': <class 'onetick.py.types.nsectime'>, '_END_TIME': <class 'onetick.py.types.nsectime'>,
     '_SYMBOL_NAME': <class 'str'>, '_DBNAME': <class 'str'>, '_TICK_TYPE': <class 'str'>, '_TIMEZONE': <class 'str'>}

    Or we can get schema from the database:

    >>> data = otp.Empty(db='SOME_DB', tick_type='TT')
    >>> data.columns()
    {'X': <class 'int'>, 'TIMESTAMP': <class 'onetick.py.types.nsectime'>,
     '_START_TIME': <class 'onetick.py.types.nsectime'>, '_END_TIME': <class 'onetick.py.types.nsectime'>,
     '_SYMBOL_NAME': <class 'str'>, '_DBNAME': <class 'str'>, '_TICK_TYPE': <class 'str'>, '_TIMEZONE': <class 'str'>}
    """

    def __init__(
        self,
        db=utils.adaptive_to_default,
        symbol=utils.adaptive_to_default,
        tick_type=None,
        start=utils.adaptive,
        end=utils.adaptive,
        **schema,
    ):
        if self._try_default_constructor(**schema):
            return

        # the schema is looked up in the database only for an explicitly passed, non-default database
        take_schema_from_db = (
            tick_type
            and db != configuration.config.get('default_db')
            and db is not utils.adaptive_to_default
        )

        if not take_schema_from_db:
            columns = schema
        else:
            columns = {}
            try:
                inspected_db = onetick.py.db._inspection.DB(db)
                schema_kwargs = {'tick_type': tick_type}
                if end is not utils.adaptive:
                    schema_kwargs['end'] = end
                columns = inspected_db.schema(**schema_kwargs)
            except Exception:
                # do not raise an exception if no data found, because it is empty _source and does not matter
                pass

        super().__init__(
            _symbols=symbol,
            _start=start,
            _end=end,
            _base_ep_func=lambda: self.base_ep(db),
            **columns
        )

    def base_ep(self, db):
        """Build a TICK_GENERATOR source with the tick type ``<db>::TICK_GENERATOR`` (or without db if it is None)."""
        if db is utils.adaptive_to_default:
            db = configuration.config.get('default_db')

        src = Source(otq.TickGenerator(fields="long ___NOTHING___=0"))

        generator_tick_type = 'TICK_GENERATOR' if db is None else db + "::TICK_GENERATOR"
        src.tick_type(generator_tick_type)

        return src
class CSV(Source):
    """
    Construct source based on CSV file.

    There are several steps determining column types.

    1. Initially, all column treated as ``str``.
    2. If column name in CSV title have format ``type COLUMNNAME``,
       it will change type from ``str`` to specified type.
    3. All column type are determined automatically from its data.
    4. You could override determined types in ``dtype`` argument explicitly.
    5. ``converters`` argument is applied after ``dtype`` and could also change column type.

    Parameters
    ----------
    filepath_or_buffer: str, os.PathLike, FileBuffer
        Path to CSV file or :class:`file buffer <FileBuffer>`
    timestamp_name: str, default "Time"
        Name of TIMESTAMP column used for ticks. Used only if it is exists in CSV columns, otherwise ignored.
    first_line_is_title: bool
        Use first line of CSV file as a source for column names and types.
        If CSV file is started with # symbol, this parameter **must** be ``True``.

        - If ``True``, column names are inferred from the first line of the file,
          it is not allowed to have empty name for any column.

        - If ``False``, first line is processed as data, column names will be COLUMN_1, ..., COLUMN_N.
          You could specify column names in ``names`` argument.

    names: list, optional
        List of column names to use, or None.
        Length must be equal to columns number in file.
        Duplicates in this list are not allowed.
    dtype: dict, optional
        Data type for columns, as dict of pairs {column_name: type}.
        Will convert column type from ``str`` to specified type, before applying converters.
    converters: dict, optional
        Dict of functions for converting values in certain columns. Keys are column names.
        Function must be valid callable with ``onetick.py`` syntax, example::

            converters={
                "time_number": lambda c: c.apply(otp.nsectime),
                "stock": lambda c: c.str.lower(),
            }

        Converters applied *after* ``dtype`` conversion.
    order_ticks: bool, optional
        If ``True`` and ``timestamp_name`` column are used, then source will order tick by time.
        Note, that if ``False`` and ticks are not ordered in sequence, then OneTick will raise Exception in runtime.
    drop_index: bool, optional
        if ``True`` and 'Index' column is in the csv file then this column will be removed.
    change_date_to: datetime, date, optional
        change date from a timestamp column to a specific date. Default is None, means not changing timestamp column.

    See also
    --------
    **CSV_FILE_LISTING** OneTick event processor

    Examples
    --------
    Simple CSV file reading

    >>> data = otp.CSV(os.path.join(csv_path, "data.csv"))
    >>> otp.run(data)
                         Time          time_number      px side
    0 2003-12-01 00:00:00.000  1656690986953602304   30.89  Buy
    1 2003-12-01 00:00:00.001  1656667706281508352  682.88  Buy

    Read CSV file and get timestamp for ticks from specific field.
    You need to specify query start/end interval including all ticks.

    >>> data = otp.CSV(os.path.join(csv_path, "data.csv"),
    ...                timestamp_name="time_number",
    ...                converters={"time_number": lambda c: c.apply(otp.nsectime)},
    ...                start=otp.dt(2010, 8, 1),
    ...                end=otp.dt(2022, 9, 2))
    >>> otp.run(data)
                               Time      px side
    0 2022-07-01 11:56:26.953602304   30.89  Buy
    1 2022-07-01 05:28:26.281508352  682.88  Buy
    """

    # NOTE: the mutable defaults of `dtype` and `converters` are kept for interface compatibility;
    # neither dict is modified by this class.
    def __init__(self,
                 filepath_or_buffer=None,
                 timestamp_name: Union[str, None] = "Time",
                 first_line_is_title: bool = True,
                 names: Union[list, None] = None,
                 dtype: dict = {},
                 converters: dict = {},
                 order_ticks=False,
                 drop_index=True,
                 change_date_to=None,
                 **kwargs):
        if self._try_default_constructor(**kwargs):
            return

        obj_to_inspect = filepath_or_buffer
        if isinstance(filepath_or_buffer, utils.FileBuffer):
            obj_to_inspect = io.StringIO(filepath_or_buffer.get())

        if isinstance(obj_to_inspect, str) and not os.path.exists(obj_to_inspect):
            # if not found, probably, CSV file is located in OneTick CSV_FILE_PATH, check it for inspect_by_pandas()
            # (when ONE_TICK_CONFIG is not set the lookup is skipped, so that the inspection below
            # reports the missing file instead of a KeyError about the environment variable)
            config_path = os.environ.get("ONE_TICK_CONFIG")
            csv_paths = ""
            if config_path:
                csv_paths = otp.utils.get_config_param(config_path, "CSV_FILE_PATH", default="")
            if csv_paths:
                for csv_path in csv_paths.split(","):
                    csv_path = os.path.join(csv_path, obj_to_inspect)
                    if os.path.exists(csv_path):
                        obj_to_inspect = csv_path
                        break

        columns, default_types, forced_title = _csv_inspector.inspect_by_pandas(
            obj_to_inspect,
            first_line_is_title,
            names)
        if "TIMESTAMP" in columns:
            raise ValueError(
                "It is not allowed to have 'TIMESTAMP' columns, because it is reserved name in OneTick"
            )
        if "Time" in columns and timestamp_name != "Time":
            raise ValueError(
                "It is not allowed to have 'Time' column not used as timestamp field."
            )

        # FIELDS parameter of the EP: the type is specified only for sized string columns
        ep_fields = ",".join(
            f'{ott.type2str(column_type)} {column}' if issubclass(column_type, otp.string) else column
            for column, column_type in columns.items()
        )

        to_drop = []
        if "TICK_STATUS" in columns:
            del columns["TICK_STATUS"]
            to_drop.append("TICK_STATUS")

        if "Index" in columns and drop_index:
            del columns["Index"]
            to_drop.append("Index")

        # determine start and end dates
        start = kwargs.get("start", utils.adaptive)
        end = kwargs.get("end", utils.adaptive)

        # explicit types override the detected ones
        for t in dtype:
            if t not in columns:
                raise ValueError(f"dtype '{t}' not found in columns list")
            columns[t] = dtype[t]

        has_time = False
        if timestamp_name in columns:
            has_time = True
            # remove to resolve exception in Source.__init__
            if timestamp_name == "Time":
                del columns["Time"]
            # redefine start/end time for change_date_to
            if change_date_to:
                start = dt.datetime(change_date_to.year, change_date_to.month, change_date_to.day)
                end = ott.next_day(start)

        if isinstance(filepath_or_buffer, utils.FileBuffer):
            symbols = 'DUMMY'
        else:
            # str, because there might passed an os.PathLike object
            symbols = str(filepath_or_buffer)

        super().__init__(
            _symbols=symbols,
            _start=start,
            _end=end,
            _base_ep_func=partial(self.base_ep,
                                  filepath_or_buffer,
                                  columns,
                                  forced_title,
                                  default_types,
                                  has_time,
                                  to_drop,
                                  timestamp_name,
                                  change_date_to,
                                  order_ticks,
                                  start,
                                  end,
                                  first_line_is_title,
                                  ep_fields,
                                  converters,),
            **columns,
        )

        # fake run converters to set proper schema
        if converters:
            for column, converter in converters.items():
                self.schema[column] = converter(self[column]).dtype

        if has_time and timestamp_name in self.schema:
            if self.schema[timestamp_name] not in [ott.nsectime, ott.msectime]:
                raise ValueError(f"CSV converter for {timestamp_name} is converting to {self.schema[timestamp_name]} "
                                 "type, but expected resulted type is ott.msectime or ott.nsectime")

        # remove timestamp_name column, if we use it as TIMESTAMP source
        if has_time and timestamp_name != "Time":
            del self[timestamp_name]

    def base_ep(self,
                filepath_or_buffer,
                columns,
                forced_title,
                default_types,
                has_time,
                to_drop,
                timestamp_name,
                change_date_to,
                order_ticks,
                start,
                end,
                first_line_is_title,
                ep_fields,
                converters={},):
        """
        Build the CSV_FILE_LISTING based source: read the file, cast the columns to the types from ``columns``,
        apply ``converters``, assign tick timestamps and drop the fields listed in ``to_drop``.
        """
        # work on a copy, so the list of the caller is not extended below
        to_drop = list(to_drop)

        # initialize Source and set schema to columns.
        file_contents = ''
        if isinstance(filepath_or_buffer, utils.FileBuffer):
            file_contents = filepath_or_buffer.get()

        csv = Source(
            otq.CsvFileListing(
                field_delimiters="','",
                time_assignment="_START_TIME",
                # we don't use EP's first_line_is_title, because EP raise error on empty column name,
                # and we explicitly define name for such columns in FIELDS arg.
                # but if first line started with # (forced_title=True), then this param ignored :(
                first_line_is_title=False,
                fields=ep_fields,
                file_contents=file_contents
            ),
            **columns,
        )

        if first_line_is_title and not forced_title:
            # remove first line with titles for columns.
            csv.sink(otq.DeclareStateVariables(variables="long __TICK_INDEX=0"))
            csv.sink(otq.PerTickScript("STATE::__TICK_INDEX = STATE::__TICK_INDEX + 1;"))
            csv.sink(otq.WhereClause(discard_on_match=False, where="STATE::__TICK_INDEX > 1"))

        # set tick type to ANY
        csv.tick_type("LOCAL::ANY")

        # check whether need to update types, because if column type is not specified in header
        # then by default column has string type in OneTick
        update_columns = {}
        for name, dtype in columns.items():
            if not issubclass(dtype, str) and name not in default_types:
                update_columns[name] = dtype

        for name, dtype in update_columns.items():
            if dtype is int:
                csv.sink(otq.UpdateField(field=name, value="atol(" + name + ")"))
            elif dtype is float:
                csv.sink(otq.UpdateField(field=name, value="atof(" + name + ")"))
            elif dtype is ott.msectime:
                csv.sink(otq.UpdateField(field=name,
                                         value='"1970/01/01 00:00:00.000"',
                                         where=name + '=""'))
                csv.sink(otq.UpdateField(field=name, value=f'parse_time("%Y/%m/%d %H:%M:%S.%q",{name},_TIMEZONE)'))
            elif dtype is ott.nsectime:
                csv.sink(otq.UpdateField(field=name,
                                         value='"1970/1/1 00:00:00.000"',
                                         where=name + '=""'))
                csv.sink(otq.UpdateField(field=name, value=f'parse_nsectime("%Y/%m/%d %H:%M:%S.%J",{name},_TIMEZONE)'))
            else:
                raise TypeError(f"Unsupported type '{dtype}'")

        # run converters
        if converters:
            for column, converter in converters.items():
                if csv[column].dtype is not otp.nsectime and converter(csv[column]).dtype is otp.nsectime:
                    # workaround for resolve bug on column type changing:
                    # https://onemarketdata.atlassian.net/browse/PY-416
                    # the temporary column is named after the converted column itself
                    tmp_column = f'_T_{column}'
                    csv[tmp_column] = converter(csv[column])
                    del csv[column]
                    csv[column] = csv[tmp_column]
                    del csv[tmp_column]
                else:
                    csv[column] = converter(csv[column])

        if has_time:
            # if timestamp_name column is defined in the csv, then apply tick time adjustment

            if timestamp_name in converters:
                # we assume that if timestamp_name field in converters,
                # then it is already converted to otp.dt
                csv.sink(
                    otq.UpdateField(
                        field="TIMESTAMP",
                        value=timestamp_name,
                        allow_unordered_output_times=True,
                    )
                )
            else:
                if change_date_to:
                    change_date_to = change_date_to.strftime("%Y/%m/%d")
                    # NOTE(review): the updated field is hard-coded as "Time" while the value is built
                    # from `timestamp_name` -- confirm this branch is reachable only for the "Time" column.
                    csv.sink(otq.UpdateField(field="Time",
                                             value=f'"{change_date_to}" + substr({timestamp_name}, 10)'))

                # by default we parse timestamp_name into TIMESTAMP field
                # from typical/default Time format from OneTick dump
                csv.sink(
                    otq.UpdateField(
                        field="TIMESTAMP",
                        value=f'parse_nsectime("%Y/%m/%d %H:%M:%S.%J", {timestamp_name}, _TIMEZONE)',
                        allow_unordered_output_times=True,
                    )
                )

                # drop source timestamp_name field in favor of new TIMESTAMP field
                # NOTE(review): placed in this branch because the converters case is dropped
                # by the constructor; the original nesting could not be recovered with certainty -- confirm.
                to_drop.append(timestamp_name)
        else:
            # default time for ticks are increasing from 0
            csv.sink(otq.DeclareStateVariables(variables="long __TIMESTAMP_INC__ = 0"))
            csv.sink(otq.UpdateField(
                field="TIMESTAMP",
                value="TIMESTAMP + STATE::__TIMESTAMP_INC__"))
            csv.sink(otq.UpdateField(field="STATE::__TIMESTAMP_INC__", value="STATE::__TIMESTAMP_INC__ +1"))

        if order_ticks:
            csv.sort('TIMESTAMP', inplace=True)

        if to_drop:
            csv.sink(otq.Passthrough(fields=",".join(to_drop), drop_fields="True"))

        return csv
class Trades(Source):
    """
    Source of trade ticks (tick type ``TRD``).

    Declares the ``PRICE`` (float) and ``SIZE`` (int) fields in the schema.
    """

    def __init__(self, db=utils.adaptive_to_default, symbol=utils.adaptive, date=None,
                 start=utils.adaptive, end=utils.adaptive, **kwargs):
        # fall back to the configured default database when none is passed
        database = configuration.config.default_db if db is utils.adaptive_to_default else db

        if date:
            # a whole-day ``date`` overrides explicitly passed ``start`` / ``end``
            start, end = date.start, date.end

        super().__init__(
            _symbols=symbol,
            _start=start,
            _end=end,
            _base_ep_func=lambda: self.base_ep(database),
            **kwargs
        )

        for field_name, field_type in (('PRICE', float), ('SIZE', int)):
            self.schema[field_name] = field_type

    def base_ep(self, db):
        """Build the base node: a passthrough dropping SYMBOL_NAME and TICK_TYPE, bound to ``<db>::TRD``."""
        full_tick_type = str(db) + "::TRD"
        node = Source(otq.Passthrough(fields="SYMBOL_NAME,TICK_TYPE", drop_fields=True))
        node.tick_type(full_tick_type)
        return node


class Quotes(Source):
    """
    Source of quote ticks (tick type ``QTE``).

    Declares the ``ASK_PRICE``, ``BID_PRICE`` (float) and ``ASK_SIZE``, ``BID_SIZE`` (int)
    fields in the schema.
    """

    def __init__(self, db=utils.adaptive_to_default, symbol=utils.adaptive,
                 start=utils.adaptive, end=utils.adaptive, **kwargs):
        # fall back to the configured default database when none is passed
        database = configuration.config.default_db if db is utils.adaptive_to_default else db

        super().__init__(
            _symbols=symbol,
            _start=start,
            _end=end,
            _base_ep_func=lambda: self.base_ep(database),
            **kwargs
        )

        for field_name, field_type in (
            ('ASK_PRICE', float),
            ('BID_PRICE', float),
            ('ASK_SIZE', int),
            ('BID_SIZE', int),
        ):
            self.schema[field_name] = field_type

    def base_ep(self, db):
        """Build the base node: a passthrough dropping SYMBOL_NAME and TICK_TYPE, bound to ``<db>::QTE``."""
        full_tick_type = str(db) + "::QTE"
        node = Source(otq.Passthrough(fields="SYMBOL_NAME,TICK_TYPE", drop_fields=True))
        node.tick_type(full_tick_type)
        return node


class NBBO(Source):
    """
    Source of NBBO ticks (tick type ``NBBO``), by default read from the ``TAQ_NBBO`` database.

    Declares the ``ASK_PRICE``, ``BID_PRICE`` (float) and ``ASK_SIZE``, ``BID_SIZE`` (int)
    fields in the schema.
    """

    def __init__(self, db="TAQ_NBBO", symbol=utils.adaptive, start=utils.adaptive, end=utils.adaptive, **kwargs):
        super().__init__(
            _symbols=symbol,
            _start=start,
            _end=end,
            _base_ep_func=lambda: self.base_ep(db),
            **kwargs
        )

        for field_name, field_type in (
            ('ASK_PRICE', float),
            ('BID_PRICE', float),
            ('ASK_SIZE', int),
            ('BID_SIZE', int),
        ):
            self.schema[field_name] = field_type

    def base_ep(self, db):
        """Build the base node: a passthrough dropping SYMBOL_NAME and TICK_TYPE, bound to ``<db>::NBBO``."""
        full_tick_type = str(db) + "::NBBO"
        node = Source(otq.Passthrough(fields="SYMBOL_NAME,TICK_TYPE", drop_fields=True))
        node.tick_type(full_tick_type)
        return node
class Query(Source):
    def __init__(
        self,
        query_object=None,
        out_pin=utils.adaptive,
        symbol=utils.adaptive,
        start=utils.adaptive,
        end=utils.adaptive,
        params=None,
        **kwargs,
    ):
        """
        Create data source object from .otq file or query object

        Parameters
        ----------
        query_object: path or :class:`query`
            query to use as a data source
        out_pin: str
            query output pin name
        symbol: str, list of str, :class:`Source`, :class:`query`, :py:func:`eval query <onetick.py.eval>`
            Symbol(s) from which data should be taken.
        start, end : :py:class:`datetime.datetime`, :py:class:`otp.datetime <onetick.py.datetime>` or utils.adaptive
            Time interval from which the data should be taken.
        params: dict
            params to pass to query.
            Only applicable to string ``query_object``

        Raises
        ------
        ValueError
            If ``query_object`` is neither a path nor a :class:`query`, or if ``params``
            are passed together with a :class:`query` object.
        """
        if self._try_default_constructor(**kwargs):
            return

        if params is None:
            params = {}

        # Ignore because of the "Only @runtime_checkable protocols can be used with instance and class checks"
        if isinstance(query_object, (str, os.PathLike)):  # type: ignore
            query_object = query(str(query_object), **params)
        elif isinstance(query_object, query):
            if len(params) > 0:
                raise ValueError("Cannot pass both params and a query() (not str) query_object parameter")
        else:
            raise ValueError("query_object parameter has to be either a str (path to the query) or a query object")

        # ``utils.adaptive`` is a sentinel: compare by identity, so that objects
        # with an overloaded ``==`` (sources, collections) are never mistaken for it
        if symbol is utils.adaptive:
            if not query_object.graph_info.has_unbound_sources:
                # every source in the query is already bound, no symbol is needed
                symbol = None

        super().__init__(
            _symbols=symbol,
            _start=start,
            _end=end,
            _base_ep_func=lambda: self.base_ep(query_object, out_pin),
            **kwargs
        )

    def base_ep(self, query, out_pin):
        """
        Build the base node: a nested query with the selected output pin.

        Parameters
        ----------
        query: :class:`query`
            query object providing ``path``, ``str_params``, ``graph_info`` and ``query_name``
        out_pin: str or utils.adaptive
            name of the output pin; with ``utils.adaptive`` the only output of the query is used

        Raises
        ------
        Exception
            If ``out_pin`` is adaptive and the query has several outputs,
            or if the requested ``out_pin`` does not exist in the query.
        """
        nested = otq.NestedOtq(query.path, query.str_params)
        graph = query.graph_info

        if out_pin is utils.adaptive:
            if len(graph.nested_outputs) == 1:
                return Source(nested[graph.nested_outputs[0].NESTED_OUTPUT])
            elif len(graph.nested_outputs) > 1:
                raise Exception(
                    f'Query "{query.query_name}" has multiple outputs, but you have not '
                    "specified which one should be used. You could specify it"
                    ' using "out_pin" parameter of the Query constructor.'
                )
            else:
                # no output
                return Source(nested, _has_output=False)
        else:
            existed_out_pins = set(map(operator.attrgetter("NESTED_OUTPUT"), graph.nested_outputs))
            if out_pin not in existed_out_pins:
                # sort the pins to keep the error message deterministic
                raise Exception(
                    f'Query "{query.query_name}" does not have the "{out_pin}" output, there are only following '
                    f"output pins exist: {','.join(sorted(existed_out_pins))}"
                )
            return Source(nested[out_pin])
class query:
    """
    Constructs a query object with a certain path.
    Keyword arguments specify query parameters.

    You also can pass an instance of ``otp.query.config`` class as the second positional argument to
    specify a query.

    Parameters
    ----------
    path : str
        path to an .otq file.
        If path is relative, then it's assumed that file is located in one of the directories
        specified in OneTick ``OTQ_FILE_PATH`` configuration variable.
        If there are more than one query in the file, then its name should be specified
        in the format ``<path>::<query-name>``.

        Also prefix ``remote://<database-name>::`` can be used to specify if query is located
        on the remote server.
    config:
        optional ``otp.query.config`` object.
    params:
        parameters for the query.

    Raises
    ------
    ValueError, TypeError

    Examples
    --------
    >>> otp.query('/otqs/some.otq::some_query', PARAM1='val1', PARAM2=3.14)  # doctest: +SKIP
    >>> otp.query('remote://DATABASE::/otqs/some.otq::some_query', PARAM1='val1', PARAM2=3.14)  # doctest: +SKIP
    """

    class config:
        """
        The config allows to specify different query options.
        """

        special_values = {"input"}

        def __init__(self, output_columns=None):
            """
            Parameters
            ----------
            output_columns : str, list, dict, optional
                The parameter defines what the outputs columns are. Default value is ``None``
                that means no output fields after applying query for every output pin.

                The ``input`` value means that output columns are the same as inputs for
                every output pin

                A list of tuples allows to define output columns with their types;
                for example [('x', int), ('y', float), ...]. Applicable for every output pin.

                A dict allows to specify output columns for every output pin.

            Raises
            ------
            TypeError, ValueError
            """
            if output_columns is not None:
                if isinstance(output_columns, list):
                    self.validate_columns(output_columns)
                elif isinstance(output_columns, dict):
                    for pin, columns in output_columns.items():
                        if not isinstance(pin, str):
                            raise TypeError(f"Name of pin '{pin}' is of non-str type '{type(pin)}'")
                        else:
                            self.validate_columns(columns)
                elif not isinstance(output_columns, str):
                    raise TypeError(f'"output_columns" does not support value of the "{type(output_columns)}" type')

                if isinstance(output_columns, str):
                    if output_columns not in self.special_values:
                        raise ValueError(f'Config does not support "{output_columns}" value')

            self.output_columns = output_columns

        def validate_list_item(self, item):
            """
            Check a single item of a columns list.

            An item is either one of ``special_values`` or a two-element
            tuple/list whose first element is a string (the column name).

            Raises
            ------
            ValueError
                If ``item`` is a string that is not a special value.
            TypeError
                If ``item`` is not a (name, type) pair.
            """
            if isinstance(item, str):
                if item not in self.special_values:
                    raise ValueError(f"Value {item} is not supported.")
            else:
                if not isinstance(item, (tuple, list)) or (len(item) != 2) or not isinstance(item[0], str):
                    raise TypeError(f"Value {item} is not a name-type tuple.")

        def validate_columns(self, columns):
            """
            Check output columns definition of a single pin.

            Raises
            ------
            ValueError
                If ``columns`` is an unsupported string or a list with more than one 'input' value.
            TypeError
                If ``columns`` is neither a string nor a list, or a list item is malformed.
            """
            if isinstance(columns, str):
                if columns not in self.special_values:
                    raise ValueError(f"A pin has invalid output columns definition: '{columns}'")
            elif isinstance(columns, list):
                if columns.count("input") > 1:
                    raise ValueError(f"More than one 'input' value in {columns}")
                for item in columns:
                    self.validate_list_item(item)
            else:
                raise TypeError(f"A pin's columns definition is of unsupported type '{type(columns)}'")

        def get_output_columns_for_pin(self, out_pin_name):
            """
            Return columns definition for the ``out_pin_name`` pin.

            If ``output_columns`` is a dict, the pin has to be declared there,
            otherwise the same definition is returned for every pin.

            Raises
            ------
            ValueError
                If ``output_columns`` is a dict and the pin is not declared in it.
            """
            if isinstance(self.output_columns, dict):
                if out_pin_name not in self.output_columns:
                    raise ValueError(f"Pin {out_pin_name} wasn't declared in the config")
                else:
                    return self.output_columns[out_pin_name]
            else:
                return self.output_columns

        def apply(self, out_pin_name, src):
            """
            Applying specified logic on a certain object. Used internally in the functions.apply_query
            """
            columns_descriptor = self.get_output_columns_for_pin(out_pin_name)

            if columns_descriptor is None:
                # drop columns by default, because we don't know
                # how an external query changes data schema
                src.drop_columns()
            elif columns_descriptor == "input":
                pass
            else:
                if "input" not in columns_descriptor:
                    src.drop_columns()

                for item in columns_descriptor:
                    if item != "input":
                        # indexing with the (name, type) item is done for its side effect on ``src``
                        src[item]

    def __init__(self, path, *config, **params):
        path = str(path)
        if path.startswith('remote://'):
            self.path = path
            # only the part after "remote://<db>::" is inspected below
            _, path = path.split('::', maxsplit=1)
        else:
            self.path = f"remote://{configuration.config.get('default_db', 'LOCAL')}::" + path

        self.query_path, self.query_name = utils.query_to_path_and_name(path)

        # if query_path does not exist, then we try
        # to resolve it with OTQ_PATH assuming that
        # a relative path is passed
        if not os.path.exists(self.query_path):
            otq_path = utils.get_config_param(os.environ["ONE_TICK_CONFIG"], "OTQ_FILE_PATH", "")
            self.query_path = utils.abspath_to_query_by_otq_path(otq_path, self.query_path)

        if self.query_name is None:
            # it seems that query name was not passed, then try to find it
            queries = query_inspector.get_queries(self.query_path)
            if len(queries) > 1:
                raise Exception(f"{self.query_path} has more than one query, "
                                f"but you have not specified which one to use.")
            self.query_name = queries[0]

        # prepare parameters
        self._str_params = None
        self.params = params
        self.update_params()

        # prepare configs
        if len(config) > 1:
            raise ValueError(f"It is allowed to specify only one config object, but passed {len(config)}")
        elif len(config) == 1:
            if not isinstance(config[0], self.config):
                raise TypeError(
                    f'It is expected to see config of the "query.config" type, but got "{type(config[0])}"'
                )
            # from now on the instance attribute shadows the nested ``config`` class
            self.config = config[0]
        else:
            self.config = self.config()

        self.graph_info = query_inspector.get_query_info(self.query_path, self.query_name)

    def __call__(self, *ticks, **pins):
        """
        Bind input sources to the query.

        Either a single positional source (for a query with exactly one input),
        or sources passed by input pin names, or nothing (for a query without inputs).
        Returns an object that should be indexed with output pin name(s) to apply the query.

        Raises
        ------
        ValueError
            If a pin value is not a :class:`Source` or positional and named inputs are mixed.
        Exception
            If a single positional input is passed, but the query does not have exactly one input.
        """
        for key, value in pins.items():
            if not isinstance(value, Source):
                raise ValueError(f'Input "{key}" pin does not support "{type(value)}" type')

        if len(pins) == 0 and len(ticks) == 1:
            if len(self.graph_info.nested_inputs) != 1:
                raise Exception(
                    f'It is expected the query "{self.query_path}" to have one input, but it'
                    f" has {len(self.graph_info.nested_inputs)}"
                )
            pins[self.graph_info.nested_inputs[0].NESTED_INPUT] = ticks[0]
        elif len(pins) > 0 and len(ticks) == 0:
            pass
        elif len(pins) == 0 and len(ticks) == 0:
            # it is the valid case, when query has no input pins
            pass
        else:
            raise ValueError("It is allowed to pass only one non-specified input")

        outputs = self._outputs()
        outputs.query = self
        outputs.in_sources = pins

        return outputs

    class _outputs(object):
        # Helper returned by ``query.__call__``; ``query`` and ``in_sources``
        # attributes are assigned there before the object is handed to the caller.

        def __getitem__(self, key):
            output_pins = []

            if isinstance(key, tuple):
                output_pins = list(key)
            elif isinstance(key, str):
                output_pins = [key]
            elif key is None:
                # No output
                pass
            else:
                raise ValueError(f'Output pins can not be of "{type(key)}" type')

            return onetick.py.functions.apply_query(
                self.query, in_sources=self.in_sources, output_pins=output_pins, **self.query.params
            )

    def to_eval_string(self):
        """Converts query object to `eval` string"""
        res = '"' + self.path + '"'
        if self.params:
            res += f', "{self._params_to_str(self.params, with_expr=True)}"'
        return "eval(" + res + ")"

    def update_params(self, **new_params):
        """Add or override query parameters."""
        if new_params:
            self.params.update(new_params)
            # drop the cached string, otherwise ``str_params`` keeps returning stale parameters
            self._str_params = None

    @property
    def str_params(self):
        """Query parameters converted to string"""
        if self._str_params is None:
            self._str_params = self._params_to_str(self.params)
        return self._str_params

    @staticmethod
    def _params_to_str(params, *, with_expr=False):
        """ converts param to str

        Parameters
        ----------
        params: dict
            Parameters as dict(name=value)
        with_expr:
            If true return all expression in expr() function

        Returns
        -------
        result: str
            string representation of parameters ready for query evaluation
        """

        def to_str(v):
            if isinstance(v, list):
                return "\\,".join(map(to_str, v))
            else:
                if with_expr:
                    is_dt = ott.is_time_type(v)
                    if is_dt:
                        v = ott.value2str(v)
                    result = query._escape_quotes_in_eval(v)
                    # note the precedence: (operation that is not _SYMBOL_NAME) or (time value)
                    if isinstance(v, _Operation) and getattr(v, "name", None) != "_SYMBOL_NAME" or is_dt:
                        result = f"expr({result})"
                else:
                    result = query._escape_characters_in_query_param(str(v))
                return result

        return ",".join(key + "=" + to_str(value) for key, value in params.items())

    @staticmethod
    def _escape_quotes_in_eval(v):
        """Return ``str(v)`` with single and double quotes prefixed by a backslash."""
        return str(v).translate(str.maketrans({"'": r"\'", '"': r'\"'}))

    @staticmethod
    def _escape_characters_in_query_param(result):
        """
        Prefix special characters of a query parameter value with a backslash.

        Characters listed in ``_QUERY_PARAM_SPECIAL_CHARACTERS`` are escaped unless they are
        inside a closed quoted literal, inside an ``expr(...)`` block, or already
        preceded by a backslash.
        """
        # 0 - no need to add backslash, 1 - need to add
        char_map = [0] * len(result)

        # put 1 between two quotes symbols
        open_char = None
        last_inx = 0
        for inx, c in enumerate(result):
            if open_char == c:
                open_char = None
                continue

            # a quote opens a literal only when no literal is open yet: the other kind
            # of quote inside an open literal is an ordinary character
            if not open_char and (c == "'" or c == '"'):
                open_char = c
                last_inx = inx + 1
                continue

            if open_char:
                char_map[inx] = 1

        # clean open tail if necessary
        if open_char:
            char_map[last_inx:] = [0] * (len(result) - last_inx)

        # apply mapping
        res = []
        last_esc = False  # do not add esc if the previous one is already esc
        n_brackets_in_expr_block = 0  # do not escape in expr(...)
        for inx, c in enumerate(result):
            if c == "(":
                if n_brackets_in_expr_block:
                    n_brackets_in_expr_block += 1
                elif result[inx - 4:inx] == "expr":
                    n_brackets_in_expr_block = 1
            if c == ")" and n_brackets_in_expr_block:
                n_brackets_in_expr_block -= 1

            if c in _QUERY_PARAM_SPECIAL_CHARACTERS and char_map[inx] == 0:
                if not last_esc and not n_brackets_in_expr_block:
                    c = "\\" + c

            last_esc = c == "\\"

            res.append(c)

        return "".join(res)
class Orders(Source):
    """
    Source of order ticks (tick type ``ORDER``), by default read from the ``S_ORDERS_FIX`` database.

    Declares the ``ID``, ``BUY_FLAG``, ``SIDE``, ``STATE``, ``ORDTYPE``, ``PRICE``,
    ``PRICE_FILLED``, ``QTY`` and ``QTY_FILLED`` fields in the schema.
    """

    def __init__(self, db="S_ORDERS_FIX", symbol=utils.adaptive, start=utils.adaptive, end=utils.adaptive, **kwargs):
        super().__init__(
            _symbols=symbol, _start=start, _end=end, _base_ep_func=lambda: self.base_ep(db), **kwargs
        )

        # fields declared for every Orders source
        self.schema['ID'] = str
        self.schema['BUY_FLAG'] = int
        self.schema['SIDE'] = str
        self.schema['STATE'] = str
        self.schema['ORDTYPE'] = str
        self.schema['PRICE'] = float
        self.schema['PRICE_FILLED'] = float
        self.schema['QTY'] = int
        self.schema['QTY_FILLED'] = int

    def base_ep(self, db):
        """Build the base node: a passthrough dropping SYMBOL_NAME and TICK_TYPE, bound to ``<db>::ORDER``."""
        db = str(db)
        src = Source(otq.Passthrough(fields="SYMBOL_NAME,TICK_TYPE", drop_fields=True))
        src.tick_type(db + "::ORDER")
        return src


# Parameter descriptions collected into DATA_SOURCE_DOC_PARAMS below, which is passed
# to the ``docstring`` decorator of ``DataSource.__init__``.

_db_doc = _param_doc(
    name='db',
    desc="""
    Name(s) of the database or the database object(s).
    """,
    str_annotation='str, list of str, :class:`otp.DB <onetick.py.DB>`',
    default=None,
    str_default='None',
)
_symbol_doc = _param_doc(
    name='symbol',
    desc="""
    Symbol(s) from which data should be taken.
    """,
    str_annotation='str, list of str, :class:`Source`, :class:`query`, :py:func:`eval query <onetick.py.eval>`',
    default=utils.adaptive,
    str_default=' :py:class:`onetick.py.adaptive`',
)
_tick_type_doc = _param_doc(
    name='tick_type',
    desc="""
    Tick type of the data.
    If not specified, all ticks from `db` will be taken.
    If ticks can't be found or there are many databases specified in `db` then default is "TRD".
    """,
    str_annotation='str, list of str',
    default=utils.adaptive,
    str_default=' :py:class:`onetick.py.adaptive`',
)
_start_doc = _param_doc(
    name='start',
    desc="""
    Start of the interval from which the data should be taken.
    Default is :py:class:`onetick.py.adaptive`, making the final query deduce the time
    limits from the rest of the graph.
    """,
    str_annotation=(
        ':py:class:`datetime.datetime`, :py:class:`otp.datetime <onetick.py.datetime>`,'
        ' :py:class:`onetick.py.adaptive`'
    ),
    default=utils.adaptive,
    str_default=' :py:class:`onetick.py.adaptive`',
)
_end_doc = _param_doc(
    name='end',
    desc="""
    End of the interval from which the data should be taken.
    Default is :py:class:`onetick.py.adaptive`, making the final query deduce the time
    limits from the rest of the graph.
    """,
    str_annotation=(
        ':py:class:`datetime.datetime`, :py:class:`otp.datetime <onetick.py.datetime>`,'
        ' :py:class:`onetick.py.adaptive`'
    ),
    default=utils.adaptive,
    str_default=' :py:class:`onetick.py.adaptive`',
)
_date_doc = _param_doc(
    name='date',
    desc="""
    Allows to specify a whole day instead of passing explicitly ``start`` and ``end`` parameters.
    If it is set along with the ``start`` and ``end`` parameters then last two are ignored.
    """,
    str_annotation=":class:`datetime.datetime`, :class:`otp.datetime <onetick.py.datetime>`, optional",
    default=None,
)
_schema_policy_doc = _param_doc(
    name='schema_policy',
    desc="""
    Schema deduction policy:

    - 'manual'
      The resulting schema is a combination of ``desired_schema`` and database schema.
      Compatibility with database schema will not be checked.

    - 'manual_strict'
      The resulting schema will be exactly ``desired_schema``.
      Compatibility with database schema will not be checked.

    - 'tolerant'
      The resulting schema is a combination of ``desired_schema`` and database schema.
      If the database schema can be deduced,
      it's checked to be type-compatible with a ``desired_schema``,
      and ValueError is raised if checks are failed.
      Also, with this policy database is scanned 5 days back to find the schema.
      It is useful when database is misconfigured or in case of holidays.

    - 'tolerant_strict'
      The resulting schema will be ``desired_schema`` if it's not empty.
      Otherwise, database schema is used.
      If the database schema can be deduced,
      it's checked if it lacks fields from the ``desired_schema``
      and it's checked to be type-compatible with a ``desired_schema``
      and ValueError is raised if checks are failed.
      Also, with this policy database is scanned 5 days back to find the schema.
      It is useful when database is misconfigured or in case of holidays.

    - 'fail'
      The same as 'tolerant', but if the database schema can't be deduced, raises an Exception.

    - 'fail_strict'
      The same as 'tolerant_strict', but if the database schema can't be deduced, raises an Exception.
    """,
    str_annotation="'tolerant', 'tolerant_strict', 'fail', 'fail_strict', 'manual', 'manual_strict'",
    default=None,
)
_guess_schema_doc = _param_doc(
    name='guess_schema',
    desc="""
    .. deprecated:: 1.3.16

    Use ``schema_policy`` parameter instead.
    """,
    annotation=bool,
    default=None,
)
_identify_input_ts_doc = _param_doc(
    name='identify_input_ts',
    desc="""
    If set to False, the fields SYMBOL_NAME and TICK_TYPE are not appended to the output ticks.
    """,
    annotation=bool,
    default=False,
)
_back_to_first_tick_doc = _param_doc(
    name='back_to_first_tick',
    desc="""
    Determines how far back to go looking for the latest tick before ``start`` time.
    If one is found, it is inserted into the output time series with the timestamp set to ``start`` time.
    Note: it will be rounded to int, so otp.Millis(999) will be 0 seconds.
    """,
    str_annotation=('int, :ref:`offset <datetime_offsets>`, '
                    ':class:`otp.expr <onetick.py.expr>`, '
                    ':py:class:`~onetick.py.Operation`'),
    default=0,
)
_keep_first_tick_timestamp_doc = _param_doc(
    name='keep_first_tick_timestamp',
    desc="""
    If set, new field with this name will be added to source.
    This field contains original timestamp of the tick that was taken from before the start time of the query.
    For all other ticks value in this field will be equal to the value of Time field.
    This parameter is ignored if ``back_to_first_tick`` is not set.
    """,
    annotation=str,
    default=None,
)
_presort_doc = _param_doc(
    name='presort',
    desc="""
    Add the presort EP in case of bound symbols.
    Applicable only when ``symbols`` is not None.
    By default, it is set to True if ``symbols`` are set
    and to False otherwise.
    """,
    annotation=bool,
    default=utils.adaptive,
    str_default=' :py:class:`onetick.py.adaptive`',
)
_concurrency_doc = _param_doc(
    name='concurrency',
    desc="""
    Specifies number of CPU cores to utilize for the ``presort``
    By default, the value from otp.config.default_concurrency is used.
    """,
    annotation=int,
    default=None,
)
_batch_size_doc = _param_doc(
    name='batch_size',
    desc="""
    Specifies the query batch size for the ``presort``.
    By default, the value from otp.config.default_batch_size is used.
    """,
    annotation=int,
    default=None,
)
# described as a VAR_KEYWORD parameter: it documents the ``**desired_schema`` keyword arguments
_desired_schema_doc = _param_doc(
    name='desired_schema',
    desc="""
    List of <column name> -> <column type> pairs that the source is expected to have.
    If the type is irrelevant, provide None as the type in question.
    """,
    str_annotation='type[str]',
    kind=inspect.Parameter.VAR_KEYWORD,
)

# The list order matches the parameter order of ``DataSource.__init__``.
DATA_SOURCE_DOC_PARAMS = [
    _db_doc, _symbol_doc, _tick_type_doc,
    _start_doc, _end_doc, _date_doc,
    _schema_policy_doc, _guess_schema_doc,
    _identify_input_ts_doc,
    _back_to_first_tick_doc, _keep_first_tick_timestamp_doc,
    _presort_doc, _concurrency_doc, _batch_size_doc,
    _desired_schema_doc,
]
class DataSource(Source):
    # Source reading ticks from one or several databases; see ``__init__`` for the user documentation.

    # supported values of the ``schema_policy`` parameter
    POLICY_MANUAL = "manual"
    POLICY_MANUAL_STRICT = "manual_strict"
    POLICY_TOLERANT = "tolerant"
    POLICY_TOLERANT_STRICT = "tolerant_strict"
    POLICY_FAIL = "fail"
    POLICY_FAIL_STRICT = "fail_strict"

    _VALID_POLICIES = frozenset([POLICY_MANUAL, POLICY_MANUAL_STRICT,
                                 POLICY_TOLERANT, POLICY_TOLERANT_STRICT,
                                 POLICY_FAIL, POLICY_FAIL_STRICT])

    _PROPERTIES = Source._PROPERTIES + ["_p_db", "_p_strict", "_p_schema"]

    def __get_schema(self, db, start, schema_policy):
        """
        Return the combined schema of all ``<db>::<tick type>`` items of ``db``.

        An empty dict is returned when ``db`` is None or a symbol parameter,
        because the database is not known at this point.
        """
        schema = {}

        if start is utils.adaptive:
            start = None  # means that use the last date with data

        if isinstance(db, list):
            # This case of a merge, since we need to get combined schema
            # across different tick types and dbs
            for t_db in db:
                _db = t_db.split(':')[0]
                _tt = t_db.split(':')[-1]

                db_obj = onetick.py.db._inspection.DB(_db)

                # the date is resolved for every database independently, so that
                # the look-back done for one database does not shift the date of the next one
                schema_date = start
                if schema_policy == self.POLICY_TOLERANT and schema_date:
                    # repeating the same logic as in db_obj.last_date
                    schema_date = db_obj.last_not_empty_date(schema_date, days_back=5, tick_type=_tt)

                schema.update(db_obj.schema(date=schema_date, tick_type=_tt))

        if db is None or isinstance(db, _SymbolParamColumn):
            # In this case we can't get schema, because db is calculated dynamically.
            # Set to empty to indicate that in this case we expect the manually set schema.
            schema = {}

        return schema

    def __prepare_schema(self, db, start, end, schema_policy, guess_schema, desired_schema):
        """
        Resolve the schema policy and return the schema to be used for the ``table`` call.

        Note: unless the policy is strict, ``desired_schema`` is updated in place
        with the fields of the database schema.

        Raises
        ------
        ValueError
            If both ``guess_schema`` and ``schema_policy`` are set, the policy is unknown,
            or the database schema is not compatible with ``desired_schema``.
        Exception
            If the policy is 'fail' / 'fail_strict' and the database schema can not be deduced.
        """
        if guess_schema is not None:
            warnings.warn(
                "guess_schema flag is deprecated; use schema_policy argument instead",
                DeprecationWarning,
            )
            if schema_policy is not None:
                raise ValueError("guess_schema and schema_policy cannot be set at the same time")
            if guess_schema:
                schema_policy = self.POLICY_FAIL
            else:
                schema_policy = self.POLICY_MANUAL

        if schema_policy is None:
            schema_policy = self.POLICY_TOLERANT
        if schema_policy not in self._VALID_POLICIES:
            raise ValueError(f"Invalid schema_policy; allowed values are: {self._VALID_POLICIES}")

        actual_schema = {}
        if schema_policy not in (self.POLICY_MANUAL, self.POLICY_MANUAL_STRICT):
            actual_schema = self.__get_schema(db, start, schema_policy)
            dbs = ', '.join(db if isinstance(db, list) else [])

            if len(actual_schema) == 0:
                if schema_policy in (self.POLICY_FAIL, self.POLICY_FAIL_STRICT):
                    raise Exception(f'No ticks found in database(-s) {dbs}')
                # lets try to use at least something
                return desired_schema.copy()

            for k, v in desired_schema.items():
                field_type = actual_schema.get(k, None)
                if field_type is None:
                    if self._p_strict or schema_policy in (self.POLICY_TOLERANT, self.POLICY_FAIL):
                        raise ValueError(f"Database(-s) {dbs} schema has no {k} field")
                elif not issubclass(field_type, v):
                    raise ValueError(
                        f"Database(-s) {dbs} schema field {k} has type {field_type}, but {v} was requested"
                    )

            if not self._p_strict:
                desired_schema.update(actual_schema)

        table_schema = desired_schema.copy()
        if not self._p_strict:
            # in this case we will table only fields specified by user
            table_schema = {
                k: v for k, v in table_schema.items() if k not in actual_schema
            }
        return table_schema

    def __prepare_dates(self, date):
        """
        Convert ``date`` into the (start, end) interval covering the whole day.

        Raises
        ------
        TypeError
            If ``date`` is not one of the supported date / datetime types.
        """
        if not isinstance(date, (ott.datetime, ott.date, dt.datetime, dt.date)):
            # fail with a clear message instead of an UnboundLocalError below
            raise TypeError(f'The `date` parameter does not support value of the "{type(date)}" type')

        if isinstance(date, ott.datetime) or isinstance(date, ott.date):
            start = date.start
            end = date.end

        if isinstance(date, dt.datetime) or isinstance(date, dt.date):
            start = dt.datetime(date.year, date.month, date.day)
            end = start + dt.timedelta(days=1, milliseconds=-1)

        return start, end

    def __prepare_db_tick_type(self, db, tick_type, symbol, start, end):
        """
        Normalize the ``db`` and ``tick_type`` parameters.

        For a database (or databases) known here the result is a list of
        ``<db>::<tick type>`` strings and ``tick_type`` is None. When ``db`` is None
        or a symbol parameter, ``db`` is returned as is and ``tick_type`` is
        a string (several tick types are joined with '+').
        """
        if isinstance(db, list):
            # If everything is correct then this branch should leave
            # the `db` var as a list of databases with tick types and the `tick_type` var is None.
            # Valid cases:
            #   - Fully defined case. The `db` parameter has a list of databases where
            #     every database has a tick type, when the `tick_type` parameter
            #     has default value or None (for backward compatibility)
            #   - Partially defined case. The `db` parameter has a list of databases but
            #     not every database has a tick type, and meantime the `tick_type`
            #     is passed to not None value. In that case databases without tick type
            #     are extended with a tick type from the `tick_type` parameter
            #   - No defined case. The `db` parameter has a list of databases and
            #     every database there has no tick type, and the `tick_type` is
            #     set to not None value. In that case every database is extended with
            #     the tick type from the `tick_type`.

            def db_converter(_db):
                if isinstance(_db, DB):
                    return _db.name
                else:
                    return _db

            db = [db_converter(_db) for _db in db]
            res = all(('::' in _db and _db[-1] != ':' for _db in db))
            if res:
                if tick_type is utils.adaptive or tick_type is None:
                    tick_type = None  # tick types is specified for all databases
                else:
                    raise Exception('The `tick_type` is set as a parameter '
                                    'and also as a part of the `db` parameter '
                                    'for every database')
            else:
                dbs_without_tt = [_db.split(':')[0] for _db in db if '::' not in _db or _db[-1] == ':']

                if tick_type is utils.adaptive:
                    tick_type = 'TRD'  # default one for backward compatibility and testing usecase
                if tick_type is None:
                    raise Exception('The tick type is not set for databases: ' + ', '.join(dbs_without_tt))
                else:
                    # extend databases with missing tick types from the tick type parameter
                    dbs_with_tt = [_db for _db in db if '::' in _db and _db[-1] != ':']

                    db = dbs_with_tt + [_db + '::' + tick_type for _db in dbs_without_tt]
                    tick_type = None

        if isinstance(db, (DB, inspect_DB)):
            db = db.name  # ... and we go to the next branch

        if isinstance(db, str):
            # The resulting `db` var contains a list with string value, that has the `db`
            # concatenated with the `tick_type`.
            if '::' in db:
                if tick_type is utils.adaptive or tick_type is None:
                    tick_type = db.split(':')[-1]
                    db = db.split('::')[0]
                else:
                    raise Exception('The `tick_type` is set as a parameter '
                                    'and also as a part of the `db` parameter')
            else:
                if tick_type is utils.adaptive or tick_type is None:
                    db_obj = onetick.py.db._inspection.DB(db)

                    # try to find at least one common tick type
                    # through all days
                    tick_types = None

                    if start is utils.adaptive:
                        start = db_obj.last_date
                        end = db_obj.last_date

                    if start and end:  # could be None if there is no data
                        t_start = start
                        while t_start <= end:
                            t_tts = set(db_obj.tick_types(t_start))

                            t_start += dt.timedelta(days=1)

                            if len(t_tts) == 0:
                                continue

                            if tick_types is None:
                                tick_types = t_tts
                            else:
                                tick_types &= t_tts

                            if len(tick_types) == 0:
                                raise Exception(f'It seems that there is no common '
                                                f'tick types for dates from {start} '
                                                f'to {end}. Please specify a tick '
                                                'type')

                    if tick_types is None:
                        if tick_type is utils.adaptive:
                            tick_types = ['TRD']  # the default one
                        else:
                            raise Exception(f'Could not find any data in from {start} '
                                            f' to {end}. Could you check that tick type, '
                                            ' database and date range are correct.')

                    if len(tick_types) != 1:
                        raise Exception('The tick type is not specified, found '
                                        'multiple tick types in the database : ' + ', '.join(tick_types))

                    tick_type = tick_types.pop()

            if not isinstance(tick_type, str) and isinstance(tick_type, Iterable):
                db = [f'{db}::{tt}' for tt in tick_type]
            else:
                db = [db + '::' + tick_type]
            tick_type = None

        if isinstance(db, _SymbolParamColumn):
            # Do nothing, because we don't know whether db will come with the tick
            # type or not. The only one thing that definitely we know that tick_type
            # can not be utils.adaptive
            if tick_type is utils.adaptive:
                # TODO: need to test this case
                raise Exception('The `db` is set to the symbol param, in that case '
                                'the `tick_type` should be set explicitly to some value '
                                'or to None')

        if db is None:
            # This case means that database comes with the symbol name,
            # then tick type should be defined
            if tick_type is utils.adaptive or tick_type is None:
                raise Exception('The `db` is not specified that means database is '
                                'expected to be defined with the symbol name. '
                                'In that case the `tick_type` should be defined.')

        if not isinstance(tick_type, str) and isinstance(tick_type, Iterable):
            tick_type = '+'.join(tick_type)

        return db, tick_type

    @docstring(parameters=DATA_SOURCE_DOC_PARAMS, add_self=True)
    def __init__(
        self,
        db=None,
        symbol=utils.adaptive,
        tick_type=utils.adaptive,
        start=utils.adaptive,
        end=utils.adaptive,
        date=None,
        schema_policy=None,
        guess_schema=None,
        identify_input_ts=False,
        back_to_first_tick=0,
        keep_first_tick_timestamp=None,
        *,
        symbols=None,
        presort=utils.adaptive,
        batch_size=None,
        concurrency=None,
        **desired_schema,
    ):
        """
        Construct a source providing data from a given ``db``.

        Examples
        ---------

        Symbol can be a collection

        >>> # OTdirective: snippet-name:fetch data.simple;
        >>> data = otp.DataSource(db='SOME_DB', tick_type='TT', symbols=['S1', 'S2'])
        >>> otp.run(data)
                             Time  X
        0 2003-12-01 00:00:00.000  1
        1 2003-12-01 00:00:00.000 -3
        2 2003-12-01 00:00:00.001  2
        3 2003-12-01 00:00:00.001 -2
        4 2003-12-01 00:00:00.002  3
        5 2003-12-01 00:00:00.002 -1

        Source also can be passed as symbols, in such case magic named column SYMBOL_NAME will be transform to symbol
        and all other columns will be symbol parameters

        >>> # OTdirective: snippet-name:fetch data.symbols as a source;
        >>> symbols = otp.Ticks(SYMBOL_NAME=['S1', 'S2'])
        >>> data = otp.DataSource(db='SOME_DB', symbols=symbols, tick_type='TT')
        >>> otp.run(data)
                             Time  X
        0 2003-12-01 00:00:00.000  1
        1 2003-12-01 00:00:00.000 -3
        2 2003-12-01 00:00:00.001  2
        3 2003-12-01 00:00:00.001 -2
        4 2003-12-01 00:00:00.002  3
        5 2003-12-01 00:00:00.002 -1

        Default schema policy is `tolerant`.

        >>> data = otp.DataSource(db='NYSE_TAQ', tick_type='TRD', symbols='AAPL', PRICE=float, date=otp.dt(2022, 3, 1))
        >>> data.schema
        {'PRICE': <class 'float'>, 'SIZE': <class 'int'>}

        >>> data = otp.DataSource(db='NYSE_TAQ', tick_type='TRD', symbols='AAPL', PRICE=int, date=otp.dt(2022, 3, 1))
        Traceback (most recent call last):
          ...
        ValueError: Database(-s) NYSE_TAQ::TRD schema field PRICE has type <class 'float'>, but <class 'int'> was requested

        Schema policy `manual` uses exactly desired_schema:

        >>> data = otp.DataSource(db='NYSE_TAQ', tick_type='TRD', symbols='AAPL', PRICE=float, \
                                  date=otp.dt(2022, 3, 1), schema_policy='manual')
        >>> data.schema
        {'PRICE': <class 'float'>}

        Schema policy 'fail' raise an exception if the schema cannot be deduced

        >>> data = otp.DataSource(db='NYSE_TAQ', tick_type='TRD', symbols='AAPL', date=otp.dt(2021, 3, 1), \
                                  schema_policy='fail')
        Traceback (most recent call last):
          ...
        Exception: No ticks found in database(-s) NYSE_TAQ::TRD
        """
        if self._try_default_constructor(**desired_schema):
            return

        # for cases when we want to explicitly convert into string,
        # it might be symbol param or join_with_query parameter
        if isinstance(tick_type, _ParamColumn):
            tick_type = str(tick_type)[1:-1]

        if date:
            # TODO: write a warning in that case
            start, end = self.__prepare_dates(date)

        db, tick_type = self.__prepare_db_tick_type(db, tick_type, symbol, start, end)

        self._p_db = db
        self._p_strict = schema_policy in (self.POLICY_FAIL_STRICT,
                                           self.POLICY_TOLERANT_STRICT,
                                           self.POLICY_MANUAL_STRICT)

        self._p_schema = self.__prepare_schema(db,  # tick type is embedded into the db
                                               start, end, schema_policy, guess_schema, desired_schema)

        if symbols is not None:
            if symbol is utils.adaptive or symbol is None:
                symbol = symbols
            else:
                # TODO: test it
                raise Exception('You have set the `symbol` and `symbols` parameters '
                                'together, it is not allowed. Please, clarify parameters')

        if isinstance(symbol, Symbols) and symbol._p_db is None:
            symbol = Symbols.duplicate(symbol, db=db)

        if identify_input_ts:
            # TODO: think about how user could workaround it
            if "SYMBOL_NAME" in desired_schema:
                raise Exception('The SYMBOL_NAME field is added by the `identify_input_ts` parameter, '
                                'it can not be set in the desired schema')
            desired_schema["SYMBOL_NAME"] = str
            if "TICK_TYPE" in desired_schema:
                raise Exception('The TICK_TYPE field is added by the `identify_input_ts` parameter, '
                                'it can not be set in the desired schema')
            desired_schema["TICK_TYPE"] = str

        # unobvious way to convert otp.Minute/Hour/... to number of seconds
        if type(back_to_first_tick).__name__ == '_DatePartCls':
            back_to_first_tick = int((ott.dt(0) + back_to_first_tick).timestamp())

        if isinstance(back_to_first_tick, _Operation):
            back_to_first_tick = otp.expr(back_to_first_tick)

        if back_to_first_tick != 0 and keep_first_tick_timestamp:
            desired_schema[keep_first_tick_timestamp] = ott.nsectime

        if (
            isinstance(symbol, Source)
            or hasattr(symbol, "__iter__")
            and not isinstance(symbol, dict)
            and not isinstance(symbol, str)
            or isinstance(symbol, query)
            or isinstance(symbol, _QueryEvalWrapper)
        ):
            # symbols are bound inside the graph (a collection, a source or a query of symbols)
            super().__init__(
                _start=start,
                _end=end,
                _base_ep_func=lambda: self._base_ep_for_cross_symbol(
                    db, symbol, tick_type,
                    identify_input_ts=identify_input_ts,
                    back_to_first_tick=back_to_first_tick,
                    keep_first_tick_timestamp=keep_first_tick_timestamp,
                    presort=presort,
                    batch_size=batch_size,
                    concurrency=concurrency,
                ),
                **desired_schema
            )
        else:
            super().__init__(
                _symbols=symbol,
                _start=start,
                _end=end,
                _base_ep_func=lambda: self.base_ep(
                    db,
                    tick_type,
                    identify_input_ts=identify_input_ts,
                    back_to_first_tick=back_to_first_tick,
                    keep_first_tick_timestamp=keep_first_tick_timestamp,
                ),
                **desired_schema
            )

    @property
    def db(self):
        """Database value as normalized in the constructor."""
        return self._p_db

    @staticmethod
    def _create_source(passthrough_ep, back_to_first_tick=0, keep_first_tick_timestamp=None):
        """Create graph that save original timestamp of first tick if needed"""
        if back_to_first_tick != 0 and keep_first_tick_timestamp:
            src = Source(otq.Passthrough())
            src.sink(otq.AddField(field=keep_first_tick_timestamp, value='TIMESTAMP'))
            src.sink(passthrough_ep)
            return src
        return Source(passthrough_ep)

    def _table_schema(self, src):
        """Apply the prepared schema to ``src`` with the ``table`` method."""
        return src.table(**self._p_schema, strict=self._p_strict)

    def base_ep(self, db, tick_type, identify_input_ts, back_to_first_tick=0, keep_first_tick_timestamp=None):
        """Build the base node for the case when symbols are not bound inside the graph."""
        if db is not None:
            if isinstance(db, list):
                str_db = "+".join(db)
            else:
                str_db = str(db)

            if tick_type:
                if isinstance(db, _SymbolParamColumn):
                    str_db = f"expr({str_db} + '::{tick_type}')"  # TODO: test
                else:
                    if "::" not in str_db:
                        str_db += "::" + tick_type
            else:
                if isinstance(db, _SymbolParamColumn):
                    str_db = f"expr({str_db})"  # TODO: test
        else:
            str_db = tick_type

        if isinstance(db, list) or isinstance(db, _SymbolParamColumn):
            src = self._create_source(otq.Passthrough(go_back_to_first_tick=back_to_first_tick),
                                      back_to_first_tick=back_to_first_tick,
                                      keep_first_tick_timestamp=keep_first_tick_timestamp)
            src.sink(otq.Merge(identify_input_ts=identify_input_ts))
        else:
            params = dict(go_back_to_first_tick=back_to_first_tick)

            if identify_input_ts:
                params["fields"] = "SYMBOL_NAME,TICK_TYPE"
                params["drop_fields"] = True

            src = self._create_source(otq.Passthrough(**params),
                                      back_to_first_tick=back_to_first_tick,
                                      keep_first_tick_timestamp=keep_first_tick_timestamp)

        src.tick_type(str_db)

        src = self._table_schema(src)
        return src

    def _base_ep_for_cross_symbol(
        self, db, symbol, tick_type, identify_input_ts,
        back_to_first_tick=0, keep_first_tick_timestamp=None,
        presort=utils.adaptive, batch_size=None, concurrency=None,
    ):
        """Build the base node for the case when symbols are bound to the merge (or presort) inside the graph."""
        tmp_otq = TmpOtq()

        if isinstance(symbol, _QueryEvalWrapper):
            symbol = symbol.to_eval_string(tmp_otq=tmp_otq)
        elif isinstance(symbol, query):
            symbol = symbol.to_eval_string()
        elif isinstance(symbol, Source):
            symbol = self._convert_symbol_to_string(symbol, tmp_otq)

        if db is not None:
            if isinstance(db, list):
                tick_type = "+".join(db)
            else:
                tick_type = f"{db}::{tick_type}"

        src = self._create_source(otq.Passthrough(go_back_to_first_tick=back_to_first_tick),
                                  back_to_first_tick=back_to_first_tick,
                                  keep_first_tick_timestamp=keep_first_tick_timestamp)

        if presort is utils.adaptive:
            presort = True

        if presort:
            if batch_size is None:
                batch_size = otp.config.default_batch_size
            if concurrency is None:
                concurrency = (
                    otp.config.default_concurrency
                    if otp.config.default_concurrency is not None
                    # otq.Presort does not support None
                    else ''
                )
            src.sink(
                otq.Presort(batch_size=batch_size, max_concurrency=concurrency).symbols(symbol).tick_type(tick_type)
            )
            src.sink(otq.Merge(identify_input_ts=identify_input_ts))
        else:
            src.sink(
                otq.Merge(identify_input_ts=identify_input_ts).symbols(symbol).tick_type(tick_type)
            )

        src._tmp_otq.merge(tmp_otq)

        src = self._table_schema(src)
        return src
Custom = DataSource  # for backward compatibility, previously we had only Custom
class Symbols(Source):
    """
    Construct a source that returns ticks with information about symbols in a database.
    The SYMBOL_NAME field is populated with symbol names. The TICK_TYPE field contains
    corresponding tick type (enabled by the ``show_tick_type`` parameter).

    Parameters
    ----------
    db: str
        Name of the database where to search symbols
    tick_type: str
        Tick type to use. Default is `ANY`
    start, end: :py:class:`datetime.datetime`, :py:class:`otp.datetime <onetick.py.datetime>`, \
                    :py:class:`onetick.py.adaptive`
        Time interval from which the data should be taken.
    date: :py:class:`datetime.date`
        Alternative way of setting instead of start/end times
    find_params: dict, optional
        Extra keyword parameters passed to the underlying ``FindDbSymbols`` event processor.
        Keys already present here take precedence over the ``pattern``, ``show_tick_type``,
        ``symbology`` and ``show_original_symbols`` arguments; ``for_tick_type``, when set,
        always overrides the ``tick_type_field`` key. The passed dictionary is not modified.
    keep_db: bool
        Flag that indicates whether symbols should have a db prefix.
    pattern: str
        SQL syntax pattern for symbols. Default is '%'
    for_tick_type: str
        Fetch only symbols belong to this tick type, if specified.
    show_tick_type: bool
        Add the TICK_TYPE column with the information about tick type
    symbology: str
        The destination symbology for a symbol name translation.
        Translation is performed, if destination symbology is not empty
        and is different from that of the queried database.
    show_original_symbols: bool
        Switches original symbol name propagation as a tick field ORIGINAL_SYMBOL_NAME
        if symbol name translation is performed (if `symbology` is set).
        Note that if this parameter is set to True,
        database symbols with missing translations are also propagated.

    Note
    ----
    Additional fields that can be added to Symbols will be converted to symbol parameters

    See also
    --------
    | :ref:`Symbols guide <Symbols>`
    | **FIND_DB_SYMBOLS** OneTick event processor

    Examples
    --------

    This class can be used to get a list of all symbols in the database:

    >>> symbols = otp.Symbols('NYSE_TAQ', date=otp.dt(2022, 3, 1))
    >>> otp.run(symbols)
            Time  SYMBOL_NAME
    0 2022-03-01          AAP
    1 2022-03-01         AAPL

    Also this class can be used to specify symbols for the main query:

    >>> symbols = otp.Symbols('NYSE_TAQ', date=otp.dt(2022, 3, 1))
    >>> data = otp.DataSource('NYSE_TAQ', tick_type='TRD', date=otp.dt(2022, 3, 1))
    >>> result = otp.run(data, symbols=symbols)
    >>> result['AAPL']
                         Time  PRICE  SIZE
    0 2022-03-01 00:00:00.000    1.3   100
    1 2022-03-01 00:00:00.001    1.4    10
    2 2022-03-01 00:00:00.002    1.4    50
    >>> result['AAP']
                         Time  PRICE
    0 2022-03-01 00:00:00.000  45.37
    1 2022-03-01 00:00:00.001  45.41

    Additional fields of the ``otp.Symbols`` can be used in the main query as symbol parameters:

    >>> symbols = otp.Symbols('SOME_DB', show_tick_type=True, keep_db=True)
    >>> symbols['PARAM'] = symbols['SYMBOL_NAME'] + '__' + symbols['TICK_TYPE']
    >>> data = otp.DataSource('SOME_DB')
    >>> data['S_PARAM'] = data.Symbol.PARAM
    >>> data = otp.merge([data], symbols=symbols)
    >>> otp.run(data)
                         Time  X          S_PARAM
    0 2003-12-01 00:00:00.000  1  SOME_DB::S1__TT
    1 2003-12-01 00:00:00.000 -3  SOME_DB::S2__TT
    2 2003-12-01 00:00:00.001  2  SOME_DB::S1__TT
    3 2003-12-01 00:00:00.001 -2  SOME_DB::S2__TT
    4 2003-12-01 00:00:00.002  3  SOME_DB::S1__TT
    5 2003-12-01 00:00:00.002 -1  SOME_DB::S2__TT
    """

    _PROPERTIES = Source._PROPERTIES + ["_p_db",
                                        "_p_pattern",
                                        "_p_start",
                                        "_p_end",
                                        "_p_for_tick_type",
                                        "_p_keep_db"]

    def __init__(
        self,
        db=None,
        tick_type="ANY",
        start=utils.adaptive,
        end=utils.adaptive,
        date=None,
        find_params=None,
        keep_db=False,
        pattern='%',
        for_tick_type=None,
        show_tick_type=False,
        symbology='',
        show_original_symbols=False,
        **kwargs
    ):
        if self._try_default_constructor(**kwargs):
            return

        # These attributes are what `duplicate` reads to rebuild the object.
        # NOTE(review): `_p_start` / `_p_end` are captured before `date` is applied below,
        # so a duplicate of a `date`-based Symbols gets the raw start/end -- confirm this is intended.
        self._p_db = db
        self._p_pattern = pattern
        self._p_start = start
        self._p_end = end
        self._p_keep_db = keep_db
        self._p_for_tick_type = for_tick_type

        if date:
            # NOTE(review): any other type of `date` is silently ignored here -- confirm
            # whether plain `datetime.date` objects are expected to be supported.
            if isinstance(date, (ott.datetime, ott.date)):
                start = date.start
                end = date.end

        # The symbol of this source is the database name (text before the first ':') followed by '::'
        _symbol = utils.adaptive
        if db:
            if isinstance(db, list):
                _symbol = [f"{str(_db).split(':')[0]}::" for _db in db]  # noqa
            else:
                _symbol = f"{str(db).split(':')[0]}::"  # noqa

        # Work on a copy so the dictionary passed by the caller is never modified
        _find_params = dict(find_params) if find_params is not None else {}

        _find_params.setdefault('pattern', pattern)
        if for_tick_type:
            _find_params['tick_type_field'] = for_tick_type
        _find_params.setdefault('show_tick_type', show_tick_type)
        _find_params.setdefault('symbology', symbology)
        _find_params.setdefault('show_original_symbols', show_original_symbols)

        super().__init__(
            _symbols=_symbol,
            _start=start,
            _end=end,
            _base_ep_func=lambda: self.base_ep(ep_tick_type=tick_type, keep_db=keep_db, **_find_params),
        )

        self.schema['SYMBOL_NAME'] = str

        if _find_params['show_tick_type']:
            self.schema['TICK_TYPE'] = str

        if _find_params['symbology'] and _find_params['show_original_symbols']:
            self.schema['ORIGINAL_SYMBOL_NAME'] = str

    def base_ep(self, ep_tick_type, keep_db, **params):
        """
        Build the FindDbSymbols-based source.

        ``params`` are passed to ``otq.FindDbSymbols`` as is, ``ep_tick_type`` is set as the tick type
        of the node. Unless ``keep_db`` is set, everything up to and including the last '::'
        is removed from SYMBOL_NAME.
        """
        src = Source(otq.FindDbSymbols(**params))

        src.tick_type(ep_tick_type)
        src.schema['SYMBOL_NAME'] = str

        if not keep_db:
            src["SYMBOL_NAME"] = src["SYMBOL_NAME"].str.regex_replace('.*::', '')

        return src

    @staticmethod
    def duplicate(obj, db=None):
        """
        Create a new ``Symbols`` object from the stored properties of ``obj``,
        optionally replacing the database with ``db``.
        """
        return Symbols(db=obj._p_db if db is None else db,
                       pattern=obj._p_pattern,
                       start=obj._p_start,
                       end=obj._p_end,
                       keep_db=obj._p_keep_db,
                       for_tick_type=obj._p_for_tick_type)
def default_date_converter(date):
    """Parse ``date`` with pandas using the ``'%Y%m%d%H%M%S.%f'`` format."""
    return pd.to_datetime(date, format='%Y%m%d%H%M%S.%f')


def to_timestamp_nanos(date, date_converter, tz):
    """
    Convert ``date`` with ``date_converter`` and localize the result to ``tz``.

    If the converter returns an ``ott.dt`` object its ``ts`` attribute is used,
    otherwise the converted value is passed through ``pd.to_datetime``.
    """
    converted = date_converter(date)
    timestamp = converted.ts if isinstance(converted, ott.dt) else pd.to_datetime(converted)
    return timestamp.tz_localize(tz)


def LocalCSVTicks(path,
                  start=utils.adaptive,
                  end=utils.adaptive,
                  date_converter=default_date_converter,
                  additional_date_columns=None,
                  converters=None,
                  tz=None,
                  ):
    """
    Read ticks from a csv file and build an otp.Ticks object out of them.

    The file must contain the ``TIMESTAMP`` and ``#SYMBOL_NAME`` columns;
    the latter is exposed as ``SYMBOL_NAME`` in the result.

    Parameters
    ----------
    path: str
        Absolute path to csv file
    start: datetime object
        Start of the query interval
    end: datetime object
        End of the query interval
    date_converter:
        A converter from string to datetime format, by default applied only to the TIMESTAMP column
    additional_date_columns:
        Other columns to convert to datetime format
    converters:
        Non default converters to columns from strings
    tz:
        Timezone used to localize the converted dates.
        If not set, the timezone from the configuration is used.

    Returns
    -------
    otp.Ticks
    """
    tz = configuration.config.tz if tz is None else tz

    parse_date = partial(to_timestamp_nanos, date_converter=date_converter, tz=tz)

    # Priority (lowest to highest): TIMESTAMP default, user converters, additional date columns
    column_converters = {'TIMESTAMP': parse_date}
    if converters is not None:
        column_converters.update(converters)
    if additional_date_columns is not None:
        for column in additional_date_columns:
            column_converters[column] = parse_date

    frame = pd.read_csv(path, converters=column_converters)
    frame['TS_'] = frame['TIMESTAMP']
    frame['SYMBOL_NAME'] = frame['#SYMBOL_NAME']

    columns = frame.to_dict(orient='list')
    for original_name in ('TIMESTAMP', '#SYMBOL_NAME'):
        del columns[original_name]

    # The timestamp travels through Ticks under a temporary name and is restored afterwards
    ticks = Ticks(columns, start=start, end=end)
    ticks['TIMESTAMP'] = ticks['TS_']
    return ticks.drop('TS_')


class SymbologyMapping(Source):
    """
    Source built on top of the ``otq.SymbologyMapping`` event processor.

    Both ``dest_symbology`` and ``tick_type`` are required. The schema of the source
    gets the MAPPED_SYMBOL_NAME (string) and END_DATETIME (nanosecond time) fields.
    """

    _PROPERTIES = Source._PROPERTIES + ["_p_dest_symbology"]

    def __init__(self,
                 dest_symbology: str = None,
                 tick_type: str = None,
                 start=utils.adaptive,
                 end=utils.adaptive,
                 symbols=utils.adaptive,
                 **desired_schema):
        if self._try_default_constructor(**desired_schema):
            return

        if not (dest_symbology and tick_type):
            raise TypeError("Missing required argument: 'dest_symbology' or 'tick_type'")

        self._p_dest_symbology = dest_symbology

        super().__init__(
            _symbols=symbols,
            _start=start,
            _end=end,
            _base_ep_func=lambda: self.base_ep(dest_symbology, tick_type),
            **desired_schema
        )

        self.schema['MAPPED_SYMBOL_NAME'] = str
        self.schema['END_DATETIME'] = ott.nsectime

    @property
    def dest_symbology(self):
        """Destination symbology this source was created with."""
        return self._p_dest_symbology

    def base_ep(self, dest_symbology, tick_type):
        """Create the SymbologyMapping node and set ``tick_type`` on it."""
        mapping = Source(otq.SymbologyMapping(dest_symbology=dest_symbology))
        mapping.tick_type(tick_type)
        return mapping


class SplitQueryOutputBySymbol(Source):
    """
    Source built on top of the ``otq.SplitQueryOutputBySymbol`` event processor.

    ``query`` must be a :class:`Source`; it is copied and saved as a temporary .otq query.
    If neither ``start`` nor ``end`` is specified, the date range of ``query`` is used.
    """

    def __init__(self,
                 query=None,
                 symbol_field=None,
                 single_invocation=False,
                 db=utils.adaptive_to_default,
                 tick_type=utils.adaptive,
                 start=utils.adaptive,
                 end=utils.adaptive,
                 symbols=utils.adaptive,
                 **desired_schema):
        if self._try_default_constructor(**desired_schema):
            return

        # TODO: support already existing queries
        if not isinstance(query, Source):
            raise Exception('Non supported type of the `query` is specified')

        query_copy = query.copy()
        otq_query = query_copy._save_as_tmp_otq()
        query_start, query_end, _ = query_copy._get_date_range()
        if start is utils.adaptive and end is utils.adaptive:
            start = query_start
            end = query_end

        if db is utils.adaptive_to_default:
            db = configuration.config.get('default_db')

        if tick_type is utils.adaptive:
            tick_type = 'SPLIT_BY_SYMBOL'

        super().__init__(
            _symbols=symbols,
            _start=start,
            _end=end,
            _base_ep_func=partial(self.build, db, tick_type, symbol_field, otq_query, single_invocation),
            **desired_schema
        )

    def build(self, db, tick_type, symbol_field_name, otq_query, single_invocation):
        """Create the SplitQueryOutputBySymbol node; the tick type is prefixed with ``db`` if it is set."""
        split_ep = otq.SplitQueryOutputBySymbol(otq_query=otq_query,
                                                symbol_field_name=str(symbol_field_name),
                                                ensure_single_invocation=single_invocation)
        node = Source(split_ep)

        full_tick_type = f'{db!s}::{tick_type}' if db else tick_type
        node.tick_type(full_tick_type)

        return node
def by_symbol(src: Source,
              symbol_field,
              single_invocation=None,
              db=utils.adaptive_to_default,
              tick_type=utils.adaptive,
              start=utils.adaptive,
              end=utils.adaptive,
              ) -> Source:
    """
    Split the output of ``src`` into a separate data series per unique value of ``symbol_field``.
    ``src`` must specify enough parameters to be run (e.g., symbols, query range).
    A typical use case is splitting a single data series (e.g., from a CSV file)
    into separate data series by symbol. This method is a source.

    Parameters
    ----------
    src: Source
        a query which output is to be split by ``symbol_field``
    symbol_field: str
        the name of the field carrying symbol name in the ``src`` query
    single_invocation: bool, optional
        ``True`` means that the ``src`` query is run once and the result stored in memory
        speeding up the execution.
        ``False`` means that the ``src`` query is run for every symbol of the query
        saving memory but slowing down query execution.
        ``None`` means that this flag will be automatically chosen
        as ``True`` for CSV sources and ``False`` for other sources.
    db: str, optional
        Database for running the query. Doesn't affect the ``src`` query.
        The default value is ``otp.config['default_db']``.
    tick_type: str, optional
        Tick type for the query. Doesn't affect the ``src`` query.
    start: otp.dt, optional
        By default it is taken from the ``src`` start time
    end: otp.dt, optional
        By default it is taken from the ``src`` end time

    See also
    --------
    **SPLIT_QUERY_OUTPUT_BY_SYMBOL** OneTick event processor

    Examples
    --------
    >>> executions = otp.CSV(  # doctest: +SKIP
    ...     otp.utils.file(os.path.join(cur_dir, 'data', 'example_events.csv')),
    ...     converters={"time_number": lambda c: c.apply(otp.nsectime)},
    ...     timestamp_name="time_number",
    ...     start=otp.dt(2022, 7, 1),
    ...     end=otp.dt(2022, 7, 2),
    ...     order_ticks=True
    ... )[['stock', 'px']]
    >>> csv = otp.by_symbol(executions, 'stock')  # doctest: +SKIP
    >>> trd = otp.DataSource(  # doctest: +SKIP
    ...     db='NYSE_TAQ',
    ...     tick_type='TRD',
    ...     start=otp.dt(2022, 7, 1),
    ...     end=otp.dt(2022, 7, 2)
    ... )[['PRICE', 'SIZE']]
    >>> data = otp.funcs.join_by_time([csv, trd])  # doctest: +SKIP
    >>> result = otp.run(data, symbols=executions.distinct(keys='stock')[['stock']], concurrency=8)  # doctest: +SKIP
    >>> result['THG']  # doctest: +SKIP
                               Time  stock      px   PRICE  SIZE
    0 2022-07-01 11:37:56.432947200    THG  148.02  146.48     1
    >>> result['TFX']  # doctest: +SKIP
                               Time  stock      px   PRICE  SIZE
    0 2022-07-01 11:39:45.882808576    TFX  255.61  251.97     1
    >>> result['BURL']  # doctest: +SKIP
                               Time  stock      px   PRICE  SIZE
    0 2022-07-01 11:42:35.125718016   BURL  137.53  135.41     2
    """
    # When the flag is not specified, it is enabled for CSV sources only
    if single_invocation is None:
        single_invocation = isinstance(src, CSV)

    split_source = SplitQueryOutputBySymbol(src,
                                            symbol_field=symbol_field,
                                            single_invocation=single_invocation,
                                            db=db,
                                            tick_type=tick_type,
                                            start=start,
                                            end=end)
    # The split output carries the same fields as the original query
    split_source.schema.set(**src.schema)
    return split_source
@docstring(parameters=OB_SNAPSHOT_DOC_PARAMS + DATA_SOURCE_DOC_PARAMS)
def ObSnapshot(*args, **kwargs):
    """
    Construct a source providing order book snapshot for a given ``db``.
    This is just a shortcut for otp.DataSource + otp.agg.ob_snapshot.

    See also
    --------
    | :class:`onetick.py.DataSource`
    | :meth:`onetick.py.Source.ob_snapshot`
    | :func:`onetick.py.agg.ob_snapshot`
    | **OB_SNAPSHOT** OneTick event processor

    Examples
    ---------
    >>> data = otp.ObSnapshot(db='SOME_DB', tick_type='PRL', symbols='AA', max_levels=1)  # doctest: +SKIP
    >>> otp.run(data)  # doctest: +SKIP
            Time  PRICE             UPDATE_TIME  SIZE  LEVEL  BUY_SELL_FLAG
    0 2003-12-04    2.0 2003-12-01 00:00:00.003     6      1              1
    1 2003-12-04    5.0 2003-12-01 00:00:00.004     7      1              0
    """
    # Take the aggregation parameters out of kwargs (falling back to their defaults);
    # whatever is left is passed to otp.DataSource
    aggregation_params = {}
    for _, param in OB_SNAPSHOT_DOC_PARAMS:
        aggregation_params[param.name] = kwargs.pop(param.name, param.default)

    data_source = otp.DataSource(*args, **kwargs)
    aggregation = otp.agg.ob_snapshot(**aggregation_params)
    return aggregation.apply(data_source)
@docstring(parameters=OB_SNAPSHOT_WIDE_DOC_PARAMS + DATA_SOURCE_DOC_PARAMS)
def ObSnapshotWide(*args, **kwargs):
    """
    Construct a source providing order book wide snapshot for a given ``db``.
    This is just a shortcut for otp.DataSource + otp.agg.ob_snapshot_wide.

    See also
    --------
    | :class:`onetick.py.DataSource`
    | :meth:`onetick.py.Source.ob_snapshot_wide`
    | :func:`onetick.py.agg.ob_snapshot_wide`
    | **OB_SNAPSHOT_WIDE** OneTick event processor

    Examples
    ---------
    >>> data = otp.ObSnapshotWide(db='SOME_DB', tick_type='PRL', symbols='AA', max_levels=1)  # doctest: +SKIP
    >>> otp.run(data)  # doctest: +SKIP
            Time  BID_PRICE         BID_UPDATE_TIME  BID_SIZE  ASK_PRICE         ASK_UPDATE_TIME  ASK_SIZE  LEVEL
    0 2003-12-03        5.0 2003-12-01 00:00:00.004         7        2.0 2003-12-01 00:00:00.003         6      1
    """
    # Take the aggregation parameters out of kwargs (falling back to their defaults);
    # whatever is left is passed to otp.DataSource
    aggregation_params = {}
    for _, param in OB_SNAPSHOT_WIDE_DOC_PARAMS:
        aggregation_params[param.name] = kwargs.pop(param.name, param.default)

    data_source = otp.DataSource(*args, **kwargs)
    aggregation = otp.agg.ob_snapshot_wide(**aggregation_params)
    return aggregation.apply(data_source)
@docstring(parameters=OB_SNAPSHOT_FLAT_DOC_PARAMS + DATA_SOURCE_DOC_PARAMS)
def ObSnapshotFlat(*args, **kwargs):
    """
    Construct a source providing order book flat snapshot for a given ``db``.
    This is just a shortcut for otp.DataSource + otp.agg.ob_snapshot_flat.

    See also
    --------
    | :class:`onetick.py.DataSource`
    | :meth:`onetick.py.Source.ob_snapshot_flat`
    | :func:`onetick.py.agg.ob_snapshot_flat`
    | **OB_SNAPSHOT_FLAT** OneTick event processor

    Examples
    ---------
    >>> data = otp.ObSnapshotFlat(db='SOME_DB', tick_type='PRL', symbols='AA', max_levels=1)  # doctest: +SKIP
    >>> otp.run(data)  # doctest: +SKIP
            Time  BID_PRICE1        BID_UPDATE_TIME1  BID_SIZE1  ASK_PRICE1        ASK_UPDATE_TIME1  ASK_SIZE1
    0 2003-12-03         5.0 2003-12-01 00:00:00.004          7         2.0 2003-12-01 00:00:00.003          6
    """
    # Take the aggregation parameters out of kwargs (falling back to their defaults);
    # whatever is left is passed to otp.DataSource
    aggregation_params = {}
    for _, param in OB_SNAPSHOT_FLAT_DOC_PARAMS:
        aggregation_params[param.name] = kwargs.pop(param.name, param.default)

    data_source = otp.DataSource(*args, **kwargs)
    aggregation = otp.agg.ob_snapshot_flat(**aggregation_params)
    return aggregation.apply(data_source)