import datetime
import datetime as dt
import inspect
import operator
import os
import sys
import warnings
import io
import math
import string
from functools import partial
from typing import Optional, Union, Type, Iterable
import onetick.py as otp
import onetick.query as otq
import pandas as pd
import onetick.py.core._source
import onetick.py.functions
import onetick.py.db._inspection
from onetick.py.core._internal._param_column import _ParamColumn
from onetick.py.core._source._symbol_param_column import _SymbolParamColumn
from onetick.py.core._source._symbol_param_source import _SymbolParamSource
from onetick.py.core._source.tmp_otq import TmpOtq
from onetick.py.core.column import _Column
from onetick.py.core.eval_query import _QueryEvalWrapper
from onetick.py.core.source import Source, _Source # _Source for backward compatibility
from onetick.py.core.column_operations.base import Raw
from . import types as ott
from . import utils, configuration
from .core import _csv_inspector, query_inspector
from .core.column_operations._methods.methods import is_arithmetical
from .core.column_operations.base import _Operation
from .db.db import DB
from .db._inspection import DB as inspect_DB
from .compatibility import is_supported_bucket_units_for_tick_generator
from .aggregations.order_book import (
OB_SNAPSHOT_DOC_PARAMS, OB_SNAPSHOT_WIDE_DOC_PARAMS, OB_SNAPSHOT_FLAT_DOC_PARAMS
)
from .aggregations._docs import _bucket_interval_doc, _bucket_time_doc
from onetick.py.docs.utils import docstring, param_doc
_QUERY_PARAM_SPECIAL_CHARACTERS = "=,"
AdaptiveTickType = Union[str, Type[utils.adaptive]]
def update_node_tick_type(node: "Source", tick_type: AdaptiveTickType, db: Optional[str] = None):
"""Update node tick_type according to db name and tick_type.
Don't change tick type for adaptive tick type.
Parameters
----------
node: Source
node to set tick_type on
tick_type: AdaptiveTickType
string tick type or :py:class:`onetick.py.adaptive`
db: Optional[str]
optional db name
"""
# do not change tick type for adaptive `tick_type`
if not isinstance(tick_type, type) and tick_type is not utils.adaptive:
if db:
node.tick_type(db + "::" + tick_type)
else:
node.tick_type(tick_type)
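# Illustrative usage sketch (the `node` Source below is hypothetical):
#   update_node_tick_type(node, "TRD", "SOME_DB")   # sets tick type "SOME_DB::TRD"
#   update_node_tick_type(node, "TRD")               # sets tick type "TRD"
#   update_node_tick_type(node, utils.adaptive)      # adaptive tick type is left unchanged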
class Tick(Source):
@docstring(parameters=[_bucket_interval_doc, _bucket_time_doc], add_self=True)
def __init__(
self,
offset=0,
offset_part='millisecond',
time: ott.datetime = None,
timezone_for_time=None,
symbol=utils.adaptive_to_default,
db=utils.adaptive_to_default,
start=utils.adaptive,
end=utils.adaptive,
tick_type: Optional[AdaptiveTickType] = None,
bucket_time: str = "start",
bucket_interval: int = 0,
bucket_units: str = utils.adaptive,
**kwargs,
):
"""
Generate a single tick object.
Parameters
----------
offset: int, default=0
tick timestamp offset from query start time in `offset_part`
offset_part: one of [nanosecond, millisecond, second, minute, hour, day, dayofyear, weekday, week, month, quarter, year], default=millisecond #noqa
unit of time to calculate ``offset`` from.
time: :py:class:`otp.datetime <onetick.py.datetime>`
fixed time to set to all ticks.
Note that this time should be inside the time interval set by the ``start`` and ``end`` parameters
or by the query time range.
timezone_for_time: str
timezone of the ``time``
symbol: str, list of str, :class:`Source`, :class:`query`, :py:func:`eval query <onetick.py.eval>`
Symbol(s) from which data should be taken.
db: str
Database to use for tick generation
start: :py:class:`otp.datetime <onetick.py.datetime>`
start time for tick generation. By default the start time of the query will be used.
end: :py:class:`otp.datetime <onetick.py.datetime>`
end time for tick generation. By default the end time of the query will be used.
tick_type: str
The special tick type ``TICK_GENERATOR`` will be used by default. You can pass
:py:class:`onetick.py.adaptive` as the value if you want to use the sink node's tick type
instead of defining your own.
bucket_units: 'seconds', 'days' or 'months'
Unit for value in ``bucket_interval``.
Default is 'seconds'.
kwargs:
dictionary of columns names with their values.
See also
--------
| **TICK_GENERATOR** OneTick event processor
| :py:class:`otp.Ticks <onetick.py.Ticks>`
Examples
--------
Simple usage, generate single tick:
>>> t = otp.Tick(A=1, B='string', C=3.14, D=otp.dt(2000, 1, 1, 1, 1, 1, 1))
>>> otp.run(t)
Time A B C D
0 2003-12-01 1 string 3.14 2000-01-01 01:01:01.000001
Generate one tick for each day in a week:
>>> t = otp.Tick(A=1, start=otp.dt(2023, 1, 1), end=otp.dt(2023, 1, 8), bucket_interval=24 * 60 * 60)
>>> otp.run(t)
Time A
0 2023-01-01 1
1 2023-01-02 1
2 2023-01-03 1
3 2023-01-04 1
4 2023-01-05 1
5 2023-01-06 1
6 2023-01-07 1
Generate tick every hour and add 1 minute offset to ticks' timestamps:
>>> t = otp.Tick(A=1, offset=1, offset_part='minute', bucket_interval=60 * 60)
>>> t.head(5)
Time A
0 2003-12-01 00:01:00 1
1 2003-12-01 01:01:00 1
2 2003-12-01 02:01:00 1
3 2003-12-01 03:01:00 1
4 2003-12-01 04:01:00 1
Generate tick every hour and set fixed time:
>>> t = otp.Tick(A=1, time=otp.dt(2023, 1, 2, 3, 4, 5, 6), bucket_interval=60 * 60,
... start=otp.dt(2023, 1, 1), end=otp.dt(2023, 1, 8))
>>> t.head(5)
Time A
0 2023-01-02 03:04:05.000006 1
1 2023-01-02 03:04:05.000006 1
2 2023-01-02 03:04:05.000006 1
3 2023-01-02 03:04:05.000006 1
4 2023-01-02 03:04:05.000006 1
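Buckets can also be specified in other units via ``bucket_units``
(illustrative sketch, not executed here):
>>> t = otp.Tick(A=1, bucket_interval=1, bucket_units='days')  # doctest: +SKIP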
"""
if self._try_default_constructor(**kwargs):
return
if len(kwargs) == 0:
raise ValueError("It is not allowed to have a tick without fields")
if time is not None and offset != 0:
raise ValueError("It's not allowed to set parameter 'datetime' and set non-zero offset at the same time")
bucket_time = self._get_bucket_time(bucket_time)
if tick_type is None:
tick_type = "TICK_GENERATOR"
columns = {}
for key, value in kwargs.items():
# the way to skip a field
if value is None:
continue
if inspect.isclass(value):
raise TypeError(f"Tick constructor expects values but not types, {value}")
else:
value_type = ott.get_object_type(value)
if value_type is str:
if isinstance(value, _Column) or is_arithmetical(value):
if value.dtype is not str:
value_type = value.dtype
elif len(value) > ott.string.DEFAULT_LENGTH:
value_type = ott.string[len(value)]
if value_type is bool:
value_type = float
if issubclass(value_type, (ott.datetime, ott.date, dt.datetime, dt.date, pd.Timestamp)):
value_type = ott.nsectime
columns[key] = value_type
super().__init__(
_symbols=symbol,
_start=start,
_end=end,
_base_ep_func=lambda: self.base_ep(db=db,
tick_type=tick_type,
offset=offset,
offset_part=offset_part,
time=time,
timezone_for_time=timezone_for_time,
columns=columns,
bucket_time=bucket_time,
bucket_interval=bucket_interval,
bucket_units=bucket_units,
**kwargs),
**columns,
)
def base_ep(self,
db=utils.adaptive_to_default,
tick_type="TICK_GENERATOR",
offset=0,
offset_part='millisecond',
time=None,
timezone_for_time=None,
columns=None,
bucket_time="start",
bucket_interval=0,
bucket_units=utils.adaptive,
**kwargs):
if columns is None:
columns = {}
if db is utils.adaptive_to_default:
# if default database is not set, tick type will be set without it
# and symbols will have to be specified in otp.run
db = configuration.config.get('default_db')
params = ",".join(
ott.type2str(columns[key]) + " " + str(key) + "=" + ott.value2str(value)
for key, value in kwargs.items()
if value is not None
)
tick_generator_kwargs = {}
if bucket_units is not utils.adaptive and is_supported_bucket_units_for_tick_generator(throw_warning=True):
tick_generator_kwargs['bucket_interval_units'] = bucket_units.upper()
src = Source(
otq.TickGenerator(
bucket_interval=bucket_interval,
bucket_time=bucket_time,
fields=params,
**tick_generator_kwargs,
),
**columns
)
update_node_tick_type(src, tick_type, db)
# TIMESTAMP += offset will add redundant nodes to sort the timestamps.
# No sorting needed for a single tick.
if offset:
src.sink(otq.UpdateField(field="TIMESTAMP",
value=f"dateadd('{offset_part}', {offset}, TIMESTAMP, _TIMEZONE)"))
elif time:
src.sink(otq.UpdateField(field="TIMESTAMP",
value=ott.datetime2expr(time, timezone_naive=timezone_for_time)))
return src
@staticmethod
def _get_bucket_time(bucket_time):
if bucket_time == "BUCKET_START":
warnings.warn("BUCKET_START value is deprecated. Please, use 'start' instead", DeprecationWarning)
elif bucket_time == "BUCKET_END":
warnings.warn("BUCKET_END value is deprecated. Please, use 'end' instead", DeprecationWarning)
elif bucket_time == "start":
bucket_time = "BUCKET_START"
elif bucket_time == "end":
bucket_time = "BUCKET_END"
else:
raise ValueError(f"Only 'start' and 'end' values supported as bucket time, but you've passed {bucket_time}")
return bucket_time
def Ticks(data=None,
symbol=utils.adaptive_to_default,
db=utils.adaptive_to_default,
start=utils.adaptive,
end=utils.adaptive,
tick_type: Optional[AdaptiveTickType] = None,
timezone_for_time=None,
**inplace_data):
"""
Data source that generates ticks.
Ticks are placed with a 1 millisecond offset from each other,
starting from the start of the query interval.
The distance between ticks can be changed using the special
reserved field name ``offset``, which specifies the time offset
from the previous tick.
Parameters
----------
data: dict, list or pandas.DataFrame, optional
Ticks values
* ``dict`` -- <field_name>: <values>
* ``list`` -- [[<field_names>], [<first_tick_values>], ..., [<n_tick_values>]]
* :pandas:`DataFrame <pandas.DataFrame>` -- DataFrame with ``Time`` column
* ``None`` -- ``inplace_data`` will be used
symbol: str, list of str, :class:`Source`, :class:`query`, :py:func:`eval query <onetick.py.eval>`
Symbol(s) from which data should be taken.
db: str
Database to use for tick generation
start, end: :py:class:`datetime.datetime`, :py:class:`otp.datetime <onetick.py.datetime>`, \
:py:class:`onetick.py.adaptive`
Timestamp for data generation
tick_type: str
tick type for data generation
timezone_for_time: str
timezone for data generation
**inplace_data: list
<field_name>: list(<field_values>)
See also
--------
| **TICK_GENERATOR** OneTick event processor
| :py:class:`otp.Tick <onetick.py.Tick>`
Examples
--------
Pass data in ``dict``
>>> d = otp.Ticks({'A': [1, 2, 3], 'B': [4, 5, 6]})
>>> otp.run(d)
Time A B
0 2003-12-01 00:00:00.000 1 4
1 2003-12-01 00:00:00.001 2 5
2 2003-12-01 00:00:00.002 3 6
Pass ``inplace_data``
>>> d = otp.Ticks(A=[1, 2, 3], B=[4, 5, 6])
>>> otp.run(d)
Time A B
0 2003-12-01 00:00:00.000 1 4
1 2003-12-01 00:00:00.001 2 5
2 2003-12-01 00:00:00.002 3 6
Pass data in ``list``
>>> d = otp.Ticks([['A', 'B'],
... [1, 4],
... [2, 5],
... [3, 6]])
>>> otp.run(d)
Time A B
0 2003-12-01 00:00:00.000 1 4
1 2003-12-01 00:00:00.001 2 5
2 2003-12-01 00:00:00.002 3 6
Using the ``offset`` example
>>> data = otp.Ticks(X=[1, 2, 3], offset=[0, otp.Nano(1), 1])
>>> otp.run(data)
Time X
0 2003-12-01 00:00:00.000000000 1
1 2003-12-01 00:00:00.000000001 2
2 2003-12-01 00:00:00.001000000 3
Using pandas.DataFrame
>>> start_datetime = datetime(2023, 1, 1, 12)
>>> time_array = [start_datetime + otp.Hour(1) + otp.Nano(1)]
>>> a_array = [start_datetime - otp.Day(15) - otp.Nano(7)]
>>> df = pd.DataFrame({'Time': time_array,'A': a_array})
>>> data = otp.Ticks(df)
>>> otp.run(data, start=start_datetime, end=start_datetime + otp.Day(1))
Time A
0 2023-01-01 13:00:00.000000001 2022-12-17 11:59:59.999999993
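Absolute timestamps can also be set with the reserved ``time`` field instead of ``offset``
(illustrative sketch, not executed here):
>>> data = otp.Ticks(X=[1, 2], time=[otp.dt(2023, 1, 1), otp.dt(2023, 1, 2)])  # doctest: +SKIP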
"""
if tick_type is None:
tick_type = "TICK_GENERATOR"
if db is utils.adaptive_to_default:
db = configuration.config.get('default_db')
if isinstance(data, pd.DataFrame):
if 'Time' not in data.columns:
raise ValueError('Field `Time` is required for constructing an `otp.Source` from `pandas.DataFrame`')
data = data.rename(columns={"Time": "time"})
data = data.to_dict('list')
if data and len(inplace_data) != 0:
raise ValueError("Data can be passed only using either the `data` parameter "
"or inplace through the key-value args")
if isinstance(data, list):
reform = {}
for inx, key in enumerate(data[0]):
reform[key] = [sub_list[inx] for sub_list in data[1:]]
data = reform
if data is None:
if inplace_data:
data = inplace_data
else:
raise ValueError("You don't specify any date to create ticks from. "
"Please, use otp.Empty for creating empty data source")
else:
data = data.copy()
value_len = -1
for key, value in data.items():
if value_len == -1:
value_len = len(value)
else:
if value_len != len(value):
# TODO: write test to cover that case
raise ValueError(
f"It is not allowed to have columns of different lengths: "
f"some columns have length {value_len}, but column '{key}', for instance, has {len(value)}"
)
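# The reserved "offset" and "time" columns control tick timestamps: "offset" shifts each tick
# from the query start time (by default ticks are placed 1 millisecond apart), while "time" sets
# absolute timestamps. The two are mutually exclusive.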
use_absolute_time = False
if "offset" in data:
if "time" in data:
raise ValueError("You cannot specify offset and time at the same time")
else:
if "time" in data:
use_absolute_time = True
else:
data["offset"] = list(range(value_len))
if not use_absolute_time:
offset_values = []
offset_parts = []
for ofv in data['offset']:
if isinstance(ofv, ott.offsets.Tick):
offset_values.append(ofv.n)
offset_parts.append(str(ofv.datepart)[1:-1])
else:
offset_values.append(ofv)
offset_parts.append('millisecond')
data['offset'] = offset_values
data['offset_part'] = offset_parts
if value_len == 1:
columns = {key: value[0] for key, value in data.items()}
return Tick(db=db, symbol=symbol, tick_type=tick_type, start=start, end=end,
timezone_for_time=timezone_for_time, **columns)
else:
# select only columns that do not contain None values, to support
# heterogeneous data
not_none_columns = []
for key in data.keys():
data[key] = [float(elem) if isinstance(elem, bool) else elem for elem in data[key]]
for key, value in data.items():
add = True
for v in value:
# we need it, because can't use _Column instances in if-clauses
if isinstance(v, _Column):
continue
if v is None:
add = False
break
if add:
not_none_columns.append(key)
# if a field depends on a symbol parameter, it cannot be csv'd (it's dynamic)
# likewise for otq parameters
# if there's a better way to check whether a value is constant,
# will be glad to hear about it
is_outside_data_dependent = False
for key, value in data.items():
for v in value:
str_rep = str(v)
if ("_SYMBOL_NAME" in str_rep) or ("_SYMBOL_PARAM" in str_rep) or ("$" in str_rep):
is_outside_data_dependent = True
break
# infinity() and (on windows) nan() cannot be natively read from a csv
has_special_values = False
for key, value in data.items():
for v in value:
if isinstance(v, ott._inf) or \
(isinstance(v, ott._nan) or isinstance(v, float) and math.isnan(v)) \
and sys.platform.startswith("win"):
has_special_values = True
break
if (len(not_none_columns) == len(data)) and (not is_outside_data_dependent) and (not has_special_values):
# Data is homogeneous; CSV backing can be used
return _DataCSV(data, value_len, db=db, symbol=symbol, tick_type=tick_type, start=start, end=end,
timezone_for_time=timezone_for_time, use_absolute_time=use_absolute_time)
else:
# Fallback is a merge of individual ticks
ticks = []
for inx in range(value_len):
columns = {key: value[inx] for key, value in data.items()}
ticks.append(Tick(db=db, symbol=symbol, tick_type=tick_type, start=start, end=end,
timezone_for_time=timezone_for_time, **columns))
return onetick.py.functions.merge(ticks, align_schema=not_none_columns)
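# _DataCSV packs homogeneous tick data into an in-memory CSV consumed by a single CSV_FILE_LISTING EP.
# Ticks() falls back to merging per-tick sources when the data contains None values,
# symbol/query-parameter dependencies, or special values (inf/nan) that can't be read from a CSV.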
class _DataCSV(Source):
def __init__(
self,
data=None,
length=None,
db=utils.adaptive_to_default,
symbol=utils.adaptive_to_default,
tick_type=None,
start=utils.adaptive,
end=utils.adaptive,
use_absolute_time=False,
timezone_for_time=None,
**kwargs,
):
if self._try_default_constructor(**kwargs):
return
if data is None or length is None:
raise ValueError("'data' and 'length' parameters can't be None")
if db is utils.adaptive_to_default:
db = configuration.config.get('default_db')
def datetime_to_expr(v):
if ott.is_time_type(v):
return ott.datetime2expr(v, timezone_naive=timezone_for_time)
if isinstance(v, ott.nsectime):
# TODO: change to ott.value2str after PY-441
return f'NSECTIME({v})'
if isinstance(v, ott.msectime):
return ott.value2str(v)
raise ValueError(f"Can't convert value {v} to datetime expression")
if use_absolute_time:
# converting values of "time" column to onetick expressions
converted_times = []
for d in data["time"]:
converted_times.append(datetime_to_expr(d))
data["time"] = converted_times
def csv_rep(value):
if issubclass(type(value), str):
return '"' + value.replace("\\", "\\\\").replace('"', '\\"') + '"'
else:
return str(value)
def get_type_of_column(key):
def get_type_of_value(value):
t = ott.get_object_type(value)
if ott.is_time_type(t):
return ott.nsectime
elif t is str:
if len(value) <= ott.string.DEFAULT_LENGTH:
return str
else:
return ott.string[len(value)]
else:
return t
types = [get_type_of_value(v) for v in data[key]]
res, _ = utils.get_type_that_includes(types)
return res
columns = {key: get_type_of_column(key) for key in data}
expression_columns = []
header_columns = {}
for key in list(columns):
header_columns[key] = columns[key]
# converting values of datetime columns to onetick expressions
if columns[key] is ott.nsectime:
data[key] = [datetime_to_expr(v) for v in data[key]]
header_columns[key] = get_type_of_column(key)
expression_columns.append(key)
transposed_data = [[csv_rep(value[i]) for key, value in data.items()] for i in range(length)]
text_header = ",".join(f"{ott.type2str(v)} {k}" for k, v in header_columns.items())
text_data = "\n".join([",".join(data_row) for data_row in transposed_data])
if use_absolute_time:
del columns["time"]
else:
del columns["offset"]
del columns["offset_part"]
super().__init__(
_symbols=symbol,
_start=start,
_end=end,
_base_ep_func=lambda: self.base_ep(columns=columns,
db=db,
tick_type=tick_type,
use_absolute_time=use_absolute_time,
text_header=text_header,
text_data=text_data,
expression_columns=expression_columns),
**columns,
)
def base_ep(self, columns, db, tick_type, use_absolute_time, text_header, text_data, expression_columns=None):
node = Source(
otq.CsvFileListing(
discard_timestamp_column=True,
time_assignment="_START_TIME",
field_delimiters="','",
quote_chars='"""',
handle_escaped_chars=True,
file_contents=text_data,
first_line_is_title=False,
fields=text_header,
),
**columns,
)
update_node_tick_type(node, tick_type, db)
if use_absolute_time:
# don't trust UpdateField
node.sink(otq.AddField(field='____TMP____', value="EVAL_EXPRESSION(time, 'datetime')"))
node.sink(otq.UpdateField(field="TIMESTAMP", value="____TMP____"))
node.sink(otq.Passthrough(fields="time,____TMP____", drop_fields="True"))
node.sink(otq.OrderBy(order_by="TIMESTAMP ASC"))
else:
node.sink(otq.OrderBy(order_by="offset ASC"))
node.sink(otq.UpdateField(field="TIMESTAMP", value="dateadd(offset_part, offset, TIMESTAMP, _TIMEZONE)"))
node.sink(otq.Passthrough(fields="offset,offset_part", drop_fields="True"))
node.sink(otq.OrderBy(order_by="TIMESTAMP ASC"))
for column in expression_columns or []:
# don't trust UpdateField
node.sink(otq.RenameFields(f'{column}=____TMP____'))
node.sink(otq.AddField(field=column, value="EVAL_EXPRESSION(____TMP____, 'datetime')"))
node.sink(otq.Passthrough(fields='____TMP____', drop_fields=True))
node.sink(otq.Table(keep_input_fields=True,
fields=', '.join(f'nsectime {column}' for column in expression_columns)))
return node
def TTicks(data):
"""
.. deprecated:: 1.3.101
Transposed Ticks format.
Parameters
----------
data: list
list of list, where the first sublist is the header, and other are values
"""
warnings.warn("The nice and helpful function `TTicks` is going to be deprecated. "
"You could use the `Ticks` to pass data in the same format there",
DeprecationWarning)
dt = {}
for inx, key in enumerate(data[0]):
dt[key] = [sub_list[inx] for sub_list in data[1:]]
return Ticks(dt)
class Empty(Source):
"""
Empty data source
Parameters
----------
db: str
Name of the database from which to take schema.
symbol: str, list of str, :class:`Source`, :class:`query`, :py:func:`eval query <onetick.py.eval>`
Symbol(s) from which data should be taken.
tick_type: str,
Name of the tick_type from which to take schema.
start, end: :py:class:`datetime.datetime`, :py:class:`otp.datetime <onetick.py.datetime>`, \
:py:class:`onetick.py.adaptive`
Time interval from which the data should be taken.
schema: schema to use in case db and/or tick_type are not set
Examples
--------
We can define schema:
>>> data = otp.Empty(A=str, B=int)
>>> otp.run(data)
Empty DataFrame
Columns: []
Index: []
>>> data.columns()
{'A': <class 'str'>, 'B': <class 'int'>, 'TIMESTAMP': <class 'onetick.py.types.nsectime'>,
'_START_TIME': <class 'onetick.py.types.nsectime'>, '_END_TIME': <class 'onetick.py.types.nsectime'>,
'_SYMBOL_NAME': <class 'str'>, '_DBNAME': <class 'str'>, '_TICK_TYPE': <class 'str'>, '_TIMEZONE': <class 'str'>}
Or we can get schema from the database:
>>> data = otp.Empty(db='SOME_DB', tick_type='TT')
>>> data.columns()
{'X': <class 'int'>, 'TIMESTAMP': <class 'onetick.py.types.nsectime'>,
'_START_TIME': <class 'onetick.py.types.nsectime'>, '_END_TIME': <class 'onetick.py.types.nsectime'>,
'_SYMBOL_NAME': <class 'str'>, '_DBNAME': <class 'str'>, '_TICK_TYPE': <class 'str'>, '_TIMEZONE': <class 'str'>}
"""
def __init__(
self,
db=utils.adaptive_to_default,
symbol=utils.adaptive_to_default,
tick_type=None,
start=utils.adaptive,
end=utils.adaptive,
**schema,
):
if self._try_default_constructor(**schema):
return
columns = {}
if tick_type and db != configuration.config.get('default_db') and db is not utils.adaptive_to_default:
try:
db_obj = onetick.py.db._inspection.DB(db)
params = {'tick_type': tick_type}
if end is not utils.adaptive:
params['end'] = end
columns = db_obj.schema(**params)
except Exception:
pass # do not raise an exception if no data found, because it is empty _source and does not matter
else:
columns = schema
super().__init__(
_symbols=symbol, _start=start, _end=end, _base_ep_func=lambda: self.base_ep(db), **columns
)
def base_ep(self, db):
if db is utils.adaptive_to_default:
db = configuration.config.get('default_db')
src = Source(otq.TickGenerator(fields="long ___NOTHING___=0"))
if db is None:
src.tick_type('TICK_GENERATOR')
else:
src.tick_type(db + "::TICK_GENERATOR")
return src
def CSV(
filepath_or_buffer=None,
timestamp_name: Union[str, None] = "Time",
first_line_is_title: bool = True,
names: Union[list, None] = None,
dtype: dict = None,
converters: dict = None,
order_ticks=False,
drop_index=True,
change_date_to=None,
auto_increase_timestamps=True,
db='LOCAL',
field_delimiter=',',
handle_escaped_chars=False,
quote_char='"',
**kwargs,
):
"""
Construct source based on CSV file.
There are several steps determining column types.
1. Initially, all columns are treated as ``str``.
2. If a column name in the CSV title has the format ``type COLUMNNAME``,
its type will be changed from ``str`` to the specified type.
3. Column types are determined automatically from their data.
4. You can explicitly override the determined types via the ``dtype`` argument.
5. The ``converters`` argument is applied after ``dtype`` and can also change a column's type.
NOTE: Double quotes are not supported in CSV files for escaping quotes in strings;
you should use the escape character ``\\`` before the quote instead,
for example: ``"I'm a string with a \\"quotes\\" inside"``. Then set ``handle_escaped_chars=True``.
Parameters
----------
filepath_or_buffer: str, os.PathLike, FileBuffer, optional
Path to a CSV file or a :class:`file buffer <FileBuffer>`. If None, the value is taken from the symbol.
When taken from the symbol, the symbol must have the ``LOCAL::`` prefix.
In that case you should set the columns, otherwise the schema will be empty.
timestamp_name: str, default "Time"
Name of the column used as the tick TIMESTAMP. Used only if it exists among the CSV columns, otherwise ignored.
Output data will be sorted by this column.
first_line_is_title: bool
Use the first line of the CSV file as a source for column names and types.
If the CSV file starts with the ``#`` symbol, this parameter **must** be ``True``.
- If ``True``, column names are inferred from the first line of the file;
it is not allowed to have an empty name for any column.
- If ``False``, the first line is processed as data, and column names will be COLUMN_1, ..., COLUMN_N.
You can specify column names in the ``names`` argument.
names: list, optional
List of column names to use, or None.
Length must be equal to the number of columns in the file.
Duplicates in this list are not allowed.
dtype: dict, optional
Data type for columns, as a dict of pairs {column_name: type}.
Converts the column type from ``str`` to the specified type, before applying converters.
converters: dict, optional
Dict of functions for converting values in certain columns. Keys are column names.
Functions must be valid callables using ``onetick.py`` syntax, for example::
converters={
"time_number": lambda c: c.apply(otp.nsectime),
"stock": lambda c: c.str.lower(),
}
Converters are applied *after* the ``dtype`` conversion.
order_ticks: bool, optional
If ``True`` and the ``timestamp_name`` column is used, the source will order ticks by time.
Note that if ``False`` and the ticks are not ordered in sequence, OneTick will raise an exception at runtime.
drop_index: bool, optional
If ``True`` and the 'Index' column is in the CSV file, then this column will be removed.
change_date_to: datetime, date, optional
Change the date of the timestamp column to a specific date. Default is None, which means the timestamp column is not changed.
auto_increase_timestamps: bool, optional
Only used if the provided CSV file does not have a TIMESTAMP column. If ``True``, timestamps of loaded ticks
start at ``start_time`` and increase by 1 millisecond with each next tick.
If ``False``, timestamps of all loaded ticks will be equal to ``start_time``.
db: str, optional
Name of the database defining the destination where the CSV file will be transported for processing.
``LOCAL`` is the default value and means that OneTick will process the file on the site where the query runs.
field_delimiter: str, optional
A character used to tokenize each line of the CSV file.
For a tab character, \t (backslash followed by t) should be specified.
handle_escaped_chars: bool, optional
If set, the backslash char ``\\`` gets a special meaning, and everywhere in the input text
the combinations ``\\'``, ``\\"`` and ``\\\\`` are replaced correspondingly by ``'``, ``"`` and ``\\``,
which are then processed as regular chars.
Besides, combinations like ``\\x??``, where ?-s are hexadecimal digits (0-9, a-f or A-F),
are replaced by the chars with the specified ASCII code.
For example, ``\\x0A`` will be replaced by a newline character, ``\\x09`` will be replaced by a tab, and so on.
Default: False
quote_char: str
Character used to denote the start and end of a quoted item. Quoted items can include the delimiter,
and it will be ignored. The same character cannot be marked both as the quote character and as the
field delimiter. Besides, space characters cannot be used as quote.
Default: " (double quotes)
See also
--------
**CSV_FILE_LISTING** OneTick event processor
Examples
--------
Simple CSV file reading
>>> data = otp.CSV(os.path.join(csv_path, "data.csv"))
>>> otp.run(data)
Time time_number px side
0 2003-12-01 00:00:00.000 1656690986953602371 30.89 Buy
1 2003-12-01 00:00:00.001 1656667706281508365 682.88 Buy
Read CSV file and get timestamp for ticks from specific field.
You need to specify query start/end interval including all ticks.
>>> data = otp.CSV(os.path.join(csv_path, "data.csv"),
... timestamp_name="time_number",
... converters={"time_number": lambda c: c.apply(otp.nsectime)},
... start=otp.dt(2010, 8, 1),
... end=otp.dt(2022, 9, 2))
>>> otp.run(data)
Time px side
0 2022-07-01 05:28:26.281508365 682.88 Buy
1 2022-07-01 11:56:26.953602371 30.89 Buy
Path to csv can be passed via symbol with `LOCAL::` prefix:
>>> data = otp.CSV()
>>> otp.run(data, symbols=f"LOCAL::{os.path.join(csv_path, 'data.csv')}")
Time time_number px side
0 2003-12-01 00:00:00.000 1656690986953602371 30.89 Buy
1 2003-12-01 00:00:00.001 1656667706281508365 682.88 Buy
Field delimiters can be set via ``field_delimiters`` parameter:
>>> data = otp.CSV(os.path.join(csv_path, 'data_diff_delimiters.csv'),
... field_delimiter=' ',
... first_line_is_title=False)
>>> otp.run(data)
Time COLUMN_0 COLUMN_1
0 2003-12-01 00:00:00.000 1,2 3
1 2003-12-01 00:00:00.001 4 5,6
Quote char can be set via ``quote_char`` parameter:
>>> data = otp.CSV(os.path.join(csv_path, 'data_diff_quote_chars.csv'),
... quote_char="'",
... first_line_is_title=False)
>>> otp.run(data)
Time COLUMN_0 COLUMN_1
0 2003-12-01 00:00:00.000 1,"2 3"
1 2003-12-01 00:00:00.001 "1 2",3
"""
csv_source = _CSV(
filepath_or_buffer=filepath_or_buffer,
timestamp_name=timestamp_name,
first_line_is_title=first_line_is_title,
names=names,
dtype=dtype,
converters=converters,
order_ticks=order_ticks,
drop_index=drop_index,
change_date_to=change_date_to,
auto_increase_timestamps=auto_increase_timestamps,
db=db,
field_delimiter=field_delimiter,
handle_escaped_chars=handle_escaped_chars,
quote_char=quote_char,
**kwargs,
)
csv_source = csv_source.sort(csv_source['Time'])
return otp.merge([csv_source, otp.Empty(db=db)])
class _CSV(Source):
_PROPERTIES = Source._PROPERTIES + [
"_dtype",
"_names",
"_columns",
"_forced_title",
"_default_types",
"_has_time",
"_to_drop",
"_start",
"_end",
"_ep_fields",
"_symbols",
"_field_delimiter",
"_converters",
"_order_ticks",
"_auto_increase_timestamps",
"_db",
"_drop_index",
"_change_date_to",
"_timestamp_name",
"_filepath_or_buffer",
"_first_line_is_title",
"_handle_escaped_chars",
"_quote_char",
]
def __init__(self,
filepath_or_buffer=None,
timestamp_name: Union[str, None] = "Time",
first_line_is_title: bool = True,
names: Union[list, None] = None,
dtype: dict = None,
converters: dict = None,
order_ticks=False,
drop_index=True,
change_date_to=None,
auto_increase_timestamps=True,
db='LOCAL',
field_delimiter=',',
handle_escaped_chars=False,
quote_char='"',
**kwargs):
self._dtype = dtype or {}
self._names = names
self._converters = converters or {}
if (len(field_delimiter) != 1 and field_delimiter != '\t') or field_delimiter == '"' or field_delimiter == "'":
raise ValueError(f'`field_delimiter` must be a single character (except quotes) '
f'or "\t", but "{field_delimiter}" was passed')
self._field_delimiter = field_delimiter
if len(quote_char) > 1:
raise ValueError(f'quote_char should be single char but `{quote_char}` was passed')
if self._field_delimiter == quote_char:
raise ValueError(f'`{self._field_delimiter}` is both field_delimiter and quote_char')
if quote_char in string.whitespace:
raise ValueError('Whitespace can not be a quote_char')
self._quote_char = quote_char
self._order_ticks = order_ticks
self._auto_increase_timestamps = auto_increase_timestamps
self._db = db
self._drop_index = drop_index
self._change_date_to = change_date_to
self._timestamp_name = timestamp_name
self._filepath_or_buffer = filepath_or_buffer
self._first_line_is_title = first_line_is_title
self._handle_escaped_chars = handle_escaped_chars
if self._try_default_constructor(**kwargs):
return
if self._filepath_or_buffer is not None and not isinstance(self._filepath_or_buffer, _SymbolParamSource):
self._columns, self._default_types, self._forced_title, self._symbols = self._parse_file()
else:
self._filepath_or_buffer = None
names = self._names or []
self._columns = {name: str for name in names}
self._default_types = {}
# we don't know if it is actually forced, but otherwise we would ignore the first non-commented-out line
self._forced_title = self._first_line_is_title
self._symbols = None
self._check_time_column()
for t in self._dtype:
if t not in self._columns:
raise ValueError(f"dtype '{t}' not found in columns list")
self._columns[t] = self._dtype[t]
self._ep_fields = ",".join(
f'{ott.type2str(dtype)} {column}' if issubclass(dtype, otp.string) else column
for column, dtype in self._columns.items()
)
self._to_drop = self._get_to_drop()
self._has_time, self._start, self._end = self._get_start_end(**kwargs)
super().__init__(
_symbols=self._symbols,
_start=self._start,
_end=self._end,
_base_ep_func=self.base_ep,
**self._columns,
)
# fake run converters to set proper schema
if self._converters:
for column, converter in self._converters.items():
self.schema[column] = converter(self[column]).dtype
if self._has_time and self._timestamp_name in self.schema:
if self.schema[self._timestamp_name] not in [ott.nsectime, ott.msectime]:
raise ValueError(f"CSV converter for {self._timestamp_name} is converting to "
f"{self.schema[timestamp_name]} type, but expected resulted type is "
f"ott.msectime or ott.nsectime")
# remove timestamp_name column, if we use it as TIMESTAMP source
if self._has_time and self._timestamp_name != "Time":
del self[self._timestamp_name]
def _check_time_column(self):
if "TIMESTAMP" in self._columns:
raise ValueError(
"It is not allowed to have 'TIMESTAMP' columns, because it is reserved name in OneTick"
)
if "Time" in self._columns and self._timestamp_name != "Time":
raise ValueError(
"It is not allowed to have 'Time' column not used as timestamp field."
)
def _get_to_drop(self):
to_drop = []
if "TICK_STATUS" in self._columns:
del self._columns["TICK_STATUS"]
to_drop.append("TICK_STATUS")
if "Index" in self._columns and self._drop_index:
del self._columns["Index"]
to_drop.append("Index")
return to_drop
def _get_start_end(self, **kwargs):
start = kwargs.get("start", utils.adaptive)
end = kwargs.get("end", utils.adaptive)
has_time = False
if self._timestamp_name in self._columns:
has_time = True
# remove to resolve exception in Source.__init__
if self._timestamp_name == "Time":
del self._columns["Time"]
# redefine start/end time for change_date_to
if self._change_date_to:
start = dt.datetime(self._change_date_to.year, self._change_date_to.month, self._change_date_to.day)
end = ott.next_day(start)
return has_time, start, end
def _parse_file(self):
"""
This function finds the file, gets the column names and default types, and checks whether the first line is a title, via pandas.
It also sets the correct value for symbols.
"""
obj_to_inspect = self._filepath_or_buffer
if isinstance(obj_to_inspect, utils.FileBuffer):
obj_to_inspect = io.StringIO(obj_to_inspect.get())
if isinstance(obj_to_inspect, str) and not os.path.exists(obj_to_inspect):
# if not found, probably, CSV file is located in OneTick CSV_FILE_PATH, check it for inspect_by_pandas()
csv_paths = otp.utils.get_config_param(os.environ["ONE_TICK_CONFIG"], "CSV_FILE_PATH", default="")
if csv_paths:
for csv_path in csv_paths.split(","):
csv_path = os.path.join(csv_path, obj_to_inspect)
if os.path.exists(csv_path):
obj_to_inspect = csv_path
break
columns, default_types, forced_title = _csv_inspector.inspect_by_pandas(
obj_to_inspect,
self._first_line_is_title,
self._names,
self._field_delimiter,
self._quote_char,
)
if isinstance(self._filepath_or_buffer, utils.FileBuffer):
symbols = 'DUMMY'
else:
# str, because an os.PathLike object might have been passed
symbols = str(obj_to_inspect)
return columns, default_types, forced_title, symbols
def base_ep(self):
# initialize Source and set schema to columns.
file_contents = ''
if isinstance(self._filepath_or_buffer, utils.FileBuffer):
file_contents = self._filepath_or_buffer.get()
csv = Source(
otq.CsvFileListing(
field_delimiters=f"'{self._field_delimiter}'",
time_assignment="_START_TIME",
# we use the EP's first_line_is_title only when the file path is passed through the symbol.
# otherwise we don't use the EP's first_line_is_title, because the EP raises an error on an empty column name,
# and we explicitly define names for such columns in the FIELDS arg.
# but if the first line starts with # (forced_title=True), then this param is ignored :(
first_line_is_title=self._filepath_or_buffer is None and self._first_line_is_title,
fields=self._ep_fields,
file_contents=file_contents,
handle_escaped_chars=self._handle_escaped_chars,
quote_chars=f"'{self._quote_char}'",
),
**self._columns,
)
if self._first_line_is_title and not self._forced_title:
# remove first line with titles for columns.
csv.sink(otq.DeclareStateVariables(variables="long __TICK_INDEX=0"))
csv.sink(otq.PerTickScript("STATE::__TICK_INDEX = STATE::__TICK_INDEX + 1;"))
csv.sink(otq.WhereClause(discard_on_match=False, where="STATE::__TICK_INDEX > 1"))
# set tick type to ANY
csv.tick_type(f"{self._db}::ANY")
# check whether types need to be updated, because if a column type is not specified in the header,
# then by default the column has string type in OneTick
update_columns = {}
for name, dtype in self._columns.items():
if not issubclass(dtype, str) and name not in self._default_types:
update_columns[name] = dtype
for name, dtype in update_columns.items():
if dtype is int:
# BE-142 - workaround for converting string to int
# OneTick first converts the string to float, and then to int, which leads to losing precision
csv.sink(otq.AddField(field=f"_TMP_{name}", value="atol(" + name + ")"))
csv.sink(otq.Passthrough(fields=name, drop_fields=True))
csv.sink(otq.AddField(field=f"{name}", value=f"_TMP_{name}"))
csv.sink(otq.Passthrough(fields=f"_TMP_{name}", drop_fields=True))
elif dtype is float:
csv.sink(otq.UpdateField(field=name, value="atof(" + name + ")"))
elif dtype is ott.msectime:
csv.sink(otq.UpdateField(field=name, value='"1970/01/01 00:00:00.000"', where=name + '=""'))
csv.sink(otq.UpdateField(field=name, value=f'parse_time("%Y/%m/%d %H:%M:%S.%q",{name},_TIMEZONE)'))
elif dtype is ott.nsectime:
csv.sink(otq.UpdateField(field=name, value='"1970/1/1 00:00:00.000"', where=name + '=""'))
csv.sink(otq.UpdateField(field=name, value=f'parse_nsectime("%Y/%m/%d %H:%M:%S.%J",{name},_TIMEZONE)'))
else:
raise TypeError(f"Unsupported type '{dtype}'")
# run converters
if self._converters:
for column, converter in self._converters.items():
if csv[column].dtype is not otp.nsectime and converter(csv[column]).dtype is otp.nsectime:
# workaround for resolve bug on column type changing:
# https://onemarketdata.atlassian.net/browse/PY-416
csv[f'_T_{column}'] = converter(csv[column])
del csv[column]
csv[column] = csv[f'_T_{column}']
del csv[f'_T_{column}']
else:
csv[column] = converter(csv[column])
if self._has_time:
# if timestamp_name column is defined in the csv, then apply tick time adjustment
if self._timestamp_name in self._converters:
# we assume that if timestamp_name field in converters,
# then it is already converted to otp.dt
csv.sink(
otq.UpdateField(
field="TIMESTAMP",
value=self._timestamp_name,
allow_unordered_output_times=True,
)
)
else:
if self._change_date_to:
self._change_date_to = self._change_date_to.strftime("%Y/%m/%d")
csv.sink(otq.UpdateField(field="Time",
value=f'"{self._change_date_to}" + substr({self._timestamp_name}, 10)'))
# by default we parse timestamp_name into TIMESTAMP field
# from typical/default Time format from OneTick dump
csv.sink(
otq.UpdateField(
field="TIMESTAMP",
value=f'parse_nsectime("%Y/%m/%d %H:%M:%S.%J", {self._timestamp_name}, _TIMEZONE)',
allow_unordered_output_times=True,
)
)
# drop source timestamp_name field in favor of new TIMESTAMP field
self._to_drop.append(self._timestamp_name)
elif self._auto_increase_timestamps:
# by default, tick timestamps are increased by an incrementing millisecond offset starting from 0
csv.sink(otq.DeclareStateVariables(variables="long __TIMESTAMP_INC__ = 0"))
csv.sink(otq.UpdateField(
field="TIMESTAMP",
value='DATEADD("millisecond",STATE::__TIMESTAMP_INC__,TIMESTAMP,_TIMEZONE)'))
csv.sink(otq.UpdateField(field="STATE::__TIMESTAMP_INC__", value="STATE::__TIMESTAMP_INC__ + 1"))
if self._order_ticks:
csv.sort('TIMESTAMP', inplace=True)
if self._to_drop:
csv.sink(otq.Passthrough(fields=",".join(self._to_drop), drop_fields="True"))
return csv
class Trades(Source):
"""
Trade source object.
Adds 'PRICE' and 'SIZE' fields to the schema.
"""
def __init__(self, db=utils.adaptive_to_default, symbol=utils.adaptive,
date=None,
start=utils.adaptive, end=utils.adaptive, **kwargs):
if db is utils.adaptive_to_default:
db = configuration.config.default_db
if date:
start, end = date.start, date.end
super().__init__(
_symbols=symbol, _start=start, _end=end, _base_ep_func=lambda: self.base_ep(db), **kwargs
)
self.schema['PRICE'] = float
self.schema['SIZE'] = int
def base_ep(self, db):
db = str(db)
src = Source(otq.Passthrough(fields="SYMBOL_NAME,TICK_TYPE", drop_fields=True))
src.tick_type(db + "::TRD")
return src
class Quotes(Source):
def __init__(self, db=utils.adaptive_to_default, symbol=utils.adaptive,
start=utils.adaptive, end=utils.adaptive, **kwargs):
if db is utils.adaptive_to_default:
db = configuration.config.default_db
super().__init__(
_symbols=symbol, _start=start, _end=end, _base_ep_func=lambda: self.base_ep(db), **kwargs
)
self.schema['ASK_PRICE'] = float
self.schema['BID_PRICE'] = float
self.schema['ASK_SIZE'] = int
self.schema['BID_SIZE'] = int
def base_ep(self, db):
db = str(db)
src = Source(otq.Passthrough(fields="SYMBOL_NAME,TICK_TYPE", drop_fields=True))
src.tick_type(db + "::QTE")
return src
class NBBO(Source):
def __init__(self, db="TAQ_NBBO", symbol=utils.adaptive, start=utils.adaptive, end=utils.adaptive, **kwargs):
super().__init__(
_symbols=symbol, _start=start, _end=end, _base_ep_func=lambda: self.base_ep(db), **kwargs
)
self.schema['ASK_PRICE'] = float
self.schema['BID_PRICE'] = float
self.schema['ASK_SIZE'] = int
self.schema['BID_SIZE'] = int
def base_ep(self, db):
db = str(db)
src = Source(otq.Passthrough(fields="SYMBOL_NAME,TICK_TYPE", drop_fields=True))
src.tick_type(db + "::NBBO")
return src
class Query(Source):
def __init__(
self,
query_object=None,
out_pin=utils.adaptive,
symbol=utils.adaptive,
start=utils.adaptive,
end=utils.adaptive,
params=None,
**kwargs,
):
"""
Create a data source object from an .otq file or a query object.
Parameters
----------
query_object: path or :class:`query`
query to use as a data source
out_pin: str
query output pin name
symbol: str, list of str, :class:`Source`, :class:`query`, :py:func:`eval query <onetick.py.eval>`
Symbol(s) from which data should be taken.
start, end : :py:class:`datetime.datetime`, :py:class:`otp.datetime <onetick.py.datetime>` or utils.adaptive
Time interval from which the data should be taken.
params: dict
params to pass to query.
Only applicable to string ``query_object``
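Examples
--------
Illustrative sketch (assumes the .otq file exists; not executed here):
>>> src = otp.Query('/otqs/some.otq::some_query', params={'PARAM1': 'val1'})  # doctest: +SKIP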
"""
if self._try_default_constructor(**kwargs):
return
if params is None:
params = {}
# Ignore because of the "Only @runtime_checkable protocols can be used with instance and class checks"
if isinstance(query_object, (str, os.PathLike)): # type: ignore
query_object = query(str(query_object), **params)
elif isinstance(query_object, query):
if len(params) > 0:
raise ValueError("Cannot pass both params and a query() (not str) query_object parameter")
else:
raise ValueError("query_object parameter has to be either a str (path to the query) or a query object")
if symbol == utils.adaptive:
if not query_object.graph_info.has_unbound_sources:
symbol = None
super().__init__(
_symbols=symbol, _start=start, _end=end, _base_ep_func=lambda: self.base_ep(query_object, out_pin), **kwargs
)
def base_ep(self, query, out_pin):
nested = otq.NestedOtq(query.path, query.str_params)
graph = query.graph_info
if out_pin is utils.adaptive:
if len(graph.nested_outputs) == 1:
return Source(nested[graph.nested_outputs[0].NESTED_OUTPUT])
elif len(graph.nested_outputs) > 1:
raise Exception(
f'Query "{query.query_name}" has multiple outputs, but you have not '
"specified which one should be used. You could specify it"
' using "out_pin" parameter of the Query constructor.'
)
else:
# no output
return Source(nested, _has_output=False)
else:
existed_out_pins = set(map(operator.attrgetter("NESTED_OUTPUT"), graph.nested_outputs))
if out_pin not in existed_out_pins:
raise Exception(
f'Query "{query.query_name}" does not have the "{out_pin}" output, there are only following '
f"output pins exist: {','.join(existed_out_pins)}"
)
return Source(nested[out_pin])
class query:
"""
Constructs a query object with a certain path.
Keyword arguments specify query parameters.
You can also pass an instance of the ``otp.query.config`` class as the second positional argument
to specify query options.
Parameters
----------
path : str
path to an .otq file.
If the path is relative, then it is assumed that the file is located in one of the directories
specified in the OneTick ``OTQ_FILE_PATH`` configuration variable.
If there is more than one query in the file, then the query name should be specified
in the format ``<path>::<query-name>``.
Also, the prefix ``remote://<database-name>::`` can be used to specify that the query is located
on a remote server.
config:
optional ``otp.query.config`` object.
params:
parameters for the query.
Raises
------
ValueError, TypeError
Examples
--------
>>> otp.query('/otqs/some.otq::some_query', PARAM1='val1', PARAM2=3.14) # doctest: +SKIP
>>> otp.query('remote://DATABASE::/otqs/some.otq::some_query', PARAM1='val1', PARAM2=3.14) # doctest: +SKIP
"""
class config:
"""
The config allows specifying different query options.
"""
special_values = {"input"}
def __init__(self, output_columns=None):
"""
Parameters
----------
output_columns : str, list, dict, optional
The parameter defines what the output columns are.
The default value ``None`` means there are no output fields after applying the query,
for every output pin.
The ``input`` value means that output columns are the same as the inputs,
for every output pin.
A list of tuples allows defining output columns with their types,
for example [('x', int), ('y', float), ...]; applicable to every output pin.
A dict allows specifying output columns for every output pin.
Raises
------
TypeError, ValueError
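Examples
--------
Illustrative sketch (the pin name 'OUT' below is hypothetical; not executed here):
>>> cfg = otp.query.config(output_columns={'OUT': [('x', int), ('y', float)]})  # doctest: +SKIP
>>> q = otp.query('/otqs/some.otq::some_query', cfg)  # doctest: +SKIP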
"""
if output_columns is not None:
if isinstance(output_columns, list):
self.validate_columns(output_columns)
elif isinstance(output_columns, dict):
for pin, columns in output_columns.items():
if not isinstance(pin, str):
raise TypeError(f"Name of pin '{type(pin)}' is of non-str type '%s'")
else:
self.validate_columns(columns)
elif not isinstance(output_columns, str):
raise TypeError(f'"output_columns" does not support value of the "{type(output_columns)}" type')
if isinstance(output_columns, str):
if output_columns not in self.special_values:
raise ValueError(f'Config does not support "{output_columns}" value')
self.output_columns = output_columns
def validate_list_item(self, item):
if isinstance(item, str):
if item not in self.special_values:
raise ValueError(f"Value {item} is not supported.")
else:
if not isinstance(item, (tuple, list)) or (len(item) != 2) or not isinstance(item[0], str):
raise TypeError("Value %s is not a name-type tuple.")
def validate_columns(self, columns):
if isinstance(columns, str):
if columns not in self.special_values:
raise ValueError(f"A pin has invalid output columns definition: '{columns}'")
elif isinstance(columns, list):
if columns.count("input") > 1:
raise ValueError(f"More than one 'input' value in {columns}")
for item in columns:
self.validate_list_item(item)
else:
raise TypeError(f"A pin's columns definition is of unsupported type '{type(columns)}'")
def get_output_columns_for_pin(self, out_pin_name):
if isinstance(self.output_columns, dict):
if out_pin_name not in self.output_columns:
raise ValueError(f"Pin {out_pin_name} wasn't declared in the config")
else:
return self.output_columns[out_pin_name]
else:
return self.output_columns
def apply(self, out_pin_name, src):
"""
Applies the specified logic to a certain object. Used internally in functions.apply_query.
"""
columns_descriptor = self.get_output_columns_for_pin(out_pin_name)
if columns_descriptor is None:
# drop columns by default, because we don't know
# how an external query changes data schema
src.drop_columns()
elif columns_descriptor == "input":
pass
else:
if "input" not in columns_descriptor:
src.drop_columns()
for item in columns_descriptor:
if item != "input":
src[item]
def __init__(self, path, *config, **params):
path = str(path)
if path.startswith('remote://'):
self.path = path
remote, path = path.split('::', maxsplit=1)
else:
self.path = f"remote://{configuration.config.get('default_db', 'LOCAL')}::" + path
self.query_path, self.query_name = utils.query_to_path_and_name(path)
# if query_path does not exist, then we try
# to resolve it with OTQ_PATH assuming that
# a relative path is passed
if not os.path.exists(self.query_path):
otq_path = utils.get_config_param(os.environ["ONE_TICK_CONFIG"], "OTQ_FILE_PATH", "")
self.query_path = utils.abspath_to_query_by_otq_path(otq_path, self.query_path)
if self.query_name is None:
# it seems that query name was not passed, then try to find it
queries = query_inspector.get_queries(self.query_path)
if len(queries) > 1:
raise Exception(f"{self.query_path} has more than one query, "
f"but you have not specified which one to use.")
self.query_name = queries[0]
# prepare parameters
self._str_params = None
self.params = params
self.update_params()
# prepare configs
if len(config) > 1:
raise ValueError(f"It is allowed to specify only one config object, but passed {len(config)}")
elif len(config) == 1:
if not isinstance(config[0], self.config):
raise TypeError(
f'It is expected to see config of the "query.config" type, but got "{type(config[0])}"'
)
self.config = config[0]
else:
self.config = self.config()
self.graph_info = query_inspector.get_query_info(self.query_path, self.query_name)
def __call__(self, *ticks, **pins):
for key, value in pins.items():
if not isinstance(value, Source):
raise ValueError(f'Input "{key}" pin does not support "{type(value)}" type')
if len(pins) == 0 and len(ticks) == 1:
if len(self.graph_info.nested_inputs) != 1:
raise Exception(
f'It is expected the query "{self.query_path}" to have one input, but it'
f" has {len(self.graph_info.nested_inputs)}"
)
pins[self.graph_info.nested_inputs[0].NESTED_INPUT] = ticks[0]
elif len(pins) > 0 and len(ticks) == 0:
pass
elif len(pins) == 0 and len(ticks) == 0:
# it is the valid case, when query has no input pins
pass
else:
raise ValueError("It is allowed to pass only one non-specified input")
outputs = self._outputs()
outputs.query = self
outputs.in_sources = pins
return outputs
class _outputs(object):
def __getitem__(self, key):
output_pins = []
if type(key) is tuple:
output_pins = list(key)
elif isinstance(key, str):
output_pins = [key]
elif key is None:
# No output
pass
else:
raise ValueError(f'Output pins can not be of "{type(key)}" type')
return onetick.py.functions.apply_query(
self.query, in_sources=self.in_sources, output_pins=output_pins, **self.query.params
)
def to_eval_string(self):
"""Converts query object to `eval` string"""
res = '"' + self.path + '"'
if self.params:
res += f', "{self._params_to_str(self.params, with_expr=True)}"'
return "eval(" + res + ")"
def update_params(self, **new_params):
if new_params:
self.params.update(new_params)
@property
def str_params(self):
"""Query parameters converted to string"""
if self._str_params is None:
self._str_params = self._params_to_str(self.params)
return self._str_params
@staticmethod
def _params_to_str(params, *, with_expr=False):
""" converts param to str
Parameters
----------
params: dict
Parameters as dict(name=value)
with_expr:
If True, wrap expressions in the expr() function
Returns
-------
result: str
string representation of parameters ready for query evaluation
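Example (illustrative, not executed here)::
    query._params_to_str({'PARAM1': 'val1', 'N': 2})  # -> 'PARAM1=val1,N=2'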
"""
def to_str(v):
if isinstance(v, list):
return "\\,".join(map(to_str, v))
else:
if with_expr:
is_dt = ott.is_time_type(v)
if is_dt:
v = ott.value2str(v)
result = query._escape_quotes_in_eval(v)
if isinstance(v, _Operation) and getattr(v, "name", None) != "_SYMBOL_NAME" or is_dt:
result = f"expr({result})"
else:
result = query._escape_characters_in_query_param(str(v))
return result
return ",".join(key + "=" + to_str(value) for key, value in params.items())
@staticmethod
def _escape_quotes_in_eval(v):
return str(v).translate(str.maketrans({"'": r"\'", '"': r'\"'}))
@staticmethod
def _escape_characters_in_query_param(result):
# 0 - no need to add backslash, 1 - need to add
char_map = [0] * len(result)
# put 1 between two quotes symbols
open_char = None
last_inx = 0
for inx, c in enumerate(result):
if open_char == c:
open_char = None
continue
if not open_char and (c == "'" or c == '"'):
open_char = c
last_inx = inx + 1
continue
if open_char:
char_map[inx] = 1
# clean open tail if necessary
if open_char:
char_map[last_inx:] = [0] * (len(result) - last_inx)
# apply mapping
res = []
last_esc = False # do not add esc if the previous one is already esc
n_brackets_in_expr_block = 0 # do not escape in expr(...)
for inx, c in enumerate(result):
if c == "(":
if n_brackets_in_expr_block:
n_brackets_in_expr_block += 1
elif result[inx - 4:inx] == "expr":
n_brackets_in_expr_block = 1
if c == ")" and n_brackets_in_expr_block:
n_brackets_in_expr_block -= 1
if c in _QUERY_PARAM_SPECIAL_CHARACTERS and char_map[inx] == 0:
if not last_esc and not n_brackets_in_expr_block:
c = "\\" + c
last_esc = c == "\\"
res.append(c)
return "".join(res)
class Orders(Source):
def __init__(self, db="S_ORDERS_FIX", symbol=utils.adaptive, start=utils.adaptive, end=utils.adaptive, **kwargs):
super().__init__(
_symbols=symbol, _start=start, _end=end, _base_ep_func=lambda: self.base_ep(db), **kwargs
)
self.schema['ID'] = str
self.schema['BUY_FLAG'] = int
self.schema['SIDE'] = str
self.schema['STATE'] = str
self.schema['ORDTYPE'] = str
self.schema['PRICE'] = float
self.schema['PRICE_FILLED'] = float
self.schema['QTY'] = int
self.schema['QTY_FILLED'] = int
def base_ep(self, db):
db = str(db)
src = Source(otq.Passthrough(fields="SYMBOL_NAME,TICK_TYPE", drop_fields=True))
src.tick_type(db + "::ORDER")
return src
_db_doc = param_doc(
name='db',
desc="""
Name(s) of the database or the database object(s).
""",
str_annotation='str, list of str, :class:`otp.DB <onetick.py.DB>`',
default=None,
str_default='None',
)
_symbol_doc = param_doc(
name='symbol',
desc="""
Symbol(s) from which data should be taken.
""",
str_annotation='str, list of str, :class:`Source`, :class:`query`, :py:func:`eval query <onetick.py.eval>`',
default=utils.adaptive,
str_default=' :py:class:`onetick.py.adaptive`',
)
_symbols_doc = param_doc(
name='symbols',
desc="""
Symbol(s) from which data should be taken.
Alias for ``symbol`` parameter. Will take precedence over it.
""",
str_annotation='str, list of str, :class:`Source`, :class:`query`, :py:func:`eval query <onetick.py.eval>`',
default=None,
)
_tick_type_doc = param_doc(
name='tick_type',
desc="""
Tick type of the data.
If not specified, all ticks from `db` will be taken.
If ticks can't be found or there are multiple databases specified in ``db``, then the default is "TRD".
""",
str_annotation='str, list of str',
default=utils.adaptive,
str_default=' :py:class:`onetick.py.adaptive`',
)
_start_doc = param_doc(
name='start',
desc="""
Start of the interval from which the data should be taken.
Default is :py:class:`onetick.py.adaptive`, making the final query deduce the time
limits from the rest of the graph.
""",
str_annotation=(
':py:class:`datetime.datetime`, :py:class:`otp.datetime <onetick.py.datetime>`,'
' :py:class:`onetick.py.adaptive`'
),
default=utils.adaptive,
str_default=' :py:class:`onetick.py.adaptive`',
)
_end_doc = param_doc(
name='end',
desc="""
End of the interval from which the data should be taken.
Default is :py:class:`onetick.py.adaptive`, making the final query deduce the time
limits from the rest of the graph.
""",
str_annotation=(
':py:class:`datetime.datetime`, :py:class:`otp.datetime <onetick.py.datetime>`,'
' :py:class:`onetick.py.adaptive`'
),
default=utils.adaptive,
str_default=' :py:class:`onetick.py.adaptive`',
)
_date_doc = param_doc(
name='date',
desc="""
Allows specifying a whole day instead of explicitly passing the ``start`` and ``end`` parameters.
If it is set along with the ``start`` and ``end`` parameters, then the latter two are ignored.
""",
str_annotation=":class:`datetime.datetime`, :class:`otp.datetime <onetick.py.datetime>`",
default=None,
)
_schema_policy_doc = param_doc(
name='schema_policy',
desc="""
Schema deduction policy:
- 'manual'
The resulting schema is a combination of ``desired_schema`` and database schema.
Compatibility with database schema will not be checked.
- 'manual_strict'
The resulting schema will be exactly ``desired_schema``.
Compatibility with database schema will not be checked.
- 'tolerant'
The resulting schema is a combination of ``desired_schema`` and database schema.
If the database schema can be deduced,
it's checked to be type-compatible with a ``desired_schema``,
and ValueError is raised if checks are failed.
Also, with this policy the database is scanned 5 days back to find the schema.
It is useful when the database is misconfigured or in case of holidays.
- 'tolerant_strict'
The resulting schema will be ``desired_schema`` if it's not empty.
Otherwise, database schema is used.
If the database schema can be deduced,
it's checked if it lacks fields from the ``desired_schema``
and it's checked to be type-compatible with a ``desired_schema``
and ValueError is raised if checks are failed.
Also, with this policy the database is scanned 5 days back to find the schema.
It is useful when the database is misconfigured or in case of holidays.
- 'fail'
The same as 'tolerant', but if the database schema can't be deduced, raises an Exception.
- 'fail_strict'
The same as 'tolerant_strict', but if the database schema can't be deduced, raises an Exception.
""",
str_annotation="'tolerant', 'tolerant_strict', 'fail', 'fail_strict', 'manual', 'manual_strict'",
default=None,
)
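# Hedged sketch of how the policies combine ``desired_schema`` with the database schema,
# assuming the database schema is deduced as {'PRICE': float, 'SIZE': int} and the caller
# passes desired_schema={'PRICE': float}:
#   'manual'          -> {'PRICE': float}              (no database lookup, no compatibility checks)
#   'tolerant'        -> {'PRICE': float, 'SIZE': int} (merged; types checked, up to 5 days scanned back)
#   'fail'            -> same as 'tolerant', but raises if the database schema can't be deduced
#   'tolerant_strict' -> exactly {'PRICE': float} (the database schema would be used only if
#                        ``desired_schema`` were empty)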
_guess_schema_doc = param_doc(
name='guess_schema',
desc="""
.. deprecated:: 1.3.16
Use ``schema_policy`` parameter instead.
""",
annotation=bool,
default=None,
)
_identify_input_ts_doc = param_doc(
name='identify_input_ts',
desc="""
If set to False, the fields SYMBOL_NAME and TICK_TYPE are not appended to the output ticks.
""",
annotation=bool,
default=False,
)
_back_to_first_tick_doc = param_doc(
name='back_to_first_tick',
desc="""
Determines how far back to go looking for the latest tick before ``start`` time.
If one is found, it is inserted into the output time series with the timestamp set to ``start`` time.
Note: the value is truncated to whole seconds, so otp.Millis(999) will be 0 seconds.
""",
str_annotation=('int, :ref:`offset <datetime_offsets>`, '
':class:`otp.expr <onetick.py.expr>`, '
':py:class:`~onetick.py.Operation`'),
default=0,
)
_keep_first_tick_timestamp_doc = param_doc(
name='keep_first_tick_timestamp',
desc="""
If set, a new field with this name will be added to the source.
This field contains the original timestamp of the tick that was taken from before the start time of the query.
For all other ticks the value in this field will be equal to the value of the Time field.
This parameter is ignored if ``back_to_first_tick`` is not set.
""",
annotation=str,
default=None,
)
_presort_doc = param_doc(
name='presort',
desc="""
Add the presort EP in case of bound symbols.
Applicable only when ``symbols`` is not None.
By default, it is set to True if ``symbols`` are set
and to False otherwise.
""",
annotation=bool,
default=utils.adaptive,
str_default=' :py:class:`onetick.py.adaptive`',
)
_concurrency_doc = param_doc(
name='concurrency',
desc="""
Specifies the number of CPU cores to utilize for the ``presort``.
By default, the value from otp.config.default_concurrency is used.
""",
annotation=int,
default=None,
)
_batch_size_doc = param_doc(
name='batch_size',
desc="""
Specifies the query batch size for the ``presort``.
By default, the value from otp.config.default_batch_size is used.
""",
annotation=int,
default=None,
)
_desired_schema_doc = param_doc(
name='desired_schema',
desc="""
List of <column name> -> <column type> pairs that the source is expected to have.
If the type is irrelevant, provide None as the type in question.
""",
str_annotation='type[str]',
kind=inspect.Parameter.VAR_KEYWORD,
)
_max_back_ticks_to_prepend_doc = param_doc(
name='max_back_ticks_to_prepend',
desc="""
When the ``back_to_first_tick`` interval is specified, this parameter determines the maximum number
of the most recent ticks before start_time that will be prepended to the output time series.
Their timestamp will be changed to start_time.
""",
annotation=int,
default=1,
)
_where_clause_for_back_ticks_doc = param_doc(
name='where_clause_for_back_ticks',
desc="""
A logical expression that is computed only for the ticks encountered when a query goes back from the start time
in search of the ticks to prepend. If it returns false, the tick is ignored.
""",
annotation=Raw,
default=None,
)
DATA_SOURCE_DOC_PARAMS = [
_db_doc, _symbol_doc, _tick_type_doc,
_start_doc, _end_doc, _date_doc,
_schema_policy_doc, _guess_schema_doc,
_identify_input_ts_doc,
_back_to_first_tick_doc, _keep_first_tick_timestamp_doc,
_max_back_ticks_to_prepend_doc,
_where_clause_for_back_ticks_doc,
_symbols_doc,
_presort_doc, _batch_size_doc, _concurrency_doc,
_desired_schema_doc,
]
class DataSource(Source):
POLICY_MANUAL = "manual"
POLICY_MANUAL_STRICT = "manual_strict"
POLICY_TOLERANT = "tolerant"
POLICY_TOLERANT_STRICT = "tolerant_strict"
POLICY_FAIL = "fail"
POLICY_FAIL_STRICT = "fail_strict"
_VALID_POLICIES = frozenset([POLICY_MANUAL, POLICY_MANUAL_STRICT,
POLICY_TOLERANT, POLICY_TOLERANT_STRICT,
POLICY_FAIL, POLICY_FAIL_STRICT])
_PROPERTIES = Source._PROPERTIES + ["_p_db", "_p_strict", "_p_schema"]
def __get_schema(self, db, start, schema_policy):
schema = {}
if start is utils.adaptive:
start = None # means that use the last date with data
if isinstance(db, list):
''' This is the merge case, since we need to get a combined schema
across different tick types and dbs '''
for t_db in db:
_db = t_db.split(':')[0]
_tt = t_db.split(':')[-1]
db_obj = onetick.py.db._inspection.DB(_db)
if schema_policy == self.POLICY_TOLERANT and start:
# repeating the same logic as in db_obj.last_date
start = db_obj.last_not_empty_date(start, days_back=5, tick_type=_tt)
schema.update(db_obj.schema(date=start, tick_type=_tt))
if db is None or isinstance(db, _SymbolParamColumn):
''' In this case we can't get the schema, because db is calculated dynamically.
Set it to empty to indicate that we expect a manually set schema here. '''
schema = {}
return schema
def __prepare_schema(self, db, start, end, schema_policy, guess_schema, desired_schema):
if guess_schema is not None:
warnings.warn(
"guess_schema flag is deprecated; use schema_policy argument instead",
DeprecationWarning,
)
if schema_policy is not None:
raise ValueError("guess_schema and schema_policy cannot be set at the same time")
if guess_schema:
schema_policy = self.POLICY_FAIL
else:
schema_policy = self.POLICY_MANUAL
if schema_policy is None:
schema_policy = self.POLICY_TOLERANT
if schema_policy not in self._VALID_POLICIES:
raise ValueError(f"Invalid schema_policy; allowed values are: {self._VALID_POLICIES}")
actual_schema = {}
if schema_policy not in (self.POLICY_MANUAL, self.POLICY_MANUAL_STRICT):
actual_schema = self.__get_schema(db, start, schema_policy)
dbs = ', '.join(db if isinstance(db, list) else [])
if len(actual_schema) == 0:
if schema_policy in (self.POLICY_FAIL, self.POLICY_FAIL_STRICT):
raise Exception(f'No ticks found in database(-s) {dbs}')
# let's try to use at least something
return desired_schema.copy()
for k, v in desired_schema.items():
field_type = actual_schema.get(k, None)
incompatible_types = False
if field_type is None:
if self._p_strict or schema_policy in (self.POLICY_TOLERANT, self.POLICY_FAIL):
raise ValueError(f"Database(-s) {dbs} schema has no {k} field")
elif issubclass(field_type, str) and issubclass(v, str):
field_length = ott.string.DEFAULT_LENGTH
if issubclass(field_type, ott.string):
field_length = field_type.length
v_length = ott.string.DEFAULT_LENGTH
if issubclass(v, ott.string):
v_length = v.length
if issubclass(field_type, ott.varstring):
if not issubclass(v, ott.varstring):
incompatible_types = True
elif not issubclass(v, ott.varstring) and v_length < field_length:
incompatible_types = True
elif not issubclass(field_type, v):
incompatible_types = True
if incompatible_types:
error_message = f"Database(-s) {dbs} schema field {k} has type {field_type}, but {v} was requested"
if field_type in (str, ott.string) or v in (str, ott.string):
error_message = f"{error_message}. Notice, that `str` and `otp.string` lengths are 64"
raise ValueError(error_message)
if not self._p_strict:
desired_schema.update(actual_schema)
table_schema = desired_schema.copy()
if not self._p_strict:
# in the non-strict case, table only the user-specified fields that are not already in the database schema
table_schema = {
k: v for k, v in table_schema.items() if k not in actual_schema
}
return table_schema
def __prepare_dates(self, date):
if isinstance(date, ott.datetime) or isinstance(date, ott.date):
start = date.start
end = date.end
if isinstance(date, dt.datetime) or isinstance(date, dt.date):
start = dt.datetime(date.year, date.month, date.day)
end = start + dt.timedelta(days=1, milliseconds=-1)
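# illustrative: date=dt.date(2022, 3, 1) -> start=2022-03-01 00:00:00, end=2022-03-01 23:59:59.999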
return start, end
def __prepare_db_tick_type(self, db, tick_type, symbol, start, end):
if isinstance(db, list):
''' If everything is correct then this branch should leave
the `db` var as a list of databases with tick types and set the
`tick_type` var to None.
Valid cases:
- Fully defined case. The `db` parameter has a list of databases where
every database has a tick type, while the `tick_type`
parameter has the default value or None (for backward compatibility)
- Partially defined case. The `db` parameter has a list of databases but
not every database has a tick type, while the `tick_type`
parameter is set to a non-None value. In that case databases without a tick type
are extended with the tick type from the `tick_type` parameter
- Undefined case. The `db` parameter has a list of databases where
no database has a tick type, and the `tick_type` is
set to a non-None value. In that case every database is extended with
the tick type from the `tick_type` parameter.
'''
def db_converter(_db):
if isinstance(_db, DB):
return _db.name
else:
return _db
db = [db_converter(_db) for _db in db]
res = all(('::' in _db and _db[-1] != ':' for _db in db))
if res:
if tick_type is utils.adaptive or tick_type is None:
tick_type = None # tick type is specified for all databases
else:
raise Exception('The `tick_type` is set as a parameter '
'and also as a part of the `db` parameter '
'for every database')
else:
dbs_without_tt = [_db.split(':')[0] for _db in db
if '::' not in _db or _db[-1] == ':']
if tick_type is utils.adaptive:
tick_type = 'TRD' # default one for backward compatibility and the testing use case
if tick_type is None:
raise Exception('The tick type is not set for databases: ' +
', '.join(dbs_without_tt))
else:
# extend databases missing a tick type with the one from the `tick_type` parameter
dbs_with_tt = [_db for _db in db
if '::' in _db and _db[-1] != ':']
db = dbs_with_tt + [_db + '::' + tick_type for _db in dbs_without_tt]
tick_type = None
if isinstance(db, (DB, inspect_DB)):
db = db.name # ... and we go to the next branch
if isinstance(db, str):
''' The resulting `db` var contains a list with string value, that has the `db`
concatenated with the `tick_type`. '''
if '::' in db:
if tick_type is utils.adaptive or tick_type is None:
tick_type = db.split(':')[-1]
db = db.split('::')[0]
else:
raise Exception('The `tick_type` is set as a parameter '
'and also as a part of the `db` parameter')
else:
if tick_type is utils.adaptive or tick_type is None:
db_obj = onetick.py.db._inspection.DB(db)
# try to find at least one common tick type
# through all days
tick_types = None
if start is utils.adaptive:
start = db_obj.last_date
end = db_obj.last_date
if start and end: # could be None if there is no data
t_start = start
while t_start <= end:
t_tts = set(db_obj.tick_types(t_start))
t_start += dt.timedelta(days=1)
if len(t_tts) == 0:
continue
if tick_types is None:
tick_types = t_tts
else:
tick_types &= t_tts
if len(tick_types) == 0:
raise Exception(f'It seems that there are no common '
f'tick types for dates from {start} '
f'to {end}. Please specify a tick '
'type')
if tick_types is None:
if tick_type is utils.adaptive:
tick_types = ['TRD'] # the default one
else:
raise Exception(f'Could not find any data from {start} '
f'to {end}. Please check that the tick type, '
'database and date range are correct.')
if len(tick_types) != 1:
raise Exception('The tick type is not specified, and '
'multiple tick types were found in the database: ' +
', '.join(tick_types))
tick_type = tick_types.pop()
if not isinstance(tick_type, str) and isinstance(tick_type, Iterable):
db = [f'{db}::{tt}' for tt in tick_type]
else:
db = [db + '::' + tick_type]
tick_type = None
if isinstance(db, _SymbolParamColumn):
''' Do nothing, because we don't know whether db will come with the tick
type or not. The only thing we know for sure is that tick_type
can not be utils.adaptive '''
if tick_type is utils.adaptive:
# TODO: need to test this case
raise Exception('The `db` is set to the symbol param; in that case '
'the `tick_type` should be set explicitly to some value '
'or to None')
if db is None:
''' This case means that the database comes with the symbol name, so the tick type
should be defined '''
if tick_type is utils.adaptive or tick_type is None:
raise Exception('The `db` is not specified, which means the database is '
'expected to be defined with the symbol name. '
'In that case the `tick_type` should be defined.')
if not isinstance(tick_type, str) and isinstance(tick_type, Iterable):
tick_type = '+'.join(tick_type)
return db, tick_type
@docstring(parameters=DATA_SOURCE_DOC_PARAMS, add_self=True)
def __init__(
self,
db=None,
symbol=utils.adaptive,
tick_type=utils.adaptive,
start=utils.adaptive,
end=utils.adaptive,
date=None,
schema_policy=None,
guess_schema=None,
identify_input_ts=False,
back_to_first_tick=0,
keep_first_tick_timestamp=None,
max_back_ticks_to_prepend=1,
where_clause_for_back_ticks=None,
symbols=None,
presort=utils.adaptive,
batch_size=None,
concurrency=None,
**desired_schema,
):
"""
Construct a source providing data from a given ``db``.
Examples
---------
Symbols can be passed as a collection:
>>> # OTdirective: snippet-name:fetch data.simple;
>>> data = otp.DataSource(db='SOME_DB', tick_type='TT', symbols=['S1', 'S2'])
>>> otp.run(data)
Time X
0 2003-12-01 00:00:00.000 1
1 2003-12-01 00:00:00.000 -3
2 2003-12-01 00:00:00.001 2
3 2003-12-01 00:00:00.001 -2
4 2003-12-01 00:00:00.002 3
5 2003-12-01 00:00:00.002 -1
A Source can also be passed as symbols; in that case the specially named SYMBOL_NAME column will be
transformed into the symbol, and all other columns will become symbol parameters:
>>> # OTdirective: snippet-name:fetch data.symbols as a source;
>>> symbols = otp.Ticks(SYMBOL_NAME=['S1', 'S2'])
>>> data = otp.DataSource(db='SOME_DB', symbols=symbols, tick_type='TT')
>>> otp.run(data)
Time X
0 2003-12-01 00:00:00.000 1
1 2003-12-01 00:00:00.000 -3
2 2003-12-01 00:00:00.001 2
3 2003-12-01 00:00:00.001 -2
4 2003-12-01 00:00:00.002 3
5 2003-12-01 00:00:00.002 -1
Default schema policy is **tolerant**.
>>> data = otp.DataSource(db='NYSE_TAQ', tick_type='TRD', symbols='AAPL', PRICE=float, date=otp.dt(2022, 3, 1))
>>> data.schema
{'PRICE': <class 'float'>, 'SIZE': <class 'int'>}
>>> data = otp.DataSource(db='NYSE_TAQ', tick_type='TRD', symbols='AAPL', PRICE=int, date=otp.dt(2022, 3, 1))
Traceback (most recent call last):
...
ValueError: Database(-s) NYSE_TAQ::TRD schema field PRICE has type <class 'float'>,
but <class 'int'> was requested
Schema policy **manual** uses exactly ``desired_schema``:
>>> data = otp.DataSource(db='NYSE_TAQ', tick_type='TRD', symbols='AAPL', PRICE=float,
... date=otp.dt(2022, 3, 1), schema_policy='manual')
>>> data.schema
{'PRICE': <class 'float'>}
Schema policy **fail** raises an exception if the schema cannot be deduced:
>>> data = otp.DataSource(db='NYSE_TAQ', tick_type='TRD', symbols='AAPL', date=otp.dt(2021, 3, 1),
... schema_policy='fail')
Traceback (most recent call last):
...
Exception: No ticks found in database(-s) NYSE_TAQ::TRD
``back_to_first_tick`` sets how far back to go looking for the latest tick before ``start`` time:
>>> data = otp.DataSource(db='NYSE_TAQ', tick_type='TRD', symbols='AAPL', date=otp.dt(2022, 3, 2),
... back_to_first_tick=otp.Day(1))
>>> otp.run(data)
Time PRICE SIZE
0 2022-03-02 00:00:00.000 1.4 50
1 2022-03-02 00:00:00.000 1.0 100
2 2022-03-02 00:00:00.001 1.1 101
3 2022-03-02 00:00:00.002 1.2 102
``keep_first_tick_timestamp`` allows showing the original timestamp of the tick that was taken from before
the start time of the query:
>>> data = otp.DataSource(db='NYSE_TAQ', tick_type='TRD', symbols='AAPL', date=otp.dt(2022, 3, 2),
... back_to_first_tick=otp.Day(1), keep_first_tick_timestamp='ORIGIN_TIMESTAMP')
>>> otp.run(data)
Time PRICE SIZE ORIGIN_TIMESTAMP
0 2022-03-02 00:00:00.000 1.4 50 2022-03-01 00:00:00.002
1 2022-03-02 00:00:00.000 1.0 100 2022-03-02 00:00:00.000
2 2022-03-02 00:00:00.001 1.1 101 2022-03-02 00:00:00.001
3 2022-03-02 00:00:00.002 1.2 102 2022-03-02 00:00:00.002
``max_back_ticks_to_prepend`` is used with ``back_to_first_tick``
if more than one tick before the start time should be retrieved:
>>> data = otp.DataSource(db='NYSE_TAQ', tick_type='TRD', symbols='AAPL', date=otp.dt(2022, 3, 2),
... max_back_ticks_to_prepend=2, back_to_first_tick=otp.Day(1),
... keep_first_tick_timestamp='ORIGIN_TIMESTAMP')
>>> otp.run(data)
Time PRICE SIZE ORIGIN_TIMESTAMP
0 2022-03-02 00:00:00.000 1.4 10 2022-03-01 00:00:00.001
1 2022-03-02 00:00:00.000 1.4 50 2022-03-01 00:00:00.002
2 2022-03-02 00:00:00.000 1.0 100 2022-03-02 00:00:00.000
3 2022-03-02 00:00:00.001 1.1 101 2022-03-02 00:00:00.001
4 2022-03-02 00:00:00.002 1.2 102 2022-03-02 00:00:00.002
``where_clause_for_back_ticks`` is used to filter out ticks before the start time:
>>> data = otp.DataSource(db='NYSE_TAQ', tick_type='TRD', symbols='AAPL', date=otp.dt(2022, 3, 2),
... where_clause_for_back_ticks=otp.raw('SIZE>=50', dtype=bool),
... back_to_first_tick=otp.Day(1), max_back_ticks_to_prepend=2,
... keep_first_tick_timestamp='ORIGIN_TIMESTAMP') # doctest: +SKIP
>>> otp.run(data) # doctest: +SKIP
Time PRICE SIZE ORIGIN_TIMESTAMP
0 2022-03-02 00:00:00.000 1.3 100 2022-03-01 00:00:00.000
1 2022-03-02 00:00:00.000 1.4 50 2022-03-01 00:00:00.002
2 2022-03-02 00:00:00.000 1.0 100 2022-03-02 00:00:00.000
3 2022-03-02 00:00:00.001 1.1 101 2022-03-02 00:00:00.001
4 2022-03-02 00:00:00.002 1.2 102 2022-03-02 00:00:00.002
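``identify_input_ts`` appends the SYMBOL_NAME and TICK_TYPE fields to the output ticks.
A minimal sketch (marked as skipped, since the exact output depends on the data):
>>> data = otp.DataSource(db='NYSE_TAQ', tick_type='TRD', symbols='AAPL', date=otp.dt(2022, 3, 1),
...                       identify_input_ts=True)  # doctest: +SKIP
>>> otp.run(data)  # doctest: +SKIP
``presort`` adds a PRESORT EP before merging when bound ``symbols`` are used; ``batch_size`` and
``concurrency`` tune it. A sketch under the same assumptions:
>>> symbols = otp.Symbols('SOME_DB')  # doctest: +SKIP
>>> data = otp.DataSource(db='SOME_DB', tick_type='TT', symbols=symbols,
...                       presort=True, concurrency=8)  # doctest: +SKIP
>>> otp.run(data)  # doctest: +SKIP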
"""
if self._try_default_constructor(**desired_schema):
return
# for cases when we want to explicitly convert to string,
# e.g. when it is a symbol param or a join_with_query parameter
if isinstance(tick_type, _ParamColumn):
tick_type = str(tick_type)[1:-1]
if date:
# TODO: write a warning in that case
start, end = self.__prepare_dates(date)
db, tick_type = self.__prepare_db_tick_type(db,
tick_type,
symbol,
start,
end)
self._p_db = db
self._p_strict = schema_policy in (self.POLICY_FAIL_STRICT,
self.POLICY_TOLERANT_STRICT,
self.POLICY_MANUAL_STRICT)
self._p_schema = self.__prepare_schema(db, # tick type is embedded into the db
start,
end,
schema_policy,
guess_schema,
desired_schema)
if symbols is not None:
if symbol is utils.adaptive or symbol is None:
symbol = symbols
else:
# TODO: test it
raise Exception('You have set both the `symbol` and `symbols` parameters, '
'which is not allowed. Please use only one of them')
if isinstance(symbol, Symbols) and symbol._p_db is None:
symbol = Symbols.duplicate(symbol, db=db)
if identify_input_ts:
if "SYMBOL_NAME" in desired_schema:
raise Exception("The 'SYMBOL_NAME' field can't be specified in the schema "
"when `identify_input_ts` is set") # TODO: think about how user could work around it
desired_schema["SYMBOL_NAME"] = str
if "TICK_TYPE" in desired_schema:
raise Exception("The 'TICK_TYPE' field can't be specified in the schema "
"when `identify_input_ts` is set")
desired_schema["TICK_TYPE"] = str
# non-obvious way to convert otp.Minute/Hour/... to a number of seconds
if type(back_to_first_tick).__name__ == '_DatePartCls':
back_to_first_tick = int((ott.dt(0) + back_to_first_tick).timestamp())
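# illustrative: otp.Hour(2) -> 7200; otp.Millis(999) -> 0, since the value is truncated to whole seconds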
if isinstance(back_to_first_tick, _Operation):
back_to_first_tick = otp.expr(back_to_first_tick)
if back_to_first_tick != 0 and keep_first_tick_timestamp:
desired_schema[keep_first_tick_timestamp] = ott.nsectime
if max_back_ticks_to_prepend < 1:
raise ValueError(f'`max_back_ticks_to_prepend` must be at least 1 '
f'but {max_back_ticks_to_prepend} was passed')
if where_clause_for_back_ticks is not None:
if not isinstance(where_clause_for_back_ticks, Raw):
raise ValueError(f'Currently only otp.raw is supported for `where_clause_for_back_ticks` '
f'but {type(where_clause_for_back_ticks)} was passed')
if where_clause_for_back_ticks.dtype is not bool:
raise ValueError(f'Only bool dtype for otp.raw in `where_clause_for_back_ticks` is supported '
f'but {where_clause_for_back_ticks.dtype} was passed')
where_clause_for_back_ticks = str(where_clause_for_back_ticks)
if (
isinstance(symbol, Source)
or hasattr(symbol, "__iter__")
and not isinstance(symbol, dict)
and not isinstance(symbol, str)
or isinstance(symbol, query)
or isinstance(symbol, _QueryEvalWrapper)
):
super().__init__(
_start=start,
_end=end,
_base_ep_func=lambda: self._base_ep_for_cross_symbol(
db, symbol, tick_type,
identify_input_ts=identify_input_ts,
back_to_first_tick=back_to_first_tick,
keep_first_tick_timestamp=keep_first_tick_timestamp,
presort=presort, batch_size=batch_size, concurrency=concurrency,
max_back_ticks_to_prepend=max_back_ticks_to_prepend,
where_clause_for_back_ticks=where_clause_for_back_ticks,
),
**desired_schema
)
else:
super().__init__(
_symbols=symbol,
_start=start,
_end=end,
_base_ep_func=lambda: self.base_ep(
db,
tick_type,
identify_input_ts=identify_input_ts,
back_to_first_tick=back_to_first_tick,
keep_first_tick_timestamp=keep_first_tick_timestamp,
max_back_ticks_to_prepend=max_back_ticks_to_prepend,
where_clause_for_back_ticks=where_clause_for_back_ticks,
),
**desired_schema
)
@property
def db(self):
return self._p_db
@staticmethod
def _create_source(passthrough_ep, back_to_first_tick=0, keep_first_tick_timestamp=None):
"""Create graph that save original timestamp of first tick if needed"""
if back_to_first_tick != 0 and keep_first_tick_timestamp:
src = Source(otq.Passthrough())
src.sink(otq.AddField(field=keep_first_tick_timestamp, value='TIMESTAMP'))
src.sink(passthrough_ep)
return src
return Source(passthrough_ep)
def _table_schema(self, src):
return src.table(**self._p_schema, strict=self._p_strict)
def base_ep(
self,
db,
tick_type,
identify_input_ts,
back_to_first_tick=0,
keep_first_tick_timestamp=None,
max_back_ticks_to_prepend=1,
where_clause_for_back_ticks=None,
):
if db is not None:
if isinstance(db, list):
str_db = "+".join(db)
else:
str_db = str(db)
if tick_type:
if isinstance(db, _SymbolParamColumn):
str_db = f"expr({str_db} + '::{tick_type}')" # TODO: test
else:
if "::" not in str_db:
str_db += "::" + tick_type
else:
if isinstance(db, _SymbolParamColumn):
str_db = f"expr({str_db})" # TODO: test
else:
str_db = tick_type
params = dict(
go_back_to_first_tick=back_to_first_tick,
max_back_ticks_to_prepend=max_back_ticks_to_prepend,
)
if where_clause_for_back_ticks is not None:
params['where_clause_for_back_ticks'] = where_clause_for_back_ticks
if isinstance(db, list) or isinstance(db, _SymbolParamColumn):
src = self._create_source(otq.Passthrough(**params),
back_to_first_tick=back_to_first_tick,
keep_first_tick_timestamp=keep_first_tick_timestamp)
src.sink(otq.Merge(identify_input_ts=identify_input_ts))
else:
if identify_input_ts:
params["fields"] = "SYMBOL_NAME,TICK_TYPE"
params["drop_fields"] = True
src = self._create_source(otq.Passthrough(**params),
back_to_first_tick=back_to_first_tick,
keep_first_tick_timestamp=keep_first_tick_timestamp)
src.tick_type(str_db)
src = self._table_schema(src)
return src
def _base_ep_for_cross_symbol(
self, db, symbol, tick_type, identify_input_ts,
back_to_first_tick=0, keep_first_tick_timestamp=None,
presort=utils.adaptive, batch_size=None, concurrency=None,
max_back_ticks_to_prepend=1,
where_clause_for_back_ticks=None,
):
tmp_otq = TmpOtq()
if isinstance(symbol, _QueryEvalWrapper):
symbol = symbol.to_eval_string(tmp_otq=tmp_otq)
elif isinstance(symbol, query):
symbol = symbol.to_eval_string()
elif isinstance(symbol, Source):
symbol = self._convert_symbol_to_string(symbol, tmp_otq)
if db is not None:
if isinstance(db, list):
tick_type = "+".join(db)
else:
tick_type = f"{db}::{tick_type}"
kwargs = dict(
go_back_to_first_tick=back_to_first_tick,
max_back_ticks_to_prepend=max_back_ticks_to_prepend,
)
if where_clause_for_back_ticks is not None:
kwargs['where_clause_for_back_ticks'] = where_clause_for_back_ticks
src = self._create_source(otq.Passthrough(**kwargs),
back_to_first_tick=back_to_first_tick,
keep_first_tick_timestamp=keep_first_tick_timestamp)
if presort is utils.adaptive:
presort = True
if presort:
if batch_size is None:
batch_size = otp.config.default_batch_size
if concurrency is None:
concurrency = (
otp.config.default_concurrency
if otp.config.default_concurrency is not None
# otq.Presort does not support None
else ''
)
src.sink(
otq.Presort(batch_size=batch_size, max_concurrency=concurrency).symbols(symbol).tick_type(tick_type)
)
src.sink(otq.Merge(identify_input_ts=identify_input_ts))
else:
src.sink(
otq.Merge(identify_input_ts=identify_input_ts).symbols(symbol).tick_type(tick_type)
)
src._tmp_otq.merge(tmp_otq)
src = self._table_schema(src)
return src
Custom = DataSource # for backward compatibility; previously we had only Custom
class Symbols(Source):
"""
Construct a source that returns ticks with information about symbols in a database.
The SYMBOL_NAME field is populated with symbol names. The TICK_TYPE field contains
the corresponding tick type (enabled by the ``show_tick_type`` parameter).
Parameters
----------
db: str
Name of the database in which to search for symbols
tick_type: str
Tick type to use. Default is `ANY`
start, end: :py:class:`datetime.datetime`, :py:class:`otp.datetime <onetick.py.datetime>`, \
:py:class:`onetick.py.adaptive`
Time interval from which the data should be taken.
date: :py:class:`datetime.date`
Alternative way of specifying the time interval instead of ``start``/``end`` times
keep_db: bool
Flag that indicates whether symbols should have a db prefix.
pattern: str
SQL-syntax pattern for symbols. Default is '%'
for_tick_type: str
Fetch only symbols belonging to this tick type, if specified.
show_tick_type: bool
Add the TICK_TYPE column with information about the tick type
symbology: str
The destination symbology for a symbol name translation.
Translation is performed if the destination symbology is not empty
and is different from that of the queried database.
show_original_symbols: bool
Enables propagation of the original symbol name as the ORIGINAL_SYMBOL_NAME tick field
if symbol name translation is performed (i.e. if ``symbology`` is set).
Note that if this parameter is set to True,
database symbols with missing translations are also propagated.
Note
----
Additional fields added to Symbols will be converted to symbol parameters
See also
--------
| :ref:`Symbols guide <Symbols>`
| **FIND_DB_SYMBOLS** OneTick event processor
Examples
--------
This class can be used to get a list of all symbols in the database:
>>> symbols = otp.Symbols('NYSE_TAQ', date=otp.dt(2022, 3, 1))
>>> otp.run(symbols)
Time SYMBOL_NAME
0 2022-03-01 AAP
1 2022-03-01 AAPL
This class can also be used to specify symbols for the main query:
>>> symbols = otp.Symbols('NYSE_TAQ', date=otp.dt(2022, 3, 1))
>>> data = otp.DataSource('NYSE_TAQ', tick_type='TRD', date=otp.dt(2022, 3, 1))
>>> result = otp.run(data, symbols=symbols)
>>> result['AAPL']
Time PRICE SIZE
0 2022-03-01 00:00:00.000 1.3 100
1 2022-03-01 00:00:00.001 1.4 10
2 2022-03-01 00:00:00.002 1.4 50
>>> result['AAP']
Time PRICE
0 2022-03-01 00:00:00.000 45.37
1 2022-03-01 00:00:00.001 45.41
Additional fields of the ``otp.Symbols`` can be used in the main query as symbol parameters:
>>> symbols = otp.Symbols('SOME_DB', show_tick_type=True, keep_db=True)
>>> symbols['PARAM'] = symbols['SYMBOL_NAME'] + '__' + symbols['TICK_TYPE']
>>> data = otp.DataSource('SOME_DB')
>>> data['S_PARAM'] = data.Symbol.PARAM
>>> data = otp.merge([data], symbols=symbols)
>>> otp.run(data)
Time X S_PARAM
0 2003-12-01 00:00:00.000 1 SOME_DB::S1__TT
1 2003-12-01 00:00:00.000 -3 SOME_DB::S2__TT
2 2003-12-01 00:00:00.001 2 SOME_DB::S1__TT
3 2003-12-01 00:00:00.001 -2 SOME_DB::S2__TT
4 2003-12-01 00:00:00.002 3 SOME_DB::S1__TT
5 2003-12-01 00:00:00.002 -1 SOME_DB::S2__TT
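The ``pattern`` parameter filters symbol names using SQL-style pattern syntax.
A minimal sketch (marked as skipped, since the output depends on the data; the pattern value is an assumption):
>>> symbols = otp.Symbols('NYSE_TAQ', date=otp.dt(2022, 3, 1), pattern='AAP%')  # doctest: +SKIP
>>> otp.run(symbols)  # doctest: +SKIP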
"""
_PROPERTIES = Source._PROPERTIES + ["_p_db",
"_p_pattern",
"_p_start",
"_p_end",
"_p_for_tick_type",
"_p_keep_db"]
def __init__(
self,
db=None,
tick_type="ANY",
start=utils.adaptive,
end=utils.adaptive,
date=None,
find_params=None,
keep_db=False,
pattern='%',
for_tick_type=None,
show_tick_type=False,
symbology='',
show_original_symbols=False,
**kwargs
):
if self._try_default_constructor(**kwargs):
return
self._p_db = db
self._p_pattern = pattern
self._p_start = start
self._p_end = end
self._p_keep_db = keep_db
self._p_for_tick_type = for_tick_type
if date:
if isinstance(date, ott.datetime) or isinstance(date, ott.date):
start = date.start
end = date.end
_symbol = utils.adaptive
if db:
if isinstance(db, list):
_symbol = [f"{str(_db).split(':')[0]}::" for _db in db] # noqa
else:
_symbol = f"{str(db).split(':')[0]}::" # noqa
_find_params = find_params if find_params is not None else {}
_find_params.setdefault('pattern', pattern)
if for_tick_type:
_find_params['tick_type_field'] = for_tick_type
_find_params.setdefault('show_tick_type', show_tick_type)
_find_params.setdefault('symbology', symbology)
_find_params.setdefault('show_original_symbols', show_original_symbols)
super().__init__(
_symbols=_symbol,
_start=start,
_end=end,
_base_ep_func=lambda: self.base_ep(ep_tick_type=tick_type, keep_db=keep_db, **_find_params),
)
self.schema['SYMBOL_NAME'] = str
if _find_params['show_tick_type']:
self.schema['TICK_TYPE'] = str
if _find_params['symbology'] and _find_params['show_original_symbols']:
self.schema['ORIGINAL_SYMBOL_NAME'] = str
def base_ep(self, ep_tick_type, keep_db, **params):
src = Source(otq.FindDbSymbols(**params))
src.tick_type(ep_tick_type)
src.schema['SYMBOL_NAME'] = str
if not keep_db:
src["SYMBOL_NAME"] = src["SYMBOL_NAME"].str.regex_replace('.*::', '')
return src
@staticmethod
def duplicate(obj, db=None):
return Symbols(db=obj._p_db if db is None else db,
pattern=obj._p_pattern,
start=obj._p_start,
end=obj._p_end,
keep_db=obj._p_keep_db,
for_tick_type=obj._p_for_tick_type)
def default_date_converter(date):
return pd.to_datetime(date, format='%Y%m%d%H%M%S.%f')
def to_timestamp_nanos(date, date_converter, tz):
date = date_converter(date)
if isinstance(date, ott.dt):
date = date.ts
else:
date = pd.to_datetime(date)
return date.tz_localize(tz)
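# Illustrative (assumed input) example of the helpers above: passing '20220301093000.000000'
# through default_date_converter parses it with pd.to_datetime(format='%Y%m%d%H%M%S.%f'),
# and to_timestamp_nanos then localizes the result to the given timezone,
# returning a tz-aware pandas Timestamp.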
def LocalCSVTicks(path,
start=utils.adaptive,
end=utils.adaptive,
date_converter=default_date_converter,
additional_date_columns=None,
converters=None,
tz=None,
):
"""
Loads ticks from a CSV file and creates an otp.Ticks object from them
Parameters
----------
path: str
Absolute path to csv file
start: datetime object
Start of the query interval
end: datetime object
End of the query interval
date_converter:
A converter from string to datetime format; by default applied only to the TIMESTAMP column
additional_date_columns:
Other columns to convert to datetime format
converters:
Non-default converters from strings for specific columns
tz:
timezone
Returns
-------
otp.Ticks
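Examples
--------
A minimal sketch, assuming the function is exposed as ``otp.LocalCSVTicks`` and that the CSV file
(the path here is hypothetical) contains ``#SYMBOL_NAME`` and ``TIMESTAMP`` columns:
>>> ticks = otp.LocalCSVTicks('/tmp/ticks.csv', start=otp.dt(2022, 3, 1), end=otp.dt(2022, 3, 2))  # doctest: +SKIP
>>> otp.run(ticks)  # doctest: +SKIP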
"""
if tz is None:
tz = configuration.config.tz
c = {'TIMESTAMP': partial(to_timestamp_nanos, date_converter=date_converter, tz=tz)}
if converters is not None:
c.update(converters)
if additional_date_columns is not None:
c.update({column: partial(to_timestamp_nanos,
date_converter=date_converter,
tz=tz,
) for column in additional_date_columns})
df = pd.read_csv(path, converters=c)
df['TS_'] = df['TIMESTAMP']
df['SYMBOL_NAME'] = df['#SYMBOL_NAME']
d = df.to_dict(orient='list')
del d['TIMESTAMP']
del d['#SYMBOL_NAME']
ticks = Ticks(d, start=start, end=end)
ticks['TIMESTAMP'] = ticks['TS_']
ticks = ticks.drop('TS_')
return ticks
class SymbologyMapping(Source):
_PROPERTIES = Source._PROPERTIES + ["_p_dest_symbology"]
def __init__(self,
dest_symbology: str = None,
tick_type: str = None,
start=utils.adaptive,
end=utils.adaptive,
symbols=utils.adaptive,
**desired_schema):
if self._try_default_constructor(**desired_schema):
return
if not dest_symbology or not tick_type:
raise TypeError("Missing required argument: 'dest_symbology' or 'tick_type'")
self._p_dest_symbology = dest_symbology
super().__init__(
_symbols=symbols,
_start=start,
_end=end,
_base_ep_func=lambda: self.base_ep(dest_symbology, tick_type),
**desired_schema
)
self.schema['MAPPED_SYMBOL_NAME'] = str
self.schema['END_DATETIME'] = ott.nsectime
@property
def dest_symbology(self):
return self._p_dest_symbology
def base_ep(self, dest_symbology, tick_type):
src = Source(otq.SymbologyMapping(dest_symbology=dest_symbology))
src.tick_type(tick_type)
return src
class SplitQueryOutputBySymbol(Source):
def __init__(self,
query=None,
symbol_field=None,
single_invocation=False,
db=utils.adaptive_to_default,
tick_type=utils.adaptive,
start=utils.adaptive,
end=utils.adaptive,
symbols=utils.adaptive,
**desired_schema):
if self._try_default_constructor(**desired_schema):
return
if isinstance(query, Source): # TODO: support already existing queries
query = query.copy()
otq_query = query._save_as_tmp_otq()
q_start, q_end, _ = query._get_date_range()
if start is utils.adaptive and end is utils.adaptive:
start, end = q_start, q_end
else:
raise Exception('Unsupported type of the `query` parameter is specified')
if db is utils.adaptive_to_default:
db = configuration.config.get('default_db')
if tick_type is utils.adaptive:
tick_type = 'SPLIT_BY_SYMBOL'
super().__init__(
_symbols=symbols,
_start=start,
_end=end,
_base_ep_func=partial(self.build, db, tick_type, symbol_field, otq_query, single_invocation),
**desired_schema
)
def build(self, db, tick_type, symbol_field_name, otq_query, single_invocation):
src = Source(otq.SplitQueryOutputBySymbol(otq_query=otq_query,
symbol_field_name=str(symbol_field_name),
ensure_single_invocation=single_invocation))
if db:
tick_type = str(db) + f'::{tick_type}'
src.tick_type(tick_type)
return src
def by_symbol(src: Source,
symbol_field,
single_invocation=False,
db=utils.adaptive_to_default,
tick_type=utils.adaptive,
start=utils.adaptive,
end=utils.adaptive,
) -> Source:
"""
Create a separate data series for each unique value of ``symbol_field`` in the output of ``src``.
``src`` must specify enough parameters to be run (e.g., symbols, query range). A typical use case is to split a
single data series (e.g., from a CSV file) into separate data series by symbol. This method is a source.
Parameters
----------
src: Source
a query whose output is to be split by ``symbol_field``
symbol_field: str
the name of the field carrying symbol name in the ``src`` query
single_invocation: bool, optional
``True`` means that the ``src`` query is run once and the result is stored in memory, speeding up execution.
``False`` means that the ``src`` query is run for every symbol of the query, saving memory
but slowing down query execution.
Default: ``False``
db: str, optional
Database for running the query. Doesn't affect the ``src`` query. The default value
is ``otp.config['default_db']``.
tick_type: str, optional
Tick type for the query. Doesn't affect the ``src`` query.
start: otp.dt, optional
By default it is taken from the ``src`` start time
end: otp.dt, optional
By default it is taken from the ``src`` end time
See also
--------
**SPLIT_QUERY_OUTPUT_BY_SYMBOL** OneTick event processor
Examples
--------
>>> executions = otp.CSV( # doctest: +SKIP
... otp.utils.file(os.path.join(cur_dir, 'data', 'example_events.csv')),
... converters={"time_number": lambda c: c.apply(otp.nsectime)},
... timestamp_name="time_number",
... start=otp.dt(2022, 7, 1),
... end=otp.dt(2022, 7, 2),
... order_ticks=True
... )[['stock', 'px']]
>>> csv = otp.by_symbol(executions, 'stock') # doctest: +SKIP
>>> trd = otp.DataSource( # doctest: +SKIP
... db='NYSE_TAQ',
... tick_type='TRD',
... start=otp.dt(2022, 7, 1),
... end=otp.dt(2022, 7, 2)
... )[['PRICE', 'SIZE']]
>>> data = otp.funcs.join_by_time([csv, trd]) # doctest: +SKIP
>>> result = otp.run(data, symbols=executions.distinct(keys='stock')[['stock']], concurrency=8) # doctest: +SKIP
>>> result['THG'] # doctest: +SKIP
Time stock px PRICE SIZE
0 2022-07-01 11:37:56.432947200 THG 148.02 146.48 1
>>> result['TFX'] # doctest: +SKIP
Time stock px PRICE SIZE
0 2022-07-01 11:39:45.882808576 TFX 255.61 251.97 1
>>> result['BURL'] # doctest: +SKIP
Time stock px PRICE SIZE
0 2022-07-01 11:42:35.125718016 BURL 137.53 135.41 2
"""
result = SplitQueryOutputBySymbol(src,
symbol_field=symbol_field,
single_invocation=single_invocation,
db=db,
tick_type=tick_type,
start=start,
end=end)
result.schema.set(**src.schema)
return result
@docstring(parameters=OB_SNAPSHOT_DOC_PARAMS + DATA_SOURCE_DOC_PARAMS)
def ObSnapshot(*args, **kwargs):
"""
Construct a source providing order book snapshot for a given ``db``.
This is just a shortcut for otp.DataSource + otp.agg.ob_snapshot.
See also
--------
| :class:`onetick.py.DataSource`
| :meth:`onetick.py.Source.ob_snapshot`
| :func:`onetick.py.agg.ob_snapshot`
| **OB_SNAPSHOT** OneTick event processor
Examples
---------
>>> data = otp.ObSnapshot(db='SOME_DB', tick_type='PRL', symbols='AA', max_levels=1) # doctest: +SKIP
>>> otp.run(data) # doctest: +SKIP
Time PRICE UPDATE_TIME SIZE LEVEL BUY_SELL_FLAG
0 2003-12-04 2.0 2003-12-01 00:00:00.003 6 1 1
1 2003-12-04 5.0 2003-12-01 00:00:00.004 7 1 0
"""
aggregation_params = {
param.name: kwargs.pop(param.name, param.default)
for _, param in OB_SNAPSHOT_DOC_PARAMS
}
src = otp.DataSource(*args, **kwargs)
return otp.agg.ob_snapshot(**aggregation_params).apply(src)
@docstring(parameters=OB_SNAPSHOT_WIDE_DOC_PARAMS + DATA_SOURCE_DOC_PARAMS)
def ObSnapshotWide(*args, **kwargs):
"""
Construct a source providing order book wide snapshot for a given ``db``.
This is just a shortcut for otp.DataSource + otp.agg.ob_snapshot_wide.
See also
--------
| :class:`onetick.py.DataSource`
| :meth:`onetick.py.Source.ob_snapshot_wide`
| :func:`onetick.py.agg.ob_snapshot_wide`
| **OB_SNAPSHOT_WIDE** OneTick event processor
Examples
---------
>>> data = otp.ObSnapshotWide(db='SOME_DB', tick_type='PRL', symbols='AA', max_levels=1) # doctest: +SKIP
>>> otp.run(data) # doctest: +SKIP
Time BID_PRICE BID_UPDATE_TIME BID_SIZE ASK_PRICE ASK_UPDATE_TIME ASK_SIZE LEVEL
0 2003-12-03 5.0 2003-12-01 00:00:00.004 7 2.0 2003-12-01 00:00:00.003 6 1
"""
aggregation_params = {
param.name: kwargs.pop(param.name, param.default)
for _, param in OB_SNAPSHOT_WIDE_DOC_PARAMS
}
src = otp.DataSource(*args, **kwargs)
return otp.agg.ob_snapshot_wide(**aggregation_params).apply(src)
@docstring(parameters=OB_SNAPSHOT_FLAT_DOC_PARAMS + DATA_SOURCE_DOC_PARAMS)
def ObSnapshotFlat(*args, **kwargs):
"""
Construct a source providing order book flat snapshot for a given ``db``.
This is just a shortcut for otp.DataSource + otp.agg.ob_snapshot_flat.
See also
--------
| :class:`onetick.py.DataSource`
| :meth:`onetick.py.Source.ob_snapshot_flat`
| :func:`onetick.py.agg.ob_snapshot_flat`
| **OB_SNAPSHOT_FLAT** OneTick event processor
Examples
---------
>>> data = otp.ObSnapshotFlat(db='SOME_DB', tick_type='PRL', symbols='AA', max_levels=1) # doctest: +SKIP
>>> otp.run(data) # doctest: +SKIP
Time BID_PRICE1 BID_UPDATE_TIME1 BID_SIZE1 ASK_PRICE1 ASK_UPDATE_TIME1 ASK_SIZE1
0 2003-12-03 5.0 2003-12-01 00:00:00.004 7 2.0 2003-12-01 00:00:00.003 6
"""
aggregation_params = {
param.name: kwargs.pop(param.name, param.default)
for _, param in OB_SNAPSHOT_FLAT_DOC_PARAMS
}
src = otp.DataSource(*args, **kwargs)
return otp.agg.ob_snapshot_flat(**aggregation_params).apply(src)